/ cli / transports / ccrClient.ts
ccrClient.ts
  1  import { randomUUID } from 'crypto'
  2  import type {
  3    SDKPartialAssistantMessage,
  4    StdoutMessage,
  5  } from 'src/entrypoints/sdk/controlTypes.js'
  6  import { decodeJwtExpiry } from '../../bridge/jwtUtils.js'
  7  import { logForDebugging } from '../../utils/debug.js'
  8  import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js'
  9  import { errorMessage, getErrnoCode } from '../../utils/errors.js'
 10  import { createAxiosInstance } from '../../utils/proxy.js'
 11  import {
 12    registerSessionActivityCallback,
 13    unregisterSessionActivityCallback,
 14  } from '../../utils/sessionActivity.js'
 15  import {
 16    getSessionIngressAuthHeaders,
 17    getSessionIngressAuthToken,
 18  } from '../../utils/sessionIngressAuth.js'
 19  import type {
 20    RequiresActionDetails,
 21    SessionState,
 22  } from '../../utils/sessionState.js'
 23  import { sleep } from '../../utils/sleep.js'
 24  import { getClaudeCodeUserAgent } from '../../utils/userAgent.js'
 25  import {
 26    RetryableError,
 27    SerialBatchEventUploader,
 28  } from './SerialBatchEventUploader.js'
 29  import type { SSETransport, StreamClientEvent } from './SSETransport.js'
 30  import { WorkerStateUploader } from './WorkerStateUploader.js'
 31  
/**
 * Default interval between heartbeat events (20s; server TTL is 60s).
 * Overridable per instance via opts.heartbeatIntervalMs; optional jitter is
 * applied on top of this in startHeartbeat().
 */
const DEFAULT_HEARTBEAT_INTERVAL_MS = 20_000

/**
 * stream_event messages accumulate in a delay buffer for up to this many ms
 * before enqueue. Mirrors HybridTransport's batching window. text_delta
 * events for the same content block accumulate into a single full-so-far
 * snapshot per flush — each emitted event is self-contained so a client
 * connecting mid-stream sees complete text, not a fragment.
 */
