// bridge/replBridgeTransport.ts
  1  import type { StdoutMessage } from 'src/entrypoints/sdk/controlTypes.js'
  2  import { CCRClient } from '../cli/transports/ccrClient.js'
  3  import type { HybridTransport } from '../cli/transports/HybridTransport.js'
  4  import { SSETransport } from '../cli/transports/SSETransport.js'
  5  import { logForDebugging } from '../utils/debug.js'
  6  import { errorMessage } from '../utils/errors.js'
  7  import { updateSessionIngressAuthToken } from '../utils/sessionIngressAuth.js'
  8  import type { SessionState } from '../utils/sessionState.js'
  9  import { registerWorker } from './workSecret.js'
 10  
 11  /**
 12   * Transport abstraction for replBridge. Covers exactly the surface that
 13   * replBridge.ts uses against HybridTransport so the v1/v2 choice is
 14   * confined to the construction site.
 15   *
 16   * - v1: HybridTransport (WS reads + POST writes to Session-Ingress)
 17   * - v2: SSETransport (reads) + CCRClient (writes to CCR v2 /worker/*)
 18   *
 19   * The v2 write path goes through CCRClient.writeEvent → SerialBatchEventUploader,
 20   * NOT through SSETransport.write() — SSETransport.write() targets the
 21   * Session-Ingress POST URL shape, which is wrong for CCR v2.
 22   */
 23  export type ReplBridgeTransport = {
 24    write(message: StdoutMessage): Promise<void>
 25    writeBatch(messages: StdoutMessage[]): Promise<void>
 26    close(): void
 27    isConnectedStatus(): boolean
 28    getStateLabel(): string
 29    setOnData(callback: (data: string) => void): void
 30    setOnClose(callback: (closeCode?: number) => void): void
 31    setOnConnect(callback: () => void): void
 32    connect(): void
 33    /**
 34     * High-water mark of the underlying read stream's event sequence numbers.
 35     * replBridge reads this before swapping transports so the new one can
 36     * resume from where the old one left off (otherwise the server replays
 37     * the entire session history from seq 0).
 38     *
 39     * v1 returns 0 — Session-Ingress WS doesn't use SSE sequence numbers;
 40     * replay-on-reconnect is handled by the server-side message cursor.
 41     */
 42    getLastSequenceNum(): number
 43    /**
 44     * Monotonic count of batches dropped via maxConsecutiveFailures.
 45     * Snapshot before writeBatch() and compare after to detect silent drops
 46     * (writeBatch() resolves normally even when batches were dropped).
 47     * v2 returns 0 — the v2 write path doesn't set maxConsecutiveFailures.
 48     */
 49    readonly droppedBatchCount: number
 50    /**
 51     * PUT /worker state (v2 only; v1 is a no-op). `requires_action` tells
 52     * the backend a permission prompt is pending — claude.ai shows the
 53     * "waiting for input" indicator. REPL/daemon callers don't need this
 54     * (user watches the REPL locally); multi-session worker callers do.
 55     */
 56    reportState(state: SessionState): void
 57    /** PUT /worker external_metadata (v2 only; v1 is a no-op). */
 58    reportMetadata(metadata: Record<string, unknown>): void
 59    /**
 60     * POST /worker/events/{id}/delivery (v2 only; v1 is a no-op). Populates
 61     * CCR's processing_at/processed_at columns. `received` is auto-fired by
 62     * CCRClient on every SSE frame and is not exposed here.
 63     */
 64    reportDelivery(eventId: string, status: 'processing' | 'processed'): void
 65    /**
 66     * Drain the write queue before close() (v2 only; v1 resolves
 67     * immediately — HybridTransport POSTs are already awaited per-write).
 68     */
 69    flush(): Promise<void>
 70  }
 71  
 72  /**
 73   * v1 adapter: HybridTransport already has the full surface (it extends
 74   * WebSocketTransport which has setOnConnect + getStateLabel). This is a
 75   * no-op wrapper that exists only so replBridge's `transport` variable
 76   * has a single type.
 77   */
 78  export function createV1ReplTransport(
 79    hybrid: HybridTransport,
 80  ): ReplBridgeTransport {
 81    return {
 82      write: msg => hybrid.write(msg),
 83      writeBatch: msgs => hybrid.writeBatch(msgs),
 84      close: () => hybrid.close(),
 85      isConnectedStatus: () => hybrid.isConnectedStatus(),
 86      getStateLabel: () => hybrid.getStateLabel(),
 87      setOnData: cb => hybrid.setOnData(cb),
 88      setOnClose: cb => hybrid.setOnClose(cb),
 89      setOnConnect: cb => hybrid.setOnConnect(cb),
 90      connect: () => void hybrid.connect(),
 91      // v1 Session-Ingress WS doesn't use SSE sequence numbers; replay
 92      // semantics are different. Always return 0 so the seq-num carryover
 93      // logic in replBridge is a no-op for v1.
 94      getLastSequenceNum: () => 0,
 95      get droppedBatchCount() {
 96        return hybrid.droppedBatchCount
 97      },
 98      reportState: () => {},
 99      reportMetadata: () => {},
100      reportDelivery: () => {},
101      flush: () => Promise.resolve(),
102    }
103  }
104  
/**
 * Build an auth-header closure that reads the token fresh on every call:
 * `Authorization: Bearer <token>`, or no headers when the getter currently
 * has no token.
 */
