/ cli / transports / WebSocketTransport.ts
WebSocketTransport.ts
  1  import type { StdoutMessage } from 'src/entrypoints/sdk/controlTypes.js'
  2  import type WsWebSocket from 'ws'
  3  import { logEvent } from '../../services/analytics/index.js'
  4  import { CircularBuffer } from '../../utils/CircularBuffer.js'
  5  import { logForDebugging } from '../../utils/debug.js'
  6  import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js'
  7  import { isEnvTruthy } from '../../utils/envUtils.js'
  8  import { getWebSocketTLSOptions } from '../../utils/mtls.js'
  9  import {
 10    getWebSocketProxyAgent,
 11    getWebSocketProxyUrl,
 12  } from '../../utils/proxy.js'
 13  import {
 14    registerSessionActivityCallback,
 15    unregisterSessionActivityCallback,
 16  } from '../../utils/sessionActivity.js'
 17  import { jsonStringify } from '../../utils/slowOperations.js'
 18  import type { Transport } from './Transport.js'
 19  
 20  const KEEP_ALIVE_FRAME = '{"type":"keep_alive"}\n'
 21  
 22  const DEFAULT_MAX_BUFFER_SIZE = 1000
 23  const DEFAULT_BASE_RECONNECT_DELAY = 1000
 24  const DEFAULT_MAX_RECONNECT_DELAY = 30000
 25  /** Time budget for reconnection attempts before giving up (10 minutes). */
 26  const DEFAULT_RECONNECT_GIVE_UP_MS = 600_000
 27  const DEFAULT_PING_INTERVAL = 10000
 28  const DEFAULT_KEEPALIVE_INTERVAL = 300_000 // 5 minutes
 29  
 30  /**
 31   * Threshold for detecting system sleep/wake. If the gap between consecutive
 32   * reconnection attempts exceeds this, the machine likely slept. We reset
 33   * the reconnection budget and retry — the server will reject with permanent
 34   * close codes (4001/1002) if the session was reaped during sleep.
 35   */
 36  const SLEEP_DETECTION_THRESHOLD_MS = DEFAULT_MAX_RECONNECT_DELAY * 2 // 60s
 37  
 38  /**
 39   * WebSocket close codes that indicate a permanent server-side rejection.
 40   * The transport transitions to 'closed' immediately without retrying.
 41   */
 42  const PERMANENT_CLOSE_CODES = new Set([
 43    1002, // protocol error — server rejected handshake (e.g. session reaped)
 44    4001, // session expired / not found
 45    4003, // unauthorized
 46  ])
 47  
 48  export type WebSocketTransportOptions = {
 49    /** When false, the transport does not attempt automatic reconnection on
 50     *  disconnect. Use this when the caller has its own recovery mechanism
 51     *  (e.g. the REPL bridge poll loop). Defaults to true. */
 52    autoReconnect?: boolean
 53    /** Gates the tengu_ws_transport_* telemetry events. Set true at the
 54     *  REPL-bridge construction site so only Remote Control sessions (the
 55     *  Cloudflare-idle-timeout population) emit; print-mode workers stay
 56     *  silent. Defaults to false. */
 57    isBridge?: boolean
 58  }
 59  
 60  type WebSocketTransportState =
 61    | 'idle'
 62    | 'connected'
 63    | 'reconnecting'
 64    | 'closing'
 65    | 'closed'
 66  
 67  // Common interface between globalThis.WebSocket and ws.WebSocket
 68  type WebSocketLike = {
 69    close(): void
 70    send(data: string): void
 71    ping?(): void // Bun & ws both support this
 72  }
 73  
 74  export class WebSocketTransport implements Transport {
 75    private ws: WebSocketLike | null = null
 76    private lastSentId: string | null = null
 77    protected url: URL
 78    protected state: WebSocketTransportState = 'idle'
 79    protected onData?: (data: string) => void
 80    private onCloseCallback?: (closeCode?: number) => void
 81    private onConnectCallback?: () => void
 82    private headers: Record<string, string>
 83    private sessionId?: string
 84    private autoReconnect: boolean
 85    private isBridge: boolean
 86  
 87    // Reconnection state
 88    private reconnectAttempts = 0
 89    private reconnectStartTime: number | null = null
 90    private reconnectTimer: NodeJS.Timeout | null = null
 91    private lastReconnectAttemptTime: number | null = null
 92    // Wall-clock of last WS data-frame activity (inbound message or outbound
 93    // ws.send). Used to compute idle time at close — the signal for diagnosing
 94    // proxy idle-timeout RSTs (e.g. Cloudflare 5-min). Excludes ping/pong
 95    // control frames (proxies don't count those).
 96    private lastActivityTime = 0
 97  
 98    // Ping interval for connection health checks
 99    private pingInterval: NodeJS.Timeout | null = null
100    private pongReceived = true
101  
102    // Periodic keep_alive data frames to reset proxy idle timers
103    private keepAliveInterval: NodeJS.Timeout | null = null
104  
105    // Message buffering for replay on reconnection
106    private messageBuffer: CircularBuffer<StdoutMessage>
107    // Track which runtime's WS we're using so we can detach listeners
108    // with the matching API (removeEventListener vs. off).
109    private isBunWs = false
110  
111    // Captured at connect() time for handleOpenEvent timing. Stored as an
112    // instance field so the onOpen handler can be a stable class-property
113    // arrow function (removable in doDisconnect) instead of a closure over
114    // a local variable.
115    private connectStartTime = 0
116  
117    private refreshHeaders?: () => Record<string, string>
118  
119    constructor(
120      url: URL,
121      headers: Record<string, string> = {},
122      sessionId?: string,
123      refreshHeaders?: () => Record<string, string>,
124      options?: WebSocketTransportOptions,
125    ) {
126      this.url = url
127      this.headers = headers
128      this.sessionId = sessionId
129      this.refreshHeaders = refreshHeaders
130      this.autoReconnect = options?.autoReconnect ?? true
131      this.isBridge = options?.isBridge ?? false
132      this.messageBuffer = new CircularBuffer(DEFAULT_MAX_BUFFER_SIZE)
133    }
134  
135    public async connect(): Promise<void> {
136      if (this.state !== 'idle' && this.state !== 'reconnecting') {
137        logForDebugging(
138          `WebSocketTransport: Cannot connect, current state is ${this.state}`,
139          { level: 'error' },
140        )
141        logForDiagnosticsNoPII('error', 'cli_websocket_connect_failed')
142        return
143      }
144      this.state = 'reconnecting'
145  
146      this.connectStartTime = Date.now()
147      logForDebugging(`WebSocketTransport: Opening ${this.url.href}`)
148      logForDiagnosticsNoPII('info', 'cli_websocket_connect_opening')
149  
150      // Start with provided headers and add runtime headers
151      const headers = { ...this.headers }
152      if (this.lastSentId) {
153        headers['X-Last-Request-Id'] = this.lastSentId
154        logForDebugging(
155          `WebSocketTransport: Adding X-Last-Request-Id header: ${this.lastSentId}`,
156        )
157      }
158  
159      if (typeof Bun !== 'undefined') {
160        // Bun's WebSocket supports headers/proxy options but the DOM typings don't
161        // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
162        const ws = new globalThis.WebSocket(this.url.href, {
163          headers,
164          proxy: getWebSocketProxyUrl(this.url.href),
165          tls: getWebSocketTLSOptions() || undefined,
166        } as unknown as string[])
167        this.ws = ws
168        this.isBunWs = true
169  
170        ws.addEventListener('open', this.onBunOpen)
171        ws.addEventListener('message', this.onBunMessage)
172        ws.addEventListener('error', this.onBunError)
173        // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
174        ws.addEventListener('close', this.onBunClose)
175        // 'pong' is Bun-specific — not in DOM typings.
176        ws.addEventListener('pong', this.onPong)
177      } else {
178        const { default: WS } = await import('ws')
179        const ws = new WS(this.url.href, {
180          headers,
181          agent: getWebSocketProxyAgent(this.url.href),
182          ...getWebSocketTLSOptions(),
183        })
184        this.ws = ws
185        this.isBunWs = false
186  
187        ws.on('open', this.onNodeOpen)
188        ws.on('message', this.onNodeMessage)
189        ws.on('error', this.onNodeError)
190        ws.on('close', this.onNodeClose)
191        ws.on('pong', this.onPong)
192      }
193    }
194  
195    // --- Bun (native WebSocket) event handlers ---
196    // Stored as class-property arrow functions so they can be removed in
197    // doDisconnect(). Without removal, each reconnect orphans the old WS
198    // object + its 5 closures until GC, which accumulates under network
199    // instability. Mirrors the pattern in src/utils/mcpWebSocketTransport.ts.
200  
201    private onBunOpen = () => {
202      this.handleOpenEvent()
203      // Bun's WebSocket doesn't expose upgrade response headers,
204      // so replay all buffered messages. The server deduplicates by UUID.
205      if (this.lastSentId) {
206        this.replayBufferedMessages('')
207      }
208    }
209  
210    private onBunMessage = (event: MessageEvent) => {
211      const message =
212        typeof event.data === 'string' ? event.data : String(event.data)
213      this.lastActivityTime = Date.now()
214      logForDiagnosticsNoPII('info', 'cli_websocket_message_received', {
215        length: message.length,
216      })
217      if (this.onData) {
218        this.onData(message)
219      }
220    }
221  
222    private onBunError = () => {
223      logForDebugging('WebSocketTransport: Error', {
224        level: 'error',
225      })
226      logForDiagnosticsNoPII('error', 'cli_websocket_connect_error')
227      // close event fires after error — let it call handleConnectionError
228    }
229  
230    // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
231    private onBunClose = (event: CloseEvent) => {
232      const isClean = event.code === 1000 || event.code === 1001
233      logForDebugging(
234        `WebSocketTransport: Closed: ${event.code}`,
235        isClean ? undefined : { level: 'error' },
236      )
237      logForDiagnosticsNoPII('error', 'cli_websocket_connect_closed')
238      this.handleConnectionError(event.code)
239    }
240  
241    // --- Node (ws package) event handlers ---
242  
243    private onNodeOpen = () => {
244      // Capture ws before handleOpenEvent() invokes onConnectCallback — if the
245      // callback synchronously closes the transport, this.ws becomes null.
246      // The old inline-closure code had this safety implicitly via closure capture.
247      const ws = this.ws
248      this.handleOpenEvent()
249      if (!ws) return
250      // Check for last-id in upgrade response headers (ws package only)
251      const nws = ws as unknown as WsWebSocket & {
252        upgradeReq?: { headers?: Record<string, string> }
253      }
254      const upgradeResponse = nws.upgradeReq
255      if (upgradeResponse?.headers?.['x-last-request-id']) {
256        const serverLastId = upgradeResponse.headers['x-last-request-id']
257        this.replayBufferedMessages(serverLastId)
258      }
259    }
260  
261    private onNodeMessage = (data: Buffer) => {
262      const message = data.toString()
263      this.lastActivityTime = Date.now()
264      logForDiagnosticsNoPII('info', 'cli_websocket_message_received', {
265        length: message.length,
266      })
267      if (this.onData) {
268        this.onData(message)
269      }
270    }
271  
272    private onNodeError = (err: Error) => {
273      logForDebugging(`WebSocketTransport: Error: ${err.message}`, {
274        level: 'error',
275      })
276      logForDiagnosticsNoPII('error', 'cli_websocket_connect_error')
277      // close event fires after error — let it call handleConnectionError
278    }
279  
280    private onNodeClose = (code: number, _reason: Buffer) => {
281      const isClean = code === 1000 || code === 1001
282      logForDebugging(
283        `WebSocketTransport: Closed: ${code}`,
284        isClean ? undefined : { level: 'error' },
285      )
286      logForDiagnosticsNoPII('error', 'cli_websocket_connect_closed')
287      this.handleConnectionError(code)
288    }
289  
290    // --- Shared handlers ---
291  
292    private onPong = () => {
293      this.pongReceived = true
294    }
295  
296    private handleOpenEvent(): void {
297      const connectDuration = Date.now() - this.connectStartTime
298      logForDebugging('WebSocketTransport: Connected')
299      logForDiagnosticsNoPII('info', 'cli_websocket_connect_connected', {
300        duration_ms: connectDuration,
301      })
302  
303      // Reconnect success — capture attempt count + downtime before resetting.
304      // reconnectStartTime is null on first connect, non-null on reopen.
305      if (this.isBridge && this.reconnectStartTime !== null) {
306        logEvent('tengu_ws_transport_reconnected', {
307          attempts: this.reconnectAttempts,
308          downtimeMs: Date.now() - this.reconnectStartTime,
309        })
310      }
311  
312      this.reconnectAttempts = 0
313      this.reconnectStartTime = null
314      this.lastReconnectAttemptTime = null
315      this.lastActivityTime = Date.now()
316      this.state = 'connected'
317      this.onConnectCallback?.()
318  
319      // Start periodic pings to detect dead connections
320      this.startPingInterval()
321  
322      // Start periodic keep_alive data frames to reset proxy idle timers
323      this.startKeepaliveInterval()
324  
325      // Register callback for session activity signals
326      registerSessionActivityCallback(() => {
327        void this.write({ type: 'keep_alive' })
328      })
329    }
330  
331    protected sendLine(line: string): boolean {
332      if (!this.ws || this.state !== 'connected') {
333        logForDebugging('WebSocketTransport: Not connected')
334        logForDiagnosticsNoPII('info', 'cli_websocket_send_not_connected')
335        return false
336      }
337  
338      try {
339        this.ws.send(line)
340        this.lastActivityTime = Date.now()
341        return true
342      } catch (error) {
343        logForDebugging(`WebSocketTransport: Failed to send: ${error}`, {
344          level: 'error',
345        })
346        logForDiagnosticsNoPII('error', 'cli_websocket_send_error')
347        // Don't null this.ws here — let doDisconnect() (via handleConnectionError)
348        // handle cleanup so listeners are removed before the WS is released.
349        this.handleConnectionError()
350        return false
351      }
352    }
353  
354    /**
355     * Remove all listeners attached in connect() for the given WebSocket.
356     * Without this, each reconnect orphans the old WS object + its closures
357     * until GC — these accumulate under network instability. Mirrors the
358     * pattern in src/utils/mcpWebSocketTransport.ts.
359     */
360    private removeWsListeners(ws: WebSocketLike): void {
361      if (this.isBunWs) {
362        const nws = ws as unknown as globalThis.WebSocket
363        nws.removeEventListener('open', this.onBunOpen)
364        nws.removeEventListener('message', this.onBunMessage)
365        nws.removeEventListener('error', this.onBunError)
366        // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
367        nws.removeEventListener('close', this.onBunClose)
368        // 'pong' is Bun-specific — not in DOM typings
369        nws.removeEventListener('pong' as 'message', this.onPong)
370      } else {
371        const nws = ws as unknown as WsWebSocket
372        nws.off('open', this.onNodeOpen)
373        nws.off('message', this.onNodeMessage)
374        nws.off('error', this.onNodeError)
375        nws.off('close', this.onNodeClose)
376        nws.off('pong', this.onPong)
377      }
378    }
379  
380    protected doDisconnect(): void {
381      // Stop pinging and keepalive when disconnecting
382      this.stopPingInterval()
383      this.stopKeepaliveInterval()
384  
385      // Unregister session activity callback
386      unregisterSessionActivityCallback()
387  
388      if (this.ws) {
389        // Remove listeners BEFORE close() so the old WS + closures can be
390        // GC'd promptly instead of lingering until the next mark-and-sweep.
391        this.removeWsListeners(this.ws)
392        this.ws.close()
393        this.ws = null
394      }
395    }
396  
397    private handleConnectionError(closeCode?: number): void {
398      logForDebugging(
399        `WebSocketTransport: Disconnected from ${this.url.href}` +
400          (closeCode != null ? ` (code ${closeCode})` : ''),
401      )
402      logForDiagnosticsNoPII('info', 'cli_websocket_disconnected')
403      if (this.isBridge) {
404        // Fire on every close — including intermediate ones during a reconnect
405        // storm (those never surface to the onCloseCallback consumer). For the
406        // Cloudflare-5min-idle hypothesis: cluster msSinceLastActivity; if the
407        // peak sits at ~300s with closeCode 1006, that's the proxy RST.
408        logEvent('tengu_ws_transport_closed', {
409          closeCode,
410          msSinceLastActivity:
411            this.lastActivityTime > 0 ? Date.now() - this.lastActivityTime : -1,
412          // 'connected' = healthy drop (the Cloudflare case); 'reconnecting' =
413          // connect-rejection mid-storm. State isn't mutated until the branches
414          // below, so this reads the pre-close value.
415          wasConnected: this.state === 'connected',
416          reconnectAttempts: this.reconnectAttempts,
417        })
418      }
419      this.doDisconnect()
420  
421      if (this.state === 'closing' || this.state === 'closed') return
422  
423      // Permanent codes: don't retry — server has definitively ended the session.
424      // Exception: 4003 (unauthorized) can be retried when refreshHeaders is
425      // available and returns a new token (e.g. after the parent process mints
426      // a fresh session ingress token during reconnection).
427      let headersRefreshed = false
428      if (closeCode === 4003 && this.refreshHeaders) {
429        const freshHeaders = this.refreshHeaders()
430        if (freshHeaders.Authorization !== this.headers.Authorization) {
431          Object.assign(this.headers, freshHeaders)
432          headersRefreshed = true
433          logForDebugging(
434            'WebSocketTransport: 4003 received but headers refreshed, scheduling reconnect',
435          )
436          logForDiagnosticsNoPII('info', 'cli_websocket_4003_token_refreshed')
437        }
438      }
439  
440      if (
441        closeCode != null &&
442        PERMANENT_CLOSE_CODES.has(closeCode) &&
443        !headersRefreshed
444      ) {
445        logForDebugging(
446          `WebSocketTransport: Permanent close code ${closeCode}, not reconnecting`,
447          { level: 'error' },
448        )
449        logForDiagnosticsNoPII('error', 'cli_websocket_permanent_close', {
450          closeCode,
451        })
452        this.state = 'closed'
453        this.onCloseCallback?.(closeCode)
454        return
455      }
456  
457      // When autoReconnect is disabled, go straight to closed state.
458      // The caller (e.g. REPL bridge poll loop) handles recovery.
459      if (!this.autoReconnect) {
460        this.state = 'closed'
461        this.onCloseCallback?.(closeCode)
462        return
463      }
464  
465      // Schedule reconnection with exponential backoff and time budget
466      const now = Date.now()
467      if (!this.reconnectStartTime) {
468        this.reconnectStartTime = now
469      }
470  
471      // Detect system sleep/wake: if the gap since our last reconnection
472      // attempt greatly exceeds the max delay, the machine likely slept
473      // (e.g. laptop lid closed). Reset the budget and retry from scratch —
474      // the server will reject with permanent close codes (4001/1002) if
475      // the session was reaped while we were asleep.
476      if (
477        this.lastReconnectAttemptTime !== null &&
478        now - this.lastReconnectAttemptTime > SLEEP_DETECTION_THRESHOLD_MS
479      ) {
480        logForDebugging(
481          `WebSocketTransport: Detected system sleep (${Math.round((now - this.lastReconnectAttemptTime) / 1000)}s gap), resetting reconnection budget`,
482        )
483        logForDiagnosticsNoPII('info', 'cli_websocket_sleep_detected', {
484          gapMs: now - this.lastReconnectAttemptTime,
485        })
486        this.reconnectStartTime = now
487        this.reconnectAttempts = 0
488      }
489      this.lastReconnectAttemptTime = now
490  
491      const elapsed = now - this.reconnectStartTime
492      if (elapsed < DEFAULT_RECONNECT_GIVE_UP_MS) {
493        // Clear any existing reconnection timer to avoid duplicates
494        if (this.reconnectTimer) {
495          clearTimeout(this.reconnectTimer)
496          this.reconnectTimer = null
497        }
498  
499        // Refresh headers before reconnecting (e.g. to pick up a new session token).
500        // Skip if already refreshed by the 4003 path above.
501        if (!headersRefreshed && this.refreshHeaders) {
502          const freshHeaders = this.refreshHeaders()
503          Object.assign(this.headers, freshHeaders)
504          logForDebugging('WebSocketTransport: Refreshed headers for reconnect')
505        }
506  
507        this.state = 'reconnecting'
508        this.reconnectAttempts++
509  
510        const baseDelay = Math.min(
511          DEFAULT_BASE_RECONNECT_DELAY * Math.pow(2, this.reconnectAttempts - 1),
512          DEFAULT_MAX_RECONNECT_DELAY,
513        )
514        // Add ±25% jitter to avoid thundering herd
515        const delay = Math.max(
516          0,
517          baseDelay + baseDelay * 0.25 * (2 * Math.random() - 1),
518        )
519  
520        logForDebugging(
521          `WebSocketTransport: Reconnecting in ${Math.round(delay)}ms (attempt ${this.reconnectAttempts}, ${Math.round(elapsed / 1000)}s elapsed)`,
522        )
523        logForDiagnosticsNoPII('error', 'cli_websocket_reconnect_attempt', {
524          reconnectAttempts: this.reconnectAttempts,
525        })
526        if (this.isBridge) {
527          logEvent('tengu_ws_transport_reconnecting', {
528            attempt: this.reconnectAttempts,
529            elapsedMs: elapsed,
530            delayMs: Math.round(delay),
531          })
532        }
533  
534        this.reconnectTimer = setTimeout(() => {
535          this.reconnectTimer = null
536          void this.connect()
537        }, delay)
538      } else {
539        logForDebugging(
540          `WebSocketTransport: Reconnection time budget exhausted after ${Math.round(elapsed / 1000)}s for ${this.url.href}`,
541          { level: 'error' },
542        )
543        logForDiagnosticsNoPII('error', 'cli_websocket_reconnect_exhausted', {
544          reconnectAttempts: this.reconnectAttempts,
545          elapsedMs: elapsed,
546        })
547        this.state = 'closed'
548  
549        // Notify close callback
550        if (this.onCloseCallback) {
551          this.onCloseCallback(closeCode)
552        }
553      }
554    }
555  
556    close(): void {
557      // Clear any pending reconnection timer
558      if (this.reconnectTimer) {
559        clearTimeout(this.reconnectTimer)
560        this.reconnectTimer = null
561      }
562  
563      // Clear ping and keepalive intervals
564      this.stopPingInterval()
565      this.stopKeepaliveInterval()
566  
567      // Unregister session activity callback
568      unregisterSessionActivityCallback()
569  
570      this.state = 'closing'
571      this.doDisconnect()
572    }
573  
574    private replayBufferedMessages(lastId: string): void {
575      const messages = this.messageBuffer.toArray()
576      if (messages.length === 0) return
577  
578      // Find where to start replay based on server's last received message
579      let startIndex = 0
580      if (lastId) {
581        const lastConfirmedIndex = messages.findIndex(
582          message => 'uuid' in message && message.uuid === lastId,
583        )
584        if (lastConfirmedIndex >= 0) {
585          // Server confirmed messages up to lastConfirmedIndex — evict them
586          startIndex = lastConfirmedIndex + 1
587          // Rebuild the buffer with only unconfirmed messages
588          const remaining = messages.slice(startIndex)
589          this.messageBuffer.clear()
590          this.messageBuffer.addAll(remaining)
591          if (remaining.length === 0) {
592            this.lastSentId = null
593          }
594          logForDebugging(
595            `WebSocketTransport: Evicted ${startIndex} confirmed messages, ${remaining.length} remaining`,
596          )
597          logForDiagnosticsNoPII(
598            'info',
599            'cli_websocket_evicted_confirmed_messages',
600            {
601              evicted: startIndex,
602              remaining: remaining.length,
603            },
604          )
605        }
606      }
607  
608      const messagesToReplay = messages.slice(startIndex)
609      if (messagesToReplay.length === 0) {
610        logForDebugging('WebSocketTransport: No new messages to replay')
611        logForDiagnosticsNoPII('info', 'cli_websocket_no_messages_to_replay')
612        return
613      }
614  
615      logForDebugging(
616        `WebSocketTransport: Replaying ${messagesToReplay.length} buffered messages`,
617      )
618      logForDiagnosticsNoPII('info', 'cli_websocket_messages_to_replay', {
619        count: messagesToReplay.length,
620      })
621  
622      for (const message of messagesToReplay) {
623        const line = jsonStringify(message) + '\n'
624        const success = this.sendLine(line)
625        if (!success) {
626          this.handleConnectionError()
627          break
628        }
629      }
630      // Do NOT clear the buffer after replay — messages remain buffered until
631      // the server confirms receipt on the next reconnection. This prevents
632      // message loss if the connection drops after replay but before the server
633      // processes the messages.
634    }
635  
636    isConnectedStatus(): boolean {
637      return this.state === 'connected'
638    }
639  
640    isClosedStatus(): boolean {
641      return this.state === 'closed'
642    }
643  
644    setOnData(callback: (data: string) => void): void {
645      this.onData = callback
646    }
647  
648    setOnConnect(callback: () => void): void {
649      this.onConnectCallback = callback
650    }
651  
652    setOnClose(callback: (closeCode?: number) => void): void {
653      this.onCloseCallback = callback
654    }
655  
656    getStateLabel(): string {
657      return this.state
658    }
659  
660    async write(message: StdoutMessage): Promise<void> {
661      if ('uuid' in message && typeof message.uuid === 'string') {
662        this.messageBuffer.add(message)
663        this.lastSentId = message.uuid
664      }
665  
666      const line = jsonStringify(message) + '\n'
667  
668      if (this.state !== 'connected') {
669        // Message buffered for replay when connected (if it has a UUID)
670        return
671      }
672  
673      const sessionLabel = this.sessionId ? ` session=${this.sessionId}` : ''
674      const detailLabel = this.getControlMessageDetailLabel(message)
675  
676      logForDebugging(
677        `WebSocketTransport: Sending message type=${message.type}${sessionLabel}${detailLabel}`,
678      )
679  
680      this.sendLine(line)
681    }
682  
683    private getControlMessageDetailLabel(message: StdoutMessage): string {
684      if (message.type === 'control_request') {
685        const { request_id, request } = message
686        const toolName =
687          request.subtype === 'can_use_tool' ? request.tool_name : ''
688        return ` subtype=${request.subtype} request_id=${request_id}${toolName ? ` tool=${toolName}` : ''}`
689      }
690      if (message.type === 'control_response') {
691        const { subtype, request_id } = message.response
692        return ` subtype=${subtype} request_id=${request_id}`
693      }
694      return ''
695    }
696  
697    private startPingInterval(): void {
698      // Clear any existing interval
699      this.stopPingInterval()
700  
701      this.pongReceived = true
702      let lastTickTime = Date.now()
703  
704      // Send ping periodically to detect dead connections.
705      // If the previous ping got no pong, treat the connection as dead.
706      this.pingInterval = setInterval(() => {
707        if (this.state === 'connected' && this.ws) {
708          const now = Date.now()
709          const gap = now - lastTickTime
710          lastTickTime = now
711  
712          // Process-suspension detector. If the wall-clock gap between ticks
713          // greatly exceeds the 10s interval, the process was suspended
714          // (laptop lid, SIGSTOP, VM pause). setInterval does not queue
715          // missed ticks — it coalesces — so on wake this callback fires
716          // once with a huge gap. The socket is almost certainly dead:
717          // NAT mappings drop in 30s–5min, and the server has been
718          // retransmitting into the void. Don't wait for a ping/pong
719          // round-trip to confirm (ws.ping() on a dead socket returns
720          // immediately with no error — bytes go into the kernel send
721          // buffer). Assume dead and reconnect now. A spurious reconnect
722          // after a short sleep is cheap — replayBufferedMessages() handles
723          // it and the server dedups by UUID.
724          if (gap > SLEEP_DETECTION_THRESHOLD_MS) {
725            logForDebugging(
726              `WebSocketTransport: ${Math.round(gap / 1000)}s tick gap detected — process was suspended, forcing reconnect`,
727            )
728            logForDiagnosticsNoPII(
729              'info',
730              'cli_websocket_sleep_detected_on_ping',
731              { gapMs: gap },
732            )
733            this.handleConnectionError()
734            return
735          }
736  
737          if (!this.pongReceived) {
738            logForDebugging(
739              'WebSocketTransport: No pong received, connection appears dead',
740              { level: 'error' },
741            )
742            logForDiagnosticsNoPII('error', 'cli_websocket_pong_timeout')
743            this.handleConnectionError()
744            return
745          }
746  
747          this.pongReceived = false
748          try {
749            this.ws.ping?.()
750          } catch (error) {
751            logForDebugging(`WebSocketTransport: Ping failed: ${error}`, {
752              level: 'error',
753            })
754            logForDiagnosticsNoPII('error', 'cli_websocket_ping_failed')
755          }
756        }
757      }, DEFAULT_PING_INTERVAL)
758    }
759  
760    private stopPingInterval(): void {
761      if (this.pingInterval) {
762        clearInterval(this.pingInterval)
763        this.pingInterval = null
764      }
765    }
766  
767    private startKeepaliveInterval(): void {
768      this.stopKeepaliveInterval()
769  
770      // In CCR sessions, session activity heartbeats handle keep-alives
771      if (isEnvTruthy(process.env.CLAUDE_CODE_REMOTE)) {
772        return
773      }
774  
775      this.keepAliveInterval = setInterval(() => {
776        if (this.state === 'connected' && this.ws) {
777          try {
778            this.ws.send(KEEP_ALIVE_FRAME)
779            this.lastActivityTime = Date.now()
780            logForDebugging(
781              'WebSocketTransport: Sent periodic keep_alive data frame',
782            )
783          } catch (error) {
784            logForDebugging(
785              `WebSocketTransport: Periodic keep_alive failed: ${error}`,
786              { level: 'error' },
787            )
788            logForDiagnosticsNoPII('error', 'cli_websocket_keepalive_failed')
789          }
790        }
791      }, DEFAULT_KEEPALIVE_INTERVAL)
792    }
793  
794    private stopKeepaliveInterval(): void {
795      if (this.keepAliveInterval) {
796        clearInterval(this.keepAliveInterval)
797        this.keepAliveInterval = null
798      }
799    }
800  }