/ remote / SessionsWebSocket.ts
SessionsWebSocket.ts
  1  import { randomUUID } from 'crypto'
  2  import { getOauthConfig } from '../constants/oauth.js'
  3  import type { SDKMessage } from '../entrypoints/agentSdkTypes.js'
  4  import type {
  5    SDKControlCancelRequest,
  6    SDKControlRequest,
  7    SDKControlRequestInner,
  8    SDKControlResponse,
  9  } from '../entrypoints/sdk/controlTypes.js'
 10  import { logForDebugging } from '../utils/debug.js'
 11  import { errorMessage } from '../utils/errors.js'
 12  import { logError } from '../utils/log.js'
 13  import { getWebSocketTLSOptions } from '../utils/mtls.js'
 14  import { getWebSocketProxyAgent, getWebSocketProxyUrl } from '../utils/proxy.js'
 15  import { jsonParse, jsonStringify } from '../utils/slowOperations.js'
 16  
 17  const RECONNECT_DELAY_MS = 2000
 18  const MAX_RECONNECT_ATTEMPTS = 5
 19  const PING_INTERVAL_MS = 30000
 20  
 21  /**
 22   * Maximum retries for 4001 (session not found). During compaction the
 23   * server may briefly consider the session stale; a short retry window
 24   * lets the client recover without giving up permanently.
 25   */
 26  const MAX_SESSION_NOT_FOUND_RETRIES = 3
 27  
 28  /**
 29   * WebSocket close codes that indicate a permanent server-side rejection.
 30   * The client stops reconnecting immediately.
 31   * Note: 4001 (session not found) is handled separately with limited
 32   * retries since it can be transient during compaction.
 33   */
 34  const PERMANENT_CLOSE_CODES = new Set([
 35    4003, // unauthorized
 36  ])
 37  
 38  type WebSocketState = 'connecting' | 'connected' | 'closed'
 39  
 40  type SessionsMessage =
 41    | SDKMessage
 42    | SDKControlRequest
 43    | SDKControlResponse
 44    | SDKControlCancelRequest
 45  
 46  function isSessionsMessage(value: unknown): value is SessionsMessage {
 47    if (typeof value !== 'object' || value === null || !('type' in value)) {
 48      return false
 49    }
 50    // Accept any message with a string `type` field. Downstream handlers
 51    // (sdkMessageAdapter, RemoteSessionManager) decide what to do with
 52    // unknown types. A hardcoded allowlist here would silently drop new
 53    // message types the backend starts sending before the client is updated.
 54    return typeof value.type === 'string'
 55  }
 56  
 57  export type SessionsWebSocketCallbacks = {
 58    onMessage: (message: SessionsMessage) => void
 59    onClose?: () => void
 60    onError?: (error: Error) => void
 61    onConnected?: () => void
 62    /** Fired when a transient close is detected and a reconnect is scheduled.
 63     *  onClose fires only for permanent close (server ended / attempts exhausted). */
 64    onReconnecting?: () => void
 65  }
 66  
 67  // Common interface between globalThis.WebSocket and ws.WebSocket
 68  type WebSocketLike = {
 69    close(): void
 70    send(data: string): void
 71    ping?(): void // Bun & ws both support this
 72  }
 73  
 74  /**
 75   * WebSocket client for connecting to CCR sessions via /v1/sessions/ws/{id}/subscribe
 76   *
 77   * Protocol:
 78   * 1. Connect to wss://api.anthropic.com/v1/sessions/ws/{sessionId}/subscribe?organization_uuid=...
 79   * 2. Send auth message: { type: 'auth', credential: { type: 'oauth', token: '...' } }
 80   * 3. Receive SDKMessage stream from the session
 81   */
 82  export class SessionsWebSocket {
 83    private ws: WebSocketLike | null = null
 84    private state: WebSocketState = 'closed'
 85    private reconnectAttempts = 0
 86    private sessionNotFoundRetries = 0
 87    private pingInterval: NodeJS.Timeout | null = null
 88    private reconnectTimer: NodeJS.Timeout | null = null
 89  
 90    constructor(
 91      private readonly sessionId: string,
 92      private readonly orgUuid: string,
 93      private readonly getAccessToken: () => string,
 94      private readonly callbacks: SessionsWebSocketCallbacks,
 95    ) {}
 96  
 97    /**
 98     * Connect to the sessions WebSocket endpoint
 99     */
100    async connect(): Promise<void> {
101      if (this.state === 'connecting') {
102        logForDebugging('[SessionsWebSocket] Already connecting')
103        return
104      }
105  
106      this.state = 'connecting'
107  
108      const baseUrl = getOauthConfig().BASE_API_URL.replace('https://', 'wss://')
109      const url = `${baseUrl}/v1/sessions/ws/${this.sessionId}/subscribe?organization_uuid=${this.orgUuid}`
110  
111      logForDebugging(`[SessionsWebSocket] Connecting to ${url}`)
112  
113      // Get fresh token for each connection attempt
114      const accessToken = this.getAccessToken()
115      const headers = {
116        Authorization: `Bearer ${accessToken}`,
117        'anthropic-version': '2023-06-01',
118      }
119  
120      if (typeof Bun !== 'undefined') {
121        // Bun's WebSocket supports headers/proxy options but the DOM typings don't
122        // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
123        const ws = new globalThis.WebSocket(url, {
124          headers,
125          proxy: getWebSocketProxyUrl(url),
126          tls: getWebSocketTLSOptions() || undefined,
127        } as unknown as string[])
128        this.ws = ws
129  
130        ws.addEventListener('open', () => {
131          logForDebugging(
132            '[SessionsWebSocket] Connection opened, authenticated via headers',
133          )
134          this.state = 'connected'
135          this.reconnectAttempts = 0
136          this.sessionNotFoundRetries = 0
137          this.startPingInterval()
138          this.callbacks.onConnected?.()
139        })
140  
141        ws.addEventListener('message', (event: MessageEvent) => {
142          const data =
143            typeof event.data === 'string' ? event.data : String(event.data)
144          this.handleMessage(data)
145        })
146  
147        ws.addEventListener('error', () => {
148          const err = new Error('[SessionsWebSocket] WebSocket error')
149          logError(err)
150          this.callbacks.onError?.(err)
151        })
152  
153        // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
154        ws.addEventListener('close', (event: CloseEvent) => {
155          logForDebugging(
156            `[SessionsWebSocket] Closed: code=${event.code} reason=${event.reason}`,
157          )
158          this.handleClose(event.code)
159        })
160  
161        ws.addEventListener('pong', () => {
162          logForDebugging('[SessionsWebSocket] Pong received')
163        })
164      } else {
165        const { default: WS } = await import('ws')
166        const ws = new WS(url, {
167          headers,
168          agent: getWebSocketProxyAgent(url),
169          ...getWebSocketTLSOptions(),
170        })
171        this.ws = ws
172  
173        ws.on('open', () => {
174          logForDebugging(
175            '[SessionsWebSocket] Connection opened, authenticated via headers',
176          )
177          // Auth is handled via headers, so we're immediately connected
178          this.state = 'connected'
179          this.reconnectAttempts = 0
180          this.sessionNotFoundRetries = 0
181          this.startPingInterval()
182          this.callbacks.onConnected?.()
183        })
184  
185        ws.on('message', (data: Buffer) => {
186          this.handleMessage(data.toString())
187        })
188  
189        ws.on('error', (err: Error) => {
190          logError(new Error(`[SessionsWebSocket] Error: ${err.message}`))
191          this.callbacks.onError?.(err)
192        })
193  
194        ws.on('close', (code: number, reason: Buffer) => {
195          logForDebugging(
196            `[SessionsWebSocket] Closed: code=${code} reason=${reason.toString()}`,
197          )
198          this.handleClose(code)
199        })
200  
201        ws.on('pong', () => {
202          logForDebugging('[SessionsWebSocket] Pong received')
203        })
204      }
205    }
206  
207    /**
208     * Handle incoming WebSocket message
209     */
210    private handleMessage(data: string): void {
211      try {
212        const message: unknown = jsonParse(data)
213  
214        // Forward SDK messages to callback
215        if (isSessionsMessage(message)) {
216          this.callbacks.onMessage(message)
217        } else {
218          logForDebugging(
219            `[SessionsWebSocket] Ignoring message type: ${typeof message === 'object' && message !== null && 'type' in message ? String(message.type) : 'unknown'}`,
220          )
221        }
222      } catch (error) {
223        logError(
224          new Error(
225            `[SessionsWebSocket] Failed to parse message: ${errorMessage(error)}`,
226          ),
227        )
228      }
229    }
230  
231    /**
232     * Handle WebSocket close
233     */
234    private handleClose(closeCode: number): void {
235      this.stopPingInterval()
236  
237      if (this.state === 'closed') {
238        return
239      }
240  
241      this.ws = null
242  
243      const previousState = this.state
244      this.state = 'closed'
245  
246      // Permanent codes: stop reconnecting — server has definitively ended the session
247      if (PERMANENT_CLOSE_CODES.has(closeCode)) {
248        logForDebugging(
249          `[SessionsWebSocket] Permanent close code ${closeCode}, not reconnecting`,
250        )
251        this.callbacks.onClose?.()
252        return
253      }
254  
255      // 4001 (session not found) can be transient during compaction: the
256      // server may briefly consider the session stale while the CLI worker
257      // is busy with the compaction API call and not emitting events.
258      if (closeCode === 4001) {
259        this.sessionNotFoundRetries++
260        if (this.sessionNotFoundRetries > MAX_SESSION_NOT_FOUND_RETRIES) {
261          logForDebugging(
262            `[SessionsWebSocket] 4001 retry budget exhausted (${MAX_SESSION_NOT_FOUND_RETRIES}), not reconnecting`,
263          )
264          this.callbacks.onClose?.()
265          return
266        }
267        this.scheduleReconnect(
268          RECONNECT_DELAY_MS * this.sessionNotFoundRetries,
269          `4001 attempt ${this.sessionNotFoundRetries}/${MAX_SESSION_NOT_FOUND_RETRIES}`,
270        )
271        return
272      }
273  
274      // Attempt reconnection if we were connected
275      if (
276        previousState === 'connected' &&
277        this.reconnectAttempts < MAX_RECONNECT_ATTEMPTS
278      ) {
279        this.reconnectAttempts++
280        this.scheduleReconnect(
281          RECONNECT_DELAY_MS,
282          `attempt ${this.reconnectAttempts}/${MAX_RECONNECT_ATTEMPTS}`,
283        )
284      } else {
285        logForDebugging('[SessionsWebSocket] Not reconnecting')
286        this.callbacks.onClose?.()
287      }
288    }
289  
290    private scheduleReconnect(delay: number, label: string): void {
291      this.callbacks.onReconnecting?.()
292      logForDebugging(
293        `[SessionsWebSocket] Scheduling reconnect (${label}) in ${delay}ms`,
294      )
295      this.reconnectTimer = setTimeout(() => {
296        this.reconnectTimer = null
297        void this.connect()
298      }, delay)
299    }
300  
301    private startPingInterval(): void {
302      this.stopPingInterval()
303  
304      this.pingInterval = setInterval(() => {
305        if (this.ws && this.state === 'connected') {
306          try {
307            this.ws.ping?.()
308          } catch {
309            // Ignore ping errors, close handler will deal with connection issues
310          }
311        }
312      }, PING_INTERVAL_MS)
313    }
314  
315    /**
316     * Stop ping interval
317     */
318    private stopPingInterval(): void {
319      if (this.pingInterval) {
320        clearInterval(this.pingInterval)
321        this.pingInterval = null
322      }
323    }
324  
325    /**
326     * Send a control response back to the session
327     */
328    sendControlResponse(response: SDKControlResponse): void {
329      if (!this.ws || this.state !== 'connected') {
330        logError(new Error('[SessionsWebSocket] Cannot send: not connected'))
331        return
332      }
333  
334      logForDebugging('[SessionsWebSocket] Sending control response')
335      this.ws.send(jsonStringify(response))
336    }
337  
338    /**
339     * Send a control request to the session (e.g., interrupt)
340     */
341    sendControlRequest(request: SDKControlRequestInner): void {
342      if (!this.ws || this.state !== 'connected') {
343        logError(new Error('[SessionsWebSocket] Cannot send: not connected'))
344        return
345      }
346  
347      const controlRequest: SDKControlRequest = {
348        type: 'control_request',
349        request_id: randomUUID(),
350        request,
351      }
352  
353      logForDebugging(
354        `[SessionsWebSocket] Sending control request: ${request.subtype}`,
355      )
356      this.ws.send(jsonStringify(controlRequest))
357    }
358  
359    /**
360     * Check if connected
361     */
362    isConnected(): boolean {
363      return this.state === 'connected'
364    }
365  
366    /**
367     * Close the WebSocket connection
368     */
369    close(): void {
370      logForDebugging('[SessionsWebSocket] Closing connection')
371      this.state = 'closed'
372      this.stopPingInterval()
373  
374      if (this.reconnectTimer) {
375        clearTimeout(this.reconnectTimer)
376        this.reconnectTimer = null
377      }
378  
379      if (this.ws) {
380        // Null out event handlers to prevent race conditions during reconnect.
381        // Under Bun (native WebSocket), onX handlers are the clean way to detach.
382        // Under Node (ws package), the listeners were attached with .on() in connect(),
383        // but since we're about to close and null out this.ws, no cleanup is needed.
384        this.ws.close()
385        this.ws = null
386      }
387    }
388  
389    /**
390     * Force reconnect - closes existing connection and establishes a new one.
391     * Useful when the subscription becomes stale (e.g., after container shutdown).
392     */
393    reconnect(): void {
394      logForDebugging('[SessionsWebSocket] Force reconnecting')
395      this.reconnectAttempts = 0
396      this.sessionNotFoundRetries = 0
397      this.close()
398      // Small delay before reconnecting (stored in reconnectTimer so it can be cancelled)
399      this.reconnectTimer = setTimeout(() => {
400        this.reconnectTimer = null
401        void this.connect()
402      }, 500)
403    }
404  }