function bearerAuthHeaders(
  getAuthToken: () => string | undefined,
): () => Record<string, string> {
  return () => {
    const token = getAuthToken()
    return token ? { Authorization: `Bearer ${token}` } : {}
  }
}

/**
 * Derive the CCR v2 SSE read-stream URL: the session URL with
 * `/worker/events/stream` appended to its path (one trailing slash is
 * stripped first). Same logic as transportUtils.ts:26-33 but starting from
 * an http(s) base instead of a --sdk-url that might be ws://.
 */
function workerEventsStreamUrl(sessionUrl: string): URL {
  const url = new URL(sessionUrl)
  url.pathname = url.pathname.replace(/\/$/, '') + '/worker/events/stream'
  return url
}

/**
 * v2 adapter: wrap SSETransport (reads) + CCRClient (writes, heartbeat,
 * state, delivery tracking).
 *
 * Auth: v2 endpoints validate the JWT's session_id claim (register_worker.go:32)
 * and worker role (environment_auth.py:856). OAuth tokens have neither.
 * This is the inverse of the v1 replBridge path, which deliberately uses OAuth.
 * The JWT is refreshed when the poll loop re-dispatches work — the caller
 * invokes createV2ReplTransport again with the fresh token.
 *
 * Registration happens here (not in the caller) so the entire v2 handshake
 * is one async step. registerWorker failure propagates — replBridge will
 * catch it and stay on the poll loop.
 *
 * Teardown happens at most once. An epoch mismatch (409 → onClose(4090)),
 * a failed CCRClient.initialize() (→ onClose(4091)) or an SSE stream close
 * (→ onClose(HTTP status, or 4092 when the reconnect budget is exhausted))
 * closes CCRClient — and the SSE stream, unless it is the one that closed —
 * and notifies onClose exactly once, even if a close() call throws. An
 * explicit close() marks the transport closed without notifying. After any
 * teardown, isConnectedStatus() is false, writeBatch() stops enqueueing,
 * and a late initialize() resolution no longer fires onConnect.
 */