const STREAM_EVENT_FLUSH_INTERVAL_MS = 100
 43  
 44  /** Hoisted axios validateStatus callback to avoid per-request closure allocation. */
 45  function alwaysValidStatus(): boolean {
 46    return true
 47  }
 48  
 49  export type CCRInitFailReason =
 50    | 'no_auth_headers'
 51    | 'missing_epoch'
 52    | 'worker_register_failed'
 53  
 54  /** Thrown by initialize(); carries a typed reason for the diag classifier. */
 55  export class CCRInitError extends Error {
 56    constructor(readonly reason: CCRInitFailReason) {
 57      super(`CCRClient init failed: ${reason}`)
 58    }
 59  }
 60  
 61  /**
 62   * Consecutive 401/403 with a VALID-LOOKING token before giving up. An
 63   * expired JWT short-circuits this (exits immediately — deterministic,
 64   * retry is futile). This threshold is for the uncertain case: token's
 65   * exp is in the future but server says 401 (userauth down, KMS hiccup,
 66   * clock skew). 10 × 20s heartbeat ≈ 200s to ride it out.
 67   */
 68  const MAX_CONSECUTIVE_AUTH_FAILURES = 10
 69  
 70  type EventPayload = {
 71    uuid: string
 72    type: string
 73    [key: string]: unknown
 74  }
 75  
 76  type ClientEvent = {
 77    payload: EventPayload
 78    ephemeral?: boolean
 79  }
 80  
 81  /**
 82   * Structural subset of a stream_event carrying a text_delta. Not a narrowing
 83   * of SDKPartialAssistantMessage — RawMessageStreamEvent's delta is a union and
 84   * narrowing through two levels defeats the discriminant.
 85   */
 86  type CoalescedStreamEvent = {
 87    type: 'stream_event'
 88    uuid: string
 89    session_id: string
 90    parent_tool_use_id: string | null
 91    event: {
 92      type: 'content_block_delta'
 93      index: number
 94      delta: { type: 'text_delta'; text: string }
 95    }
 96  }
 97  
 98  /**
 99   * Accumulator state for text_delta coalescing. Keyed by API message ID so
100   * lifetime is tied to the assistant message — cleared when the complete
101   * SDKAssistantMessage arrives (writeEvent), which is reliable even when
102   * abort/error paths skip content_block_stop/message_stop delivery.
103   */
104  export type StreamAccumulatorState = {
105    /** API message ID (msg_...) → blocks[blockIndex] → chunk array. */
106    byMessage: Map<string, string[][]>
107    /**
108     * {session_id}:{parent_tool_use_id} → active message ID.
109     * content_block_delta events don't carry the message ID (only
110     * message_start does), so we track which message is currently streaming
111     * for each scope. At most one message streams per scope at a time.
112     */
113    scopeToMessage: Map<string, string>
114  }
115  
116  export function createStreamAccumulator(): StreamAccumulatorState {
117    return { byMessage: new Map(), scopeToMessage: new Map() }
118  }
119  
120  function scopeKey(m: {
121    session_id: string
122    parent_tool_use_id: string | null
123  }): string {
124    return `${m.session_id}:${m.parent_tool_use_id ?? ''}`
125  }
126  
127  /**
128   * Accumulate text_delta stream_events into full-so-far snapshots per content
129   * block. Each flush emits ONE event per touched block containing the FULL
130   * accumulated text from the start of the block — a client connecting
131   * mid-stream receives a self-contained snapshot, not a fragment.
132   *
133   * Non-text-delta events pass through unchanged. message_start records the
134   * active message ID for the scope; content_block_delta appends chunks;
135   * the snapshot event reuses the first text_delta UUID seen for that block in
136   * this flush so server-side idempotency remains stable across retries.
137   *
138   * Cleanup happens in writeEvent when the complete assistant message arrives
139   * (reliable), not here on stop events (abort/error paths skip those).
140   */
141  export function accumulateStreamEvents(
142    buffer: SDKPartialAssistantMessage[],
143    state: StreamAccumulatorState,
144  ): EventPayload[] {
145    const out: EventPayload[] = []
146    // chunks[] → snapshot already in `out` this flush. Keyed by the chunks
147    // array reference (stable per {messageId, index}) so subsequent deltas
148    // rewrite the same entry instead of emitting one event per delta.
149    const touched = new Map<string[], CoalescedStreamEvent>()
150    for (const msg of buffer) {
151      switch (msg.event.type) {
152        case 'message_start': {
153          const id = msg.event.message.id
154          const prevId = state.scopeToMessage.get(scopeKey(msg))
155          if (prevId) state.byMessage.delete(prevId)
156          state.scopeToMessage.set(scopeKey(msg), id)
157          state.byMessage.set(id, [])
158          out.push(msg)
159          break
160        }
161        case 'content_block_delta': {
162          if (msg.event.delta.type !== 'text_delta') {
163            out.push(msg)
164            break
165          }
166          const messageId = state.scopeToMessage.get(scopeKey(msg))
167          const blocks = messageId ? state.byMessage.get(messageId) : undefined
168          if (!blocks) {
169            // Delta without a preceding message_start (reconnect mid-stream,
170            // or message_start was in a prior buffer that got dropped). Pass
171            // through raw — can't produce a full-so-far snapshot without the
172            // prior chunks anyway.
173            out.push(msg)
174            break
175          }
176          const chunks = (blocks[msg.event.index] ??= [])
177          chunks.push(msg.event.delta.text)
178          const existing = touched.get(chunks)
179          if (existing) {
180            existing.event.delta.text = chunks.join('')
181            break
182          }
183          const snapshot: CoalescedStreamEvent = {
184            type: 'stream_event',
185            uuid: msg.uuid,
186            session_id: msg.session_id,
187            parent_tool_use_id: msg.parent_tool_use_id,
188            event: {
189              type: 'content_block_delta',
190              index: msg.event.index,
191              delta: { type: 'text_delta', text: chunks.join('') },
192            },
193          }
194          touched.set(chunks, snapshot)
195          out.push(snapshot)
196          break
197        }
198        default:
199          out.push(msg)
200      }
201    }
202    return out
203  }
204  
205  /**
206   * Clear accumulator entries for a completed assistant message. Called from
207   * writeEvent when the SDKAssistantMessage arrives — the reliable end-of-stream
208   * signal that fires even when abort/interrupt/error skip SSE stop events.
209   */
210  export function clearStreamAccumulatorForMessage(
211    state: StreamAccumulatorState,
212    assistant: {
213      session_id: string
214      parent_tool_use_id: string | null
215      message: { id: string }
216    },
217  ): void {
218    state.byMessage.delete(assistant.message.id)
219    const scope = scopeKey(assistant)
220    if (state.scopeToMessage.get(scope) === assistant.message.id) {
221      state.scopeToMessage.delete(scope)
222    }
223  }
224  
/** Outcome of request(); retryAfterMs carries a 429 Retry-After hint in ms. */
type RequestResult = { ok: true } | { ok: false; retryAfterMs?: number }

/** Item shape for the internal-event uploader (built by writeInternalEvent). */
type WorkerEvent = {
  payload: EventPayload
  // Marks a compaction boundary in the stored transcript stream.
  is_compaction?: boolean
  // Optional subagent identifier (see writeInternalEvent's agentId option).
  agent_id?: string
}

/** Stored internal event row returned by GET /worker/internal-events. */
export type InternalEvent = {
  event_id: string
  event_type: string
  payload: Record<string, unknown>
  event_metadata?: Record<string, unknown> | null
  is_compaction: boolean
  created_at: string
  agent_id?: string
}

