This commit is contained in:
nirholas
2026-03-31 12:36:42 +00:00
parent 38648ae5f4
commit da6c5e1ed7
10 changed files with 929 additions and 140 deletions

View File

@@ -1,50 +1,54 @@
import type { IPty } from "node-pty";
import type { WebSocket } from "ws";
import { SessionStore } from "./session-store.js";
export type Session = {
id: string;
ws: WebSocket;
pty: IPty;
createdAt: number;
};
export type { SessionInfo } from "./session-store.js";
export class SessionManager {
private sessions = new Map<string, Session>();
private store: SessionStore;
private maxSessions: number;
private spawnPty: (cols: number, rows: number) => IPty;
// Tracks which sessions have already had their PTY event listeners wired,
// so we don't double-register on reconnect.
private wiredPtys = new Set<string>();
constructor(
maxSessions: number,
spawnPty: (cols: number, rows: number) => IPty,
gracePeriodMs?: number,
scrollbackBytes?: number,
) {
this.maxSessions = maxSessions;
this.spawnPty = spawnPty;
this.store = new SessionStore(gracePeriodMs, scrollbackBytes);
}
get activeCount(): number {
return this.sessions.size;
return this.store.size;
}
get isFull(): boolean {
return this.sessions.size >= this.maxSessions;
return this.store.size >= this.maxSessions;
}
getSession(id: string): Session | undefined {
return this.sessions.get(id);
getSession(token: string) {
return this.store.get(token);
}
listSessions() {
return this.store.list();
}
/**
* Creates a new PTY session bound to the given WebSocket.
* Returns the session or null if at capacity.
* Spawns a new PTY, registers it in the session store, and wires up all
* event plumbing between the PTY and the WebSocket.
*
* Returns the session token, or null if at capacity or PTY spawn fails.
*/
create(ws: WebSocket, cols = 80, rows = 24): Session | null {
if (this.isFull) {
return null;
}
create(ws: WebSocket, cols = 80, rows = 24): string | null {
if (this.isFull) return null;
const id = crypto.randomUUID();
let pty: IPty;
try {
pty = this.spawnPty(cols, rows);
} catch (err) {
@@ -57,39 +61,102 @@ export class SessionManager {
return null;
}
const session: Session = { id, ws, pty, createdAt: Date.now() };
this.sessions.set(id, session);
const session = this.store.register(pty);
session.ws = ws;
const { token } = session;
this.wirePtyEvents(token, pty);
this.wireWsEvents(token, ws, pty);
console.log(
`[session ${token.slice(0, 8)}] Created (active: ${this.store.size}/${this.maxSessions})`,
);
return token;
}
/**
* Attaches a new WebSocket to an existing session identified by `token`.
*
* - Cancels the grace timer
* - Sends `{ type: "resumed", token }` to the client
* - Replays the scrollback buffer so the user sees their conversation
* - Resizes the PTY to the client's current terminal dimensions
*
* Returns true if the session was found, false otherwise.
*/
resume(token: string, ws: WebSocket, cols: number, rows: number): boolean {
const session = this.store.reattach(token, ws);
if (!session) return false;
console.log(
`[session ${token.slice(0, 8)}] Resumed (active: ${this.store.size}/${this.maxSessions})`,
);
// Tell the client it's a resumed session BEFORE sending scrollback bytes.
// The client uses this to clear the terminal first.
ws.send(JSON.stringify({ type: "resumed", token }));
// Replay buffered output
const scrollback = session.scrollback.read();
if (scrollback.length > 0) {
ws.send(scrollback);
}
// Sync PTY dimensions to the reconnected client
try {
session.pty.resize(cols, rows);
} catch {
// PTY may have exited
}
this.wireWsEvents(token, ws, session.pty);
return true;
}
/**
* Wire PTY → scrollback + WebSocket.
* Called once per session lifetime (idempotent via `wiredPtys` guard).
*/
private wirePtyEvents(token: string, pty: IPty): void {
if (this.wiredPtys.has(token)) return;
this.wiredPtys.add(token);
const session = this.store.get(token);
if (!session) return;
// PTY output -> WebSocket
pty.onData((data: string) => {
if (ws.readyState === ws.OPEN) {
// Always capture to scrollback for future replay
session.scrollback.write(data);
// Forward to the currently attached WebSocket, if any
const ws = session.ws;
if (ws && ws.readyState === 1 /* OPEN */) {
ws.send(data);
}
});
// PTY exit -> clean up
pty.onExit(({ exitCode, signal }) => {
this.wiredPtys.delete(token);
console.log(
`[session ${id}] PTY exited: code=${exitCode}, signal=${signal}`,
`[session ${token.slice(0, 8)}] PTY exited: code=${exitCode}, signal=${signal}`,
);
this.sessions.delete(id);
if (ws.readyState === ws.OPEN) {
ws.send(
JSON.stringify({
type: "exit",
exitCode,
signal,
}),
);
const ws = session.ws;
if (ws && ws.readyState === 1 /* OPEN */) {
ws.send(JSON.stringify({ type: "exit", exitCode, signal }));
ws.close(1000, "PTY exited");
}
this.store.destroy(token);
});
}
// WebSocket messages -> PTY stdin (or resize)
/**
* Wire WebSocket → PTY (input, resize, ping).
* On close/error, start the grace period instead of immediately destroying
* the session — this keeps the PTY alive for reconnection.
* Called once per WebSocket connection (safe to call again on reconnect).
*/
private wireWsEvents(token: string, ws: WebSocket, pty: IPty): void {
ws.on("message", (data: Buffer | string) => {
const str = data.toString();
// Try to parse as JSON for control messages
if (str.startsWith("{")) {
try {
const msg = JSON.parse(str) as Record<string, unknown>;
@@ -102,75 +169,44 @@ export class SessionManager {
return;
}
if (msg.type === "ping") {
if (ws.readyState === ws.OPEN) {
if (ws.readyState === 1 /* OPEN */) {
ws.send(JSON.stringify({ type: "pong" }));
}
return;
}
} catch {
// Not JSON, treat as terminal input
// Not JSON treat as terminal input
}
}
pty.write(str);
});
// WebSocket close -> kill PTY
ws.on("close", () => {
console.log(`[session ${id}] WebSocket closed`);
this.destroySession(id);
});
ws.on("error", (err) => {
console.error(`[session ${id}] WebSocket error:`, err.message);
this.destroySession(id);
});
console.log(
`[session ${id}] Created (active: ${this.sessions.size}/${this.maxSessions})`,
);
return session;
}
/**
* Gracefully destroys a session: SIGHUP, then SIGKILL after timeout.
*/
destroySession(id: string): void {
const session = this.sessions.get(id);
if (!session) return;
this.sessions.delete(id);
const { pty, ws } = session;
try {
pty.kill("SIGHUP");
} catch {
// PTY may already be dead
}
// Force kill after 5 seconds if still alive
const killTimer = setTimeout(() => {
try {
pty.kill("SIGKILL");
} catch {
// Already dead
const handleClose = () => {
console.log(`[session ${token.slice(0, 8)}] WebSocket closed`);
const session = this.store.get(token);
// Only start grace if this WS is still the one attached to the session
if (session && session.ws === ws) {
this.store.startGrace(token, () => {
/* logged inside startGrace */
});
}
}, 5000);
};
// If PTY exits before the timer, clear it
pty.onExit(() => clearTimeout(killTimer));
if (ws.readyState === ws.OPEN || ws.readyState === ws.CONNECTING) {
ws.close(1000, "Session destroyed");
}
ws.on("close", handleClose);
ws.on("error", (err) => {
console.error(`[session ${token.slice(0, 8)}] WebSocket error:`, err.message);
handleClose();
});
}
/**
* Destroys all sessions. Used during server shutdown.
* Force-kill a session immediately (used by the REST API).
*/
destroySession(token: string): void {
this.store.destroy(token);
}
destroyAll(): void {
for (const id of [...this.sessions.keys()]) {
this.destroySession(id);
}
this.store.destroyAll();
}
}