export async function createV2ReplTransport(opts: {
  sessionUrl: string
  ingressToken: string
  sessionId: string
  /**
   * SSE sequence-number high-water mark from the previous transport.
   * Passed to the new SSETransport so its first connect() sends
   * from_sequence_num / Last-Event-ID and the server resumes from where
   * the old stream left off. Without this, every transport swap asks the
   * server to replay the entire session history from seq 0.
   */
  initialSequenceNum?: number
  /**
   * Worker epoch from POST /bridge response. When provided, the server
   * already bumped epoch (the /bridge call IS the register — see server
   * PR #293280). When omitted (v1 CCR-v2 path via replBridge.ts poll loop),
   * call registerWorker as before.
   */
  epoch?: number
  /** CCRClient heartbeat interval. Defaults to 20s when omitted. */
  heartbeatIntervalMs?: number
  /** ±fraction per-beat jitter. Defaults to 0 (no jitter) when omitted. */
  heartbeatJitterFraction?: number
  /**
   * When true, skip opening the SSE read stream — only the CCRClient write
   * path is activated. Use for mirror-mode attachments that forward events
   * but never receive inbound prompts or control requests.
   */
  outboundOnly?: boolean
  /**
   * Per-instance auth header source. When provided, CCRClient + SSETransport
   * read auth from this closure instead of the process-wide
   * CLAUDE_CODE_SESSION_ACCESS_TOKEN env var. Required for callers managing
   * multiple concurrent sessions — the env-var path stomps across sessions.
   * When omitted, falls back to the env var (single-session callers).
   */
  getAuthToken?: () => string | undefined
}): Promise<ReplBridgeTransport> {
  const {
    sessionUrl,
    ingressToken,
    sessionId,
    initialSequenceNum,
    getAuthToken,
  } = opts

  // Per-instance auth (multi-session safe) when a getter is supplied.
  // Otherwise fall back to the process-wide env var: CCRClient.request()
  // and SSETransport.connect() both read it via
  // getSessionIngressAuthHeaders(), so set it before either touches the
  // network.
  const getAuthHeaders = getAuthToken
    ? bearerAuthHeaders(getAuthToken)
    : undefined
  if (!getAuthHeaders) {
    updateSessionIngressAuthToken(ingressToken)
  }

  const epoch = opts.epoch ?? (await registerWorker(sessionUrl, ingressToken))
  logForDebugging(
    `[bridge:repl] CCR v2: worker sessionId=${sessionId} epoch=${epoch}${opts.epoch !== undefined ? ' (from /bridge)' : ' (via registerWorker)'}`,
  )

  // Lifecycle state shared by CCRClient's callbacks and the adapter below.
  // Declared before CCRClient is constructed so no callback can observe a
  // binding in its temporal dead zone.
  let onConnectCb: (() => void) | undefined
  let onCloseCb: ((closeCode?: number) => void) | undefined
  let ccrInitialized = false
  // Set by close() AND by every internal teardown (epoch mismatch, init
  // failure, SSE close): stops writeBatch, flips isConnectedStatus() to
  // false, and makes onClose/onConnect notifications one-shot.
  let closed = false

  const sse = new SSETransport(
    workerEventsStreamUrl(sessionUrl),
    {},
    sessionId,
    undefined,
    initialSequenceNum,
    getAuthHeaders,
  )
  const ccr = new CCRClient(sse, new URL(sessionUrl), {
    getAuthHeaders,
    heartbeatIntervalMs: opts.heartbeatIntervalMs,
    heartbeatJitterFraction: opts.heartbeatJitterFraction,
    // Default is process.exit(1) — correct for spawn-mode children. In-process,
    // that kills the REPL. Close instead: replBridge's onClose wakes the poll
    // loop, which picks up the server's re-dispatch (with fresh epoch).
    onEpochMismatch: () => {
      logForDebugging(
        '[bridge:repl] CCR v2: epoch superseded (409) — closing for poll-loop recovery',
      )
      // shutdown() already logs close() failures; this catch covers a
      // throwing onClose callback so the throw below always executes —
      // otherwise handleEpochMismatch's `never` return type is violated at
      // runtime and control falls through in the caller (request()).
      try {
        shutdown(4090, 'epoch-mismatch', true)
      } catch (closeErr: unknown) {
        logForDebugging(
          `[bridge:repl] CCR v2: error during epoch-mismatch cleanup: ${errorMessage(closeErr)}`,
          { level: 'error' },
        )
      }
      // Throw to unwind the calling request(); the uploaders catch it as a
      // send failure. If this happens inside initialize(), its rejection
      // handler runs shutdown() again, which is a no-op — onClose stays
      // at exactly one 4090 notification.
      throw new Error('epoch superseded')
    },
  })

  /** Run one resource's close(), logging instead of propagating failures. */
  function closeQuietly(reason: string, close: () => void): void {
    try {
      close()
    } catch (closeErr: unknown) {
      logForDebugging(
        `[bridge:repl] CCR v2: error during ${reason} cleanup: ${errorMessage(closeErr)}`,
        { level: 'error' },
      )
    }
  }

  /**
   * Internal teardown: close resources and notify replBridge exactly once.
   * Each close() is isolated so one failing does not skip the other or the
   * notification — replBridge must always learn the transport is dead, or
   * it sits with a stale transport forever.
   */
  function shutdown(closeCode: number, reason: string, closeSse: boolean): void {
    if (closed) return
    closed = true
    closeQuietly(reason, () => ccr.close())
    if (closeSse) closeQuietly(reason, () => sse.close())
    onCloseCb?.(closeCode)
  }

  // CCRClient's constructor wired sse.setOnEvent → reportDelivery('received').
  // remoteIO.ts additionally sends 'processing'/'processed' via
  // setCommandLifecycleListener, which the in-process query loop fires. This
  // transport's only caller (replBridge/daemonBridge) has no such wiring — the
  // daemon's agent child is a separate process (ProcessTransport), and its
  // notifyCommandLifecycle calls fire with listener=null in its own module
  // scope. So events stay at 'received' forever, and reconnectSession re-queues
  // them on every daemon restart (observed: 21→24→25 phantom prompts as
  // "user sent a new message while you were working" system-reminders).
  //
  // Fix: ACK 'processed' immediately alongside 'received'. The window between
  // SSE receipt and transcript-write is narrow (queue → SDK → child stdin →
  // model); a crash there loses one prompt vs. the observed N-prompt flood on
  // every restart. Overwrite the constructor's wiring to do both — setOnEvent
  // replaces, not appends (SSETransport.ts:658).
  sse.setOnEvent(event => {
    ccr.reportDelivery(event.event_id, 'received')
    ccr.reportDelivery(event.event_id, 'processed')
  })

  // Both sse.connect() and ccr.initialize() are deferred to connect() below.
  // replBridge's calling order is newTransport → setOnConnect → setOnData →
  // setOnClose → connect(), and both calls need those callbacks wired first:
  // sse.connect() opens the stream (events flow to onData/onClose immediately),
  // and ccr.initialize().then() fires onConnectCb.
  //
  // onConnect fires once ccr.initialize() resolves. Writes go via
  // CCRClient HTTP POST (SerialBatchEventUploader), not SSE, so the
  // write path is ready the moment workerEpoch is set. SSE.connect()
  // awaits its read loop and never resolves — don't gate on it.
  return {
    write(msg) {
      return ccr.writeEvent(msg)
    },
    async writeBatch(msgs) {
      // SerialBatchEventUploader already batches internally (maxBatchSize=100);
      // sequential enqueue preserves order and the uploader coalesces.
      // Check closed between writes to avoid sending partial batches after
      // transport teardown (explicit close, epoch mismatch, SSE drop).
      for (const m of msgs) {
        if (closed) break
        await ccr.writeEvent(m)
      }
    },
    close() {
      // Caller-initiated: mark closed (suppressing any later onConnect /
      // onClose) but do not notify — replBridge already knows.
      closed = true
      ccr.close()
      sse.close()
    },
    isConnectedStatus() {
      // Write-readiness, not read-readiness — replBridge checks this
      // before calling writeBatch. SSE open state is orthogonal.
      return ccrInitialized && !closed
    },
    getStateLabel() {
      // SSETransport doesn't expose its state string; synthesize from
      // what we can observe. replBridge only uses this for debug logging.
      if (closed) return 'closed'
      // Outbound-only never opens SSE, so only the write side matters.
      if (opts.outboundOnly) return ccrInitialized ? 'connected' : 'init'
      if (sse.isClosedStatus()) return 'closed'
      if (sse.isConnectedStatus()) return ccrInitialized ? 'connected' : 'init'
      return 'connecting'
    },
    setOnData(cb) {
      sse.setOnData(cb)
    },
    setOnClose(cb) {
      onCloseCb = cb
      // SSE reconnect-budget exhaustion fires onClose(undefined) — map to
      // 4092 so ws_closed telemetry can distinguish it from HTTP-status
      // closes (SSETransport:280 passes response.status). The SSE stream has
      // already closed itself, so only CCRClient (heartbeat timer) is
      // stopped before notifying replBridge.
      sse.setOnClose(code => {
        shutdown(code ?? 4092, 'sse-close', false)
      })
    },
    setOnConnect(cb) {
      onConnectCb = cb
    },
    getLastSequenceNum() {
      return sse.getLastSequenceNum()
    },
    // v2 write path (CCRClient) doesn't set maxConsecutiveFailures — no drops.
    droppedBatchCount: 0,
    reportState(state) {
      ccr.reportState(state)
    },
    reportMetadata(metadata) {
      ccr.reportMetadata(metadata)
    },
    reportDelivery(eventId, status) {
      ccr.reportDelivery(eventId, status)
    },
    flush() {
      return ccr.flush()
    },
    connect() {
      // Outbound-only: skip the SSE read stream entirely — no inbound
      // events to receive, no delivery ACKs to send. Only the CCRClient
      // write path (POST /worker/events) and heartbeat are needed.
      if (!opts.outboundOnly) {
        // Fire-and-forget — SSETransport.connect() awaits readStream()
        // (the read loop) and only resolves on stream close/error. The
        // spawn-mode path in remoteIO.ts does the same void discard.
        void sse.connect()
      }
      void ccr.initialize(epoch).then(
        () => {
          // close() or an internal teardown may have won the race; telling
          // replBridge a dead transport is ready would misroute writes.
          if (closed) {
            logForDebugging(
              `[bridge:repl] CCR v2 initialize resolved after transport closed (epoch=${epoch}) — ignoring`,
            )
            return
          }
          ccrInitialized = true
          logForDebugging(
            `[bridge:repl] v2 transport ready for writes (epoch=${epoch}, sse=${sse.isConnectedStatus() ? 'open' : 'opening'})`,
          )
          onConnectCb?.()
        },
        (err: unknown) => {
          logForDebugging(
            `[bridge:repl] CCR v2 initialize failed: ${errorMessage(err)}`,
            { level: 'error' },
          )
          // Close transport resources and notify replBridge via onClose so
          // the poll loop can retry on the next work dispatch. Without this,
          // replBridge never learns the transport failed to initialize and
          // sits with transport === null forever.
          // 4091 = init failure, distinguishable from 4090 epoch mismatch.
          shutdown(4091, 'init-failure', true)
        },
      )
    },
  }
}