// Page shape for listing internal events; next_cursor present when more
// pages exist (consumed by the paginated readers).
type ListInternalEventsResponse = {
  data: InternalEvent[]
  next_cursor?: string
}

/** Response shape of GET /worker, read back by getWorkerState(). */
type WorkerStateResponse = {
  worker?: {
    external_metadata?: Record<string, unknown>
  }
}
253  
254  /**
255   * Manages the worker lifecycle protocol with CCR v2:
256   * - Epoch management: reads worker_epoch from CLAUDE_CODE_WORKER_EPOCH env var
257   * - Runtime state reporting: PUT /sessions/{id}/worker
258   * - Heartbeat: POST /sessions/{id}/worker/heartbeat for liveness detection
259   *
260   * All writes go through this.request().
261   */
262  export class CCRClient {
  // Epoch this worker registered under; stamped on every write so the server
  // can reject stale workers with 409 (see request()/handleEpochMismatch).
  private workerEpoch = 0
  private readonly heartbeatIntervalMs: number
  // Fraction of the interval used as ± random jitter (0 disables jitter).
  private readonly heartbeatJitterFraction: number
  private heartbeatTimer: NodeJS.Timeout | null = null
  // Collapses overlapping heartbeat ticks — sendHeartbeat is fire-and-forget.
  private heartbeatInFlight = false
  private closed = false
  // 401/403 streak with a valid-looking token; reset to 0 on any 2xx.
  private consecutiveAuthFailures = 0
  // Last state reported via reportState(); used to suppress no-op updates.
  private currentState: SessionState | null = null
  // https://host/v1/code/sessions/{id} with any trailing slash stripped.
  private readonly sessionBaseUrl: string
  // Last path segment of the session URL.
  private readonly sessionId: string
  private readonly http = createAxiosInstance({ keepAlive: true })

  // stream_event delay buffer — accumulates content deltas for up to
  // STREAM_EVENT_FLUSH_INTERVAL_MS before enqueueing (reduces POST count
  // and enables text_delta coalescing). Mirrors HybridTransport's pattern.
  private streamEventBuffer: SDKPartialAssistantMessage[] = []
  private streamEventTimer: ReturnType<typeof setTimeout> | null = null
  // Full-so-far text accumulator. Persists across flushes so each emitted
  // text_delta event carries the complete text from the start of the block —
  // mid-stream reconnects see a self-contained snapshot. Keyed by API message
  // ID; cleared in writeEvent when the complete assistant message arrives.
  private streamTextAccumulator = createStreamAccumulator()

  // Serializes PUT /worker state reports (worker_epoch stamped at send time).
  private readonly workerState: WorkerStateUploader
  // Client-visible events → POST /worker/events.
  private readonly eventUploader: SerialBatchEventUploader<ClientEvent>
  // Worker-internal transcript events → POST /worker/internal-events.
  private readonly internalEventUploader: SerialBatchEventUploader<WorkerEvent>
  // Delivery-status acks for received client_events → POST /worker/events/delivery.
  private readonly deliveryUploader: SerialBatchEventUploader<{
    eventId: string
    status: 'received' | 'processing' | 'processed'
  }>

  /**
   * Called when the server returns 409 (a newer worker epoch superseded ours).
   * Default: process.exit(1) — correct for spawn-mode children where the
   * parent bridge re-spawns. In-process callers (replBridge) MUST override
   * this to close gracefully instead; exit would kill the user's REPL.
   */
  private readonly onEpochMismatch: () => never

  /**
   * Auth header source. Defaults to the process-wide session-ingress token
   * (CLAUDE_CODE_SESSION_ACCESS_TOKEN env var). Callers managing multiple
   * concurrent sessions with distinct JWTs MUST inject this — the env-var
   * path is a process global and would stomp across sessions.
   */
  private readonly getAuthHeaders: () => Record<string, string>
309  
  constructor(
    transport: SSETransport,
    sessionUrl: URL,
    opts?: {
      onEpochMismatch?: () => never
      heartbeatIntervalMs?: number
      heartbeatJitterFraction?: number
      /**
       * Per-instance auth header source. Omit to read the process-wide
       * CLAUDE_CODE_SESSION_ACCESS_TOKEN (single-session callers — REPL,
       * daemon). Required for concurrent multi-session callers.
       */
      getAuthHeaders?: () => Record<string, string>
    },
  ) {
    this.onEpochMismatch =
      opts?.onEpochMismatch ??
      (() => {
        // eslint-disable-next-line custom-rules/no-process-exit
        process.exit(1)
      })
    this.heartbeatIntervalMs =
      opts?.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_INTERVAL_MS
    this.heartbeatJitterFraction = opts?.heartbeatJitterFraction ?? 0
    this.getAuthHeaders = opts?.getAuthHeaders ?? getSessionIngressAuthHeaders
    // Session URL: https://host/v1/code/sessions/{id}
    if (sessionUrl.protocol !== 'http:' && sessionUrl.protocol !== 'https:') {
      throw new Error(
        `CCRClient: Expected http(s) URL, got ${sessionUrl.protocol}`,
      )
    }
    const pathname = sessionUrl.pathname.replace(/\/$/, '')
    this.sessionBaseUrl = `${sessionUrl.protocol}//${sessionUrl.host}${pathname}`
    // Extract session ID from the URL path (last segment)
    this.sessionId = pathname.split('/').pop() || ''

    // Worker state PUTs — worker_epoch is read at send time (not enqueue
    // time) so a late epoch assignment in initialize() is still picked up.
    this.workerState = new WorkerStateUploader({
      send: body =>
        this.request(
          'put',
          '/worker',
          { worker_epoch: this.workerEpoch, ...body },
          'PUT worker',
        ).then(r => r.ok),
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })

    this.eventUploader = new SerialBatchEventUploader<ClientEvent>({
      maxBatchSize: 100,
      maxBatchBytes: 10 * 1024 * 1024,
      // flushStreamEventBuffer() enqueues a full 100ms window of accumulated
      // stream_events in one call. A burst of mixed delta types that don't
      // fold into a single snapshot could exceed the old cap (50) and deadlock
      // on the SerialBatchEventUploader backpressure check. Match
      // HybridTransport's bound — high enough to be memory-only.
      maxQueueSize: 100_000,
      send: async batch => {
        const result = await this.request(
          'post',
          '/worker/events',
          { worker_epoch: this.workerEpoch, events: batch },
          'client events',
        )
        if (!result.ok) {
          // RetryableError carries the 429 Retry-After hint (if any) back to
          // the uploader's backoff loop.
          throw new RetryableError(
            'client event POST failed',
            result.retryAfterMs,
          )
        }
      },
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })

    this.internalEventUploader = new SerialBatchEventUploader<WorkerEvent>({
      maxBatchSize: 100,
      maxBatchBytes: 10 * 1024 * 1024,
      maxQueueSize: 200,
      send: async batch => {
        const result = await this.request(
          'post',
          '/worker/internal-events',
          { worker_epoch: this.workerEpoch, events: batch },
          'internal events',
        )
        if (!result.ok) {
          throw new RetryableError(
            'internal event POST failed',
            result.retryAfterMs,
          )
        }
      },
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })

    // Delivery-status acks (received/processing/processed) batched back to
    // CCR so it can track which client_events this worker has consumed.
    this.deliveryUploader = new SerialBatchEventUploader<{
      eventId: string
      status: 'received' | 'processing' | 'processed'
    }>({
      maxBatchSize: 64,
      maxQueueSize: 64,
      send: async batch => {
        const result = await this.request(
          'post',
          '/worker/events/delivery',
          {
            worker_epoch: this.workerEpoch,
            updates: batch.map(d => ({
              event_id: d.eventId,
              status: d.status,
            })),
          },
          'delivery batch',
        )
        if (!result.ok) {
          throw new RetryableError('delivery POST failed', result.retryAfterMs)
        }
      },
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })

    // Ack each received client_event so CCR can track delivery status.
    // Wired here (not in initialize()) so the callback is registered the
    // moment new CCRClient() returns — remoteIO must be free to call
    // transport.connect() immediately after without racing the first
    // SSE catch-up frame against an unwired onEventCallback.
    transport.setOnEvent((event: StreamClientEvent) => {
      this.reportDelivery(event.event_id, 'received')
    })
  }
447  
448    /**
449     * Initialize the session worker:
450     * 1. Take worker_epoch from the argument, or fall back to
451     *    CLAUDE_CODE_WORKER_EPOCH (set by env-manager / bridge spawner)
452     * 2. Report state as 'idle'
453     * 3. Start heartbeat timer
454     *
455     * In-process callers (replBridge) pass the epoch directly — they
456     * registered the worker themselves and there is no parent process
457     * setting env vars.
458     */
459    async initialize(epoch?: number): Promise<Record<string, unknown> | null> {
460      const startMs = Date.now()
461      if (Object.keys(this.getAuthHeaders()).length === 0) {
462        throw new CCRInitError('no_auth_headers')
463      }
464      if (epoch === undefined) {
465        const rawEpoch = process.env.CLAUDE_CODE_WORKER_EPOCH
466        epoch = rawEpoch ? parseInt(rawEpoch, 10) : NaN
467      }
468      if (isNaN(epoch)) {
469        throw new CCRInitError('missing_epoch')
470      }
471      this.workerEpoch = epoch
472  
473      // Concurrent with the init PUT — neither depends on the other.
474      const restoredPromise = this.getWorkerState()
475  
476      const result = await this.request(
477        'put',
478        '/worker',
479        {
480          worker_status: 'idle',
481          worker_epoch: this.workerEpoch,
482          // Clear stale pending_action/task_summary left by a prior
483          // worker crash — the in-session clears don't survive process restart.
484          external_metadata: {
485            pending_action: null,
486            task_summary: null,
487          },
488        },
489        'PUT worker (init)',
490      )
491      if (!result.ok) {
492        // 409 → onEpochMismatch may throw, but request() catches it and returns
493        // false. Without this check we'd continue to startHeartbeat(), leaking a
494        // 20s timer against a dead epoch. Throw so connect()'s rejection handler
495        // fires instead of the success path.
496        throw new CCRInitError('worker_register_failed')
497      }
498      this.currentState = 'idle'
499      this.startHeartbeat()
500  
501      // sessionActivity's refcount-gated timer fires while an API call or tool
502      // is in-flight; without a write the container lease can expire mid-wait.
503      // v1 wires this in WebSocketTransport per-connection.
504      registerSessionActivityCallback(() => {
505        void this.writeEvent({ type: 'keep_alive' })
506      })
507  
508      logForDebugging(`CCRClient: initialized, epoch=${this.workerEpoch}`)
509      logForDiagnosticsNoPII('info', 'cli_worker_lifecycle_initialized', {
510        epoch: this.workerEpoch,
511        duration_ms: Date.now() - startMs,
512      })
513  
514      // Await the concurrent GET and log state_restored here, after the PUT
515      // has succeeded — logging inside getWorkerState() raced: if the GET
516      // resolved before the PUT failed, diagnostics showed both init_failed
517      // and state_restored for the same session.
518      const { metadata, durationMs } = await restoredPromise
519      if (!this.closed) {
520        logForDiagnosticsNoPII('info', 'cli_worker_state_restored', {
521          duration_ms: durationMs,
522          had_state: metadata !== null,
523        })
524      }
525      return metadata
526    }
527  
528    // Control_requests are marked processed and not re-delivered on
529    // restart, so read back what the prior worker wrote.
530    private async getWorkerState(): Promise<{
531      metadata: Record<string, unknown> | null
532      durationMs: number
533    }> {
534      const startMs = Date.now()
535      const authHeaders = this.getAuthHeaders()
536      if (Object.keys(authHeaders).length === 0) {
537        return { metadata: null, durationMs: 0 }
538      }
539      const data = await this.getWithRetry<WorkerStateResponse>(
540        `${this.sessionBaseUrl}/worker`,
541        authHeaders,
542        'worker_state',
543      )
544      return {
545        metadata: data?.worker?.external_metadata ?? null,
546        durationMs: Date.now() - startMs,
547      }
548    }
549  
550    /**
551     * Send an authenticated HTTP request to CCR. Handles auth headers,
552     * 409 epoch mismatch, and error logging. Returns { ok: true } on 2xx.
553     * On 429, reads Retry-After (integer seconds) so the uploader can honor
554     * the server's backoff hint instead of blindly exponentiating.
555     */
  private async request(
    method: 'post' | 'put',
    path: string,
    body: unknown,
    label: string,
    { timeout = 10_000 }: { timeout?: number } = {},
  ): Promise<RequestResult> {
    // No auth headers available (token never delivered or already cleared) —
    // fail fast without a network round trip.
    const authHeaders = this.getAuthHeaders()
    if (Object.keys(authHeaders).length === 0) return { ok: false }

    // NOTE: handleEpochMismatch()/onEpochMismatch() below may throw (in-process
    // callers override the default process.exit). That throw is deliberately
    // caught by this try's catch and surfaced as { ok: false } — initialize()
    // relies on this (see its worker_register_failed comment).
    try {
      const response = await this.http[method](
        `${this.sessionBaseUrl}${path}`,
        body,
        {
          headers: {
            ...authHeaders,
            'Content-Type': 'application/json',
            'anthropic-version': '2023-06-01',
            'User-Agent': getClaudeCodeUserAgent(),
          },
          // Accept all statuses; we branch on response.status below instead
          // of letting axios throw on non-2xx.
          validateStatus: alwaysValidStatus,
          timeout,
        },
      )

      if (response.status >= 200 && response.status < 300) {
        // Any success ends a 401/403 streak.
        this.consecutiveAuthFailures = 0
        return { ok: true }
      }
      if (response.status === 409) {
        this.handleEpochMismatch()
      }
      if (response.status === 401 || response.status === 403) {
        // A 401 with an expired JWT is deterministic — no retry will
        // ever succeed. Check the token's own exp before burning
        // wall-clock on the threshold loop.
        // NOTE(review): this reads the process-global token even when
        // getAuthHeaders was injected per-instance (multi-session callers) —
        // the expiry check may inspect a different session's token; confirm.
        const tok = getSessionIngressAuthToken()
        const exp = tok ? decodeJwtExpiry(tok) : null
        if (exp !== null && exp * 1000 < Date.now()) {
          logForDebugging(
            `CCRClient: session_token expired (exp=${new Date(exp * 1000).toISOString()}) — no refresh was delivered, exiting`,
            { level: 'error' },
          )
          logForDiagnosticsNoPII('error', 'cli_worker_token_expired_no_refresh')
          this.onEpochMismatch()
        }
        // Token looks valid but server says 401 — possible server-side
        // blip (userauth down, KMS hiccup). Count toward threshold.
        this.consecutiveAuthFailures++
        if (this.consecutiveAuthFailures >= MAX_CONSECUTIVE_AUTH_FAILURES) {
          logForDebugging(
            `CCRClient: ${this.consecutiveAuthFailures} consecutive auth failures with a valid-looking token — server-side auth unrecoverable, exiting`,
            { level: 'error' },
          )
          logForDiagnosticsNoPII('error', 'cli_worker_auth_failures_exhausted')
          this.onEpochMismatch()
        }
      }
      // Generic non-2xx logging (401/403 reach here too when not exiting).
      logForDebugging(`CCRClient: ${label} returned ${response.status}`, {
        level: 'warn',
      })
      logForDiagnosticsNoPII('warn', 'cli_worker_request_failed', {
        method,
        path,
        status: response.status,
      })
      if (response.status === 429) {
        // Honor the server's Retry-After (integer seconds) as a backoff hint.
        const raw = response.headers?.['retry-after']
        const seconds = typeof raw === 'string' ? parseInt(raw, 10) : NaN
        if (!isNaN(seconds) && seconds >= 0) {
          return { ok: false, retryAfterMs: seconds * 1000 }
        }
      }
      return { ok: false }
    } catch (error) {
      // Network/timeout errors AND any throw from onEpochMismatch above.
      logForDebugging(`CCRClient: ${label} failed: ${errorMessage(error)}`, {
        level: 'warn',
      })
      logForDiagnosticsNoPII('warn', 'cli_worker_request_error', {
        method,
        path,
        error_code: getErrnoCode(error),
      })
      return { ok: false }
    }
  }
643  
644    /** Report worker state to CCR via PUT /sessions/{id}/worker. */
645    reportState(state: SessionState, details?: RequiresActionDetails): void {
646      if (state === this.currentState && !details) return
647      this.currentState = state
648      this.workerState.enqueue({
649        worker_status: state,
650        requires_action_details: details
651          ? {
652              tool_name: details.tool_name,
653              action_description: details.action_description,
654              request_id: details.request_id,
655            }
656          : null,
657      })
658    }
659  
660    /** Report external metadata to CCR via PUT /worker. */
661    reportMetadata(metadata: Record<string, unknown>): void {
662      this.workerState.enqueue({ external_metadata: metadata })
663    }
664  
665    /**
666     * Handle epoch mismatch (409 Conflict). A newer CC instance has replaced
667     * this one — exit immediately.
668     */
669    private handleEpochMismatch(): never {
670      logForDebugging('CCRClient: Epoch mismatch (409), shutting down', {
671        level: 'error',
672      })
673      logForDiagnosticsNoPII('error', 'cli_worker_epoch_mismatch')
674      this.onEpochMismatch()
675    }
676  
677    /** Start periodic heartbeat. */
678    private startHeartbeat(): void {
679      this.stopHeartbeat()
680      const schedule = (): void => {
681        const jitter =
682          this.heartbeatIntervalMs *
683          this.heartbeatJitterFraction *
684          (2 * Math.random() - 1)
685        this.heartbeatTimer = setTimeout(tick, this.heartbeatIntervalMs + jitter)
686      }
687      const tick = (): void => {
688        void this.sendHeartbeat()
689        // stopHeartbeat nulls the timer; check after the fire-and-forget send
690        // but before rescheduling so close() during sendHeartbeat is honored.
691        if (this.heartbeatTimer === null) return
692        schedule()
693      }
694      schedule()
695    }
696  
697    /** Stop heartbeat timer. */
698    private stopHeartbeat(): void {
699      if (this.heartbeatTimer) {
700        clearTimeout(this.heartbeatTimer)
701        this.heartbeatTimer = null
702      }
703    }
704  
705    /** Send a heartbeat via POST /sessions/{id}/worker/heartbeat. */
706    private async sendHeartbeat(): Promise<void> {
707      if (this.heartbeatInFlight) return
708      this.heartbeatInFlight = true
709      try {
710        const result = await this.request(
711          'post',
712          '/worker/heartbeat',
713          { session_id: this.sessionId, worker_epoch: this.workerEpoch },
714          'Heartbeat',
715          { timeout: 5_000 },
716        )
717        if (result.ok) {
718          logForDebugging('CCRClient: Heartbeat sent')
719        }
720      } finally {
721        this.heartbeatInFlight = false
722      }
723    }
724  
725    /**
726     * Write a StdoutMessage as a client event via POST /sessions/{id}/worker/events.
727     * These events are visible to frontend clients via the SSE stream.
728     * Injects a UUID if missing to ensure server-side idempotency on retry.
729     *
730     * stream_event messages are held in a 100ms delay buffer and accumulated
731     * (text_deltas for the same content block emit a full-so-far snapshot per
732     * flush). A non-stream_event write flushes the buffer first so downstream
733     * ordering is preserved.
734     */
735    async writeEvent(message: StdoutMessage): Promise<void> {
736      if (message.type === 'stream_event') {
737        this.streamEventBuffer.push(message)
738        if (!this.streamEventTimer) {
739          this.streamEventTimer = setTimeout(
740            () => void this.flushStreamEventBuffer(),
741            STREAM_EVENT_FLUSH_INTERVAL_MS,
742          )
743        }
744        return
745      }
746      await this.flushStreamEventBuffer()
747      if (message.type === 'assistant') {
748        clearStreamAccumulatorForMessage(this.streamTextAccumulator, message)
749      }
750      await this.eventUploader.enqueue(this.toClientEvent(message))
751    }
752  
753    /** Wrap a StdoutMessage as a ClientEvent, injecting a UUID if missing. */
754    private toClientEvent(message: StdoutMessage): ClientEvent {
755      const msg = message as unknown as Record<string, unknown>
756      return {
757        payload: {
758          ...msg,
759          uuid: typeof msg.uuid === 'string' ? msg.uuid : randomUUID(),
760        } as EventPayload,
761      }
762    }
763  
764    /**
765     * Drain the stream_event delay buffer: accumulate text_deltas into
766     * full-so-far snapshots, clear the timer, enqueue the resulting events.
767     * Called from the timer, from writeEvent on a non-stream message, and from
768     * flush(). close() drops the buffer — call flush() first if you need
769     * delivery.
770     */
771    private async flushStreamEventBuffer(): Promise<void> {
772      if (this.streamEventTimer) {
773        clearTimeout(this.streamEventTimer)
774        this.streamEventTimer = null
775      }
776      if (this.streamEventBuffer.length === 0) return
777      const buffered = this.streamEventBuffer
778      this.streamEventBuffer = []
779      const payloads = accumulateStreamEvents(
780        buffered,
781        this.streamTextAccumulator,
782      )
783      await this.eventUploader.enqueue(
784        payloads.map(payload => ({ payload, ephemeral: true })),
785      )
786    }
787  
788    /**
789     * Write an internal worker event via POST /sessions/{id}/worker/internal-events.
790     * These events are NOT visible to frontend clients — they store worker-internal
791     * state (transcript messages, compaction markers) needed for session resume.
792     */
793    async writeInternalEvent(
794      eventType: string,
795      payload: Record<string, unknown>,
796      {
797        isCompaction = false,
798        agentId,
799      }: {
800        isCompaction?: boolean
801        agentId?: string
802      } = {},
803    ): Promise<void> {
804      const event: WorkerEvent = {
805        payload: {
806          type: eventType,
807          ...payload,
808          uuid: typeof payload.uuid === 'string' ? payload.uuid : randomUUID(),
809        } as EventPayload,
810        ...(isCompaction && { is_compaction: true }),
811        ...(agentId && { agent_id: agentId }),
812      }
813      await this.internalEventUploader.enqueue(event)
814    }
815  
816    /**
817     * Flush pending internal events. Call between turns and on shutdown
818     * to ensure transcript entries are persisted.
819     */
820    flushInternalEvents(): Promise<void> {
821      return this.internalEventUploader.flush()
822    }
823  
824    /**
825     * Flush pending client events (writeEvent queue). Call before close()
826     * when the caller needs delivery confirmation — close() abandons the
827     * queue. Resolves once the uploader drains or rejects; returns
828     * regardless of whether individual POSTs succeeded (check server state
829     * separately if that matters).
830     */
831    async flush(): Promise<void> {
832      await this.flushStreamEventBuffer()
833      return this.eventUploader.flush()
834    }
835  
836    /**
837     * Read foreground agent internal events from
838     * GET /sessions/{id}/worker/internal-events.
839     * Returns transcript entries from the last compaction boundary, or null on failure.
840     * Used for session resume.
841     */
842    async readInternalEvents(): Promise<InternalEvent[] | null> {
843      return this.paginatedGet('/worker/internal-events', {}, 'internal_events')
844    }
845  
846    /**
847     * Read all subagent internal events from
848     * GET /sessions/{id}/worker/internal-events?subagents=true.
849     * Returns a merged stream across all non-foreground agents, each from its
850     * compaction point. Used for session resume.
851     */
852    async readSubagentInternalEvents(): Promise<InternalEvent[] | null> {
853      return this.paginatedGet(
854        '/worker/internal-events',
855        { subagents: 'true' },
856        'subagent_events',
857      )
858    }
859  
860    /**
861     * Paginated GET with retry. Fetches all pages from a list endpoint,
862     * retrying each page on failure with exponential backoff + jitter.
863     */
864    private async paginatedGet(
865      path: string,
866      params: Record<string, string>,
867      context: string,
868    ): Promise<InternalEvent[] | null> {
869      const authHeaders = this.getAuthHeaders()
870      if (Object.keys(authHeaders).length === 0) return null
871  
872      const allEvents: InternalEvent[] = []
873      let cursor: string | undefined
874  
875      do {
876        const url = new URL(`${this.sessionBaseUrl}${path}`)
877        for (const [k, v] of Object.entries(params)) {
878          url.searchParams.set(k, v)
879        }
880        if (cursor) {
881          url.searchParams.set('cursor', cursor)
882        }
883  
884        const page = await this.getWithRetry<ListInternalEventsResponse>(
885          url.toString(),
886          authHeaders,
887          context,
888        )
889        if (!page) return null
890  
891        allEvents.push(...(page.data ?? []))
892        cursor = page.next_cursor
893      } while (cursor)
894  
895      logForDebugging(
896        `CCRClient: Read ${allEvents.length} internal events from ${path}${params.subagents ? ' (subagents)' : ''}`,
897      )
898      return allEvents
899    }
900  
901    /**
902     * Single GET request with retry. Returns the parsed response body
903     * on success, null if all retries are exhausted.
904     */
905    private async getWithRetry<T>(
906      url: string,
907      authHeaders: Record<string, string>,
908      context: string,
909    ): Promise<T | null> {
910      for (let attempt = 1; attempt <= 10; attempt++) {
911        let response
912        try {
913          response = await this.http.get<T>(url, {
914            headers: {
915              ...authHeaders,
916              'anthropic-version': '2023-06-01',
917              'User-Agent': getClaudeCodeUserAgent(),
918            },
919            validateStatus: alwaysValidStatus,
920            timeout: 30_000,
921          })
922        } catch (error) {
923          logForDebugging(
924            `CCRClient: GET ${url} failed (attempt ${attempt}/10): ${errorMessage(error)}`,
925            { level: 'warn' },
926          )
927          if (attempt < 10) {
928            const delay =
929              Math.min(500 * 2 ** (attempt - 1), 30_000) + Math.random() * 500
930            await sleep(delay)
931          }
932          continue
933        }
934  
935        if (response.status >= 200 && response.status < 300) {
936          return response.data
937        }
938        if (response.status === 409) {
939          this.handleEpochMismatch()
940        }
941        logForDebugging(
942          `CCRClient: GET ${url} returned ${response.status} (attempt ${attempt}/10)`,
943          { level: 'warn' },
944        )
945  
946        if (attempt < 10) {
947          const delay =
948            Math.min(500 * 2 ** (attempt - 1), 30_000) + Math.random() * 500
949          await sleep(delay)
950        }
951      }
952  
953      logForDebugging('CCRClient: GET retries exhausted', { level: 'error' })
954      logForDiagnosticsNoPII('error', 'cli_worker_get_retries_exhausted', {
955        context,
956      })
957      return null
958    }
959  
960    /**
961     * Report delivery status for a client-to-worker event.
962     * POST /v1/code/sessions/{id}/worker/events/delivery (batch endpoint)
963     */
964    reportDelivery(
965      eventId: string,
966      status: 'received' | 'processing' | 'processed',
967    ): void {
968      void this.deliveryUploader.enqueue({ eventId, status })
969    }
970  
971    /** Get the current epoch (for external use). */
972    getWorkerEpoch(): number {
973      return this.workerEpoch
974    }
975  
976    /** Internal-event queue depth — shutdown-snapshot backpressure signal. */
977    get internalEventsPending(): number {
978      return this.internalEventUploader.pendingCount
979    }
980  
981    /** Clean up uploaders and timers. */
982    close(): void {
983      this.closed = true
984      this.stopHeartbeat()
985      unregisterSessionActivityCallback()
986      if (this.streamEventTimer) {
987        clearTimeout(this.streamEventTimer)
988        this.streamEventTimer = null
989      }
990      this.streamEventBuffer = []
991      this.streamTextAccumulator.byMessage.clear()
992      this.streamTextAccumulator.scopeToMessage.clear()
993      this.workerState.close()
994      this.eventUploader.close()
995      this.internalEventUploader.close()
996      this.deliveryUploader.close()
997    }
998  }