/ src / bridge / remoteBridgeCore.ts
remoteBridgeCore.ts
   1  // biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered
   2  /**
   3   * Env-less Remote Control bridge core.
   4   *
   5   * "Env-less" = no Environments API layer. Distinct from "CCR v2" (the
   6   * /worker/* transport protocol) — the env-based path (replBridge.ts) can also
   7   * use CCR v2 transport via CLAUDE_CODE_USE_CCR_V2. This file is about removing
   8   * the poll/dispatch layer, not about which transport protocol is underneath.
   9   *
  10   * Unlike initBridgeCore (env-based, ~2400 lines), this connects directly
  11   * to the session-ingress layer without the Environments API work-dispatch
  12   * layer:
  13   *
  14   *   1. POST /v1/code/sessions              (OAuth, no env_id)  → session.id
  15   *   2. POST /v1/code/sessions/{id}/bridge  (OAuth)             → {worker_jwt, expires_in, api_base_url, worker_epoch}
  16   *      Each /bridge call bumps epoch — it IS the register. No separate /worker/register.
  17   *   3. createV2ReplTransport(worker_jwt, worker_epoch)         → SSE + CCRClient
  18   *   4. createTokenRefreshScheduler                             → proactive /bridge re-call (new JWT + new epoch)
  19   *   5. 401 on SSE → rebuild transport with fresh /bridge credentials (same seq-num)
  20   *
  21   * No register/poll/ack/stop/heartbeat/deregister environment lifecycle.
  22   * The Environments API historically existed because CCR's /worker/*
  23   * endpoints required a session_id+role=worker JWT that only the work-dispatch
  24   * layer could mint. Server PR #292605 (renamed in #293280) adds the /bridge endpoint as a direct
  25   * OAuth→worker_jwt exchange, making the env layer optional for REPL sessions.
  26   *
  27   * Gated by `tengu_bridge_repl_v2` GrowthBook flag in initReplBridge.ts.
  28   * REPL-only — daemon/print stay on env-based.
  29   */
  30  
  31  import { feature } from 'bun:bundle'
  32  import axios from 'axios'
  33  import {
  34    createV2ReplTransport,
  35    type ReplBridgeTransport,
  36  } from './replBridgeTransport.js'
  37  import { buildCCRv2SdkUrl } from './workSecret.js'
  38  import { toCompatSessionId } from './sessionIdCompat.js'
  39  import { FlushGate } from './flushGate.js'
  40  import { createTokenRefreshScheduler } from './jwtUtils.js'
  41  import { getTrustedDeviceToken } from './trustedDevice.js'
  42  import {
  43    getEnvLessBridgeConfig,
  44    type EnvLessBridgeConfig,
  45  } from './envLessBridgeConfig.js'
  46  import {
  47    handleIngressMessage,
  48    handleServerControlRequest,
  49    makeResultMessage,
  50    isEligibleBridgeMessage,
  51    extractTitleText,
  52    BoundedUUIDSet,
  53  } from './bridgeMessaging.js'
  54  import { logBridgeSkip } from './debugUtils.js'
  55  import { logForDebugging } from '../utils/debug.js'
  56  import { logForDiagnosticsNoPII } from '../utils/diagLogs.js'
  57  import { isInProtectedNamespace } from '../utils/envUtils.js'
  58  import { errorMessage } from '../utils/errors.js'
  59  import { sleep } from '../utils/sleep.js'
  60  import { registerCleanup } from '../utils/cleanupRegistry.js'
  61  import {
  62    type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
  63    logEvent,
  64  } from '../services/analytics/index.js'
  65  import type { ReplBridgeHandle, BridgeState } from './replBridge.js'
  66  import type { Message } from '../types/message.js'
  67  import type { SDKMessage } from '../entrypoints/agentSdkTypes.js'
  68  import type {
  69    SDKControlRequest,
  70    SDKControlResponse,
  71  } from '../entrypoints/sdk/controlTypes.js'
  72  import type { PermissionMode } from '../utils/permissions/PermissionMode.js'
  73  
/** Value sent in the `anthropic-version` request header built by oauthHeaders(). */
const ANTHROPIC_VERSION = '2023-06-01'

// Telemetry discriminator for ws_connected. 'initial' is the default and
// never passed to rebuildTransport (which can only be called post-init);
// Exclude<> makes that constraint explicit at both signatures.
// The same value is also reported as `cause` on the
// tengu_bridge_repl_connect_timeout event (see onConnectTimeout).
type ConnectCause = 'initial' | 'proactive_refresh' | 'auth_401_recovery'
  80  
  81  function oauthHeaders(accessToken: string): Record<string, string> {
  82    return {
  83      Authorization: `Bearer ${accessToken}`,
  84      'Content-Type': 'application/json',
  85      'anthropic-version': ANTHROPIC_VERSION,
  86    }
  87  }
  88  
/**
 * Inputs to initEnvLessBridgeCore: connection details, OAuth accessors,
 * the message mapper, initial history, and the optional callbacks through
 * which the bridge reports inbound traffic, control requests and state.
 */
export type EnvLessBridgeParams = {
  /** Base URL passed to the session-create, /bridge credential and archive calls. */
  baseUrl: string
  /** Organization UUID; passed to archiveSession. */
  orgUUID: string
  /** Title passed to createCodeSession for the newly created session. */
  title: string
  /**
   * Returns the current OAuth access token, or undefined when there is none
   * (init then returns null). It can return an expired token as a non-null
   * string, so a truthy result does not mean the token is valid.
   */
  getAccessToken: () => string | undefined
  /**
   * Called with the (possibly stale) access token to force an OAuth refresh:
   * before /bridge is re-called on proactive refresh and on 401 recovery, and
   * once more when the teardown archive call returns 401. Those call sites
   * await it but do not inspect the boolean result.
   */
  onAuth401?: (staleAccessToken: string) => Promise<boolean>
  /**
   * Converts internal Message[] → SDKMessage[] for writeMessages() and the
   * initial-flush/drain paths. Injected rather than imported — mappers.ts
   * transitively pulls in src/commands.ts (entire command registry + React
   * tree) which would bloat bundles that don't already have it.
   */
  toSDKMessages: (messages: Message[]) => SDKMessage[]
  /**
   * Upper bound on how many eligible initial messages the history flush
   * sends; the most recent ones are kept. A value <= 0 disables the cap.
   */
  initialHistoryCap: number
  /**
   * History flushed on the first connect. Their UUIDs also seed the
   * echo-dedup sets so server echoes of the flush are recognized.
   */
  initialMessages?: Message[]
  /** Handed to handleIngressMessage as the inbound-message callback. */
  onInboundMessage?: (msg: SDKMessage) => void | Promise<void>
  /**
   * Fired on each title-worthy user message seen in writeMessages() until
   * the callback returns true (done). Mirrors replBridge.ts's onUserMessage —
   * caller derives a title and PATCHes /v1/sessions/{id} so auto-started
   * sessions don't stay at the generic fallback. The caller owns the
   * derive-at-count-1-and-3 policy; the transport just keeps calling until
   * told to stop. sessionId is the raw cse_* — updateBridgeSessionTitle
   * retags internally.
   */
  onUserMessage?: (text: string, sessionId: string) => boolean
  /**
   * Called when a permission response arrives on the read stream, right
   * after the transport state has been reported as 'running'.
   */
  onPermissionResponse?: (response: SDKControlResponse) => void
  // The next four handlers are forwarded unchanged to
  // handleServerControlRequest for server-initiated control requests.
  onInterrupt?: () => void
  onSetModel?: (model: string | undefined) => void
  onSetMaxThinkingTokens?: (maxTokens: number | null) => void
  onSetPermissionMode?: (
    mode: PermissionMode,
  ) => { ok: true } | { ok: false; error: string }
  /**
   * Notified on bridge state changes. The visible code emits 'ready',
   * 'connected', 'reconnecting' and 'failed'; `detail` carries a
   * human-readable reason where one is available.
   */
  onStateChange?: (state: BridgeState, detail?: string) => void
  /**
   * When true, skip opening the SSE read stream — only the CCRClient write
   * path is activated. Threaded to createV2ReplTransport and
   * handleServerControlRequest.
   */
  outboundOnly?: boolean
  /** Free-form tags for session categorization (e.g. ['ccr-mirror']). */
  tags?: string[]
}
 132  
 133  /**
 134   * Create a session, fetch a worker JWT, connect the v2 transport.
 135   *
 136   * Returns null on any pre-flight failure (session create failed, /bridge
 137   * failed, transport setup failed). Caller (initReplBridge) surfaces this
 138   * as a generic "initialization failed" state.
 139   */
 140  export async function initEnvLessBridgeCore(
 141    params: EnvLessBridgeParams,
 142  ): Promise<ReplBridgeHandle | null> {
 143    const {
 144      baseUrl,
 145      orgUUID,
 146      title,
 147      getAccessToken,
 148      onAuth401,
 149      toSDKMessages,
 150      initialHistoryCap,
 151      initialMessages,
 152      onInboundMessage,
 153      onUserMessage,
 154      onPermissionResponse,
 155      onInterrupt,
 156      onSetModel,
 157      onSetMaxThinkingTokens,
 158      onSetPermissionMode,
 159      onStateChange,
 160      outboundOnly,
 161      tags,
 162    } = params
 163  
 164    const cfg = await getEnvLessBridgeConfig()
 165  
 166    // ── 1. Create session (POST /v1/code/sessions, no env_id) ───────────────
 167    const accessToken = getAccessToken()
 168    if (!accessToken) {
 169      logForDebugging('[remote-bridge] No OAuth token')
 170      return null
 171    }
 172  
 173    const createdSessionId = await withRetry(
 174      () =>
 175        createCodeSession(baseUrl, accessToken, title, cfg.http_timeout_ms, tags),
 176      'createCodeSession',
 177      cfg,
 178    )
 179    if (!createdSessionId) {
 180      onStateChange?.('failed', 'Session creation failed — see debug log')
 181      logBridgeSkip('v2_session_create_failed', undefined, true)
 182      return null
 183    }
 184    const sessionId: string = createdSessionId
 185    logForDebugging(`[remote-bridge] Created session ${sessionId}`)
 186    logForDiagnosticsNoPII('info', 'bridge_repl_v2_session_created')
 187  
 188    // ── 2. Fetch bridge credentials (POST /bridge → worker_jwt, expires_in, api_base_url) ──
 189    const credentials = await withRetry(
 190      () =>
 191        fetchRemoteCredentials(
 192          sessionId,
 193          baseUrl,
 194          accessToken,
 195          cfg.http_timeout_ms,
 196        ),
 197      'fetchRemoteCredentials',
 198      cfg,
 199    )
 200    if (!credentials) {
 201      onStateChange?.('failed', 'Remote credentials fetch failed — see debug log')
 202      logBridgeSkip('v2_remote_creds_failed', undefined, true)
 203      void archiveSession(
 204        sessionId,
 205        baseUrl,
 206        accessToken,
 207        orgUUID,
 208        cfg.http_timeout_ms,
 209      )
 210      return null
 211    }
 212    logForDebugging(
 213      `[remote-bridge] Fetched bridge credentials (expires_in=${credentials.expires_in}s)`,
 214    )
 215  
 216    // ── 3. Build v2 transport (SSETransport + CCRClient) ────────────────────
 217    const sessionUrl = buildCCRv2SdkUrl(credentials.api_base_url, sessionId)
 218    logForDebugging(`[remote-bridge] v2 session URL: ${sessionUrl}`)
 219  
 220    let transport: ReplBridgeTransport
 221    try {
 222      transport = await createV2ReplTransport({
 223        sessionUrl,
 224        ingressToken: credentials.worker_jwt,
 225        sessionId,
 226        epoch: credentials.worker_epoch,
 227        heartbeatIntervalMs: cfg.heartbeat_interval_ms,
 228        heartbeatJitterFraction: cfg.heartbeat_jitter_fraction,
 229        // Per-instance closure — keeps the worker JWT out of
 230        // process.env.CLAUDE_CODE_SESSION_ACCESS_TOKEN, which mcp/client.ts
 231        // reads ungatedly and would otherwise send to user-configured ws/http
 232        // MCP servers. Frozen-at-construction is correct: transport is fully
 233        // rebuilt on refresh (rebuildTransport below).
 234        getAuthToken: () => credentials.worker_jwt,
 235        outboundOnly,
 236      })
 237    } catch (err) {
 238      logForDebugging(
 239        `[remote-bridge] v2 transport setup failed: ${errorMessage(err)}`,
 240        { level: 'error' },
 241      )
 242      onStateChange?.('failed', `Transport setup failed: ${errorMessage(err)}`)
 243      logBridgeSkip('v2_transport_setup_failed', undefined, true)
 244      void archiveSession(
 245        sessionId,
 246        baseUrl,
 247        accessToken,
 248        orgUUID,
 249        cfg.http_timeout_ms,
 250      )
 251      return null
 252    }
 253    logForDebugging(
 254      `[remote-bridge] v2 transport created (epoch=${credentials.worker_epoch})`,
 255    )
 256    onStateChange?.('ready')
 257  
 258    // ── 4. State ────────────────────────────────────────────────────────────
 259  
 260    // Echo dedup: messages we POST come back on the read stream. Seeded with
 261    // initial message UUIDs so server echoes of flushed history are recognized.
 262    // Both sets cover initial UUIDs — recentPostedUUIDs is a 2000-cap ring buffer
 263    // and could evict them after enough live writes; initialMessageUUIDs is the
 264    // unbounded fallback. Defense-in-depth; mirrors replBridge.ts.
 265    const recentPostedUUIDs = new BoundedUUIDSet(cfg.uuid_dedup_buffer_size)
 266    const initialMessageUUIDs = new Set<string>()
 267    if (initialMessages) {
 268      for (const msg of initialMessages) {
 269        initialMessageUUIDs.add(msg.uuid)
 270        recentPostedUUIDs.add(msg.uuid)
 271      }
 272    }
 273  
 274    // Defensive dedup for re-delivered inbound prompts (seq-num negotiation
 275    // edge cases, server history replay after transport swap).
 276    const recentInboundUUIDs = new BoundedUUIDSet(cfg.uuid_dedup_buffer_size)
 277  
 278    // FlushGate: queue live writes while the history flush POST is in flight,
 279    // so the server receives [history..., live...] in order.
 280    const flushGate = new FlushGate<Message>()
 281  
 282    let initialFlushDone = false
 283    let tornDown = false
 284    let authRecoveryInFlight = false
 285    // Latch for onUserMessage — flips true when the callback returns true
 286    // (policy says "done deriving"). sessionId is const (no re-create path —
 287    // rebuildTransport swaps JWT/epoch, same session), so no reset needed.
 288    let userMessageCallbackDone = !onUserMessage
 289  
 290    // Telemetry: why did onConnect fire? Set by rebuildTransport before
 291    // wireTransportCallbacks; read asynchronously by onConnect. Race-safe
 292    // because authRecoveryInFlight serializes rebuild callers, and a fresh
 293    // initEnvLessBridgeCore() call gets a fresh closure defaulting to 'initial'.
 294    let connectCause: ConnectCause = 'initial'
 295  
 296    // Deadline for onConnect after transport.connect(). Cleared by onConnect
 297    // (connected) and onClose (got a close — not silent). If neither fires
 298    // before cfg.connect_timeout_ms, onConnectTimeout emits — the only
 299    // signal for the `started → (silence)` gap.
 300    let connectDeadline: ReturnType<typeof setTimeout> | undefined
 301    function onConnectTimeout(cause: ConnectCause): void {
 302      if (tornDown) return
 303      logEvent('tengu_bridge_repl_connect_timeout', {
 304        v2: true,
 305        elapsed_ms: cfg.connect_timeout_ms,
 306        cause:
 307          cause as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
 308      })
 309    }
 310  
 311    // ── 5. JWT refresh scheduler ────────────────────────────────────────────
 312    // Schedule a callback 5min before expiry (per response.expires_in). On fire,
 313    // re-fetch /bridge with OAuth → rebuild transport with fresh credentials.
 314    // Each /bridge call bumps epoch server-side, so a JWT-only swap would leave
 315    // the old CCRClient heartbeating with a stale epoch → 409 within 20s.
 316    // JWT is opaque — do not decode.
 317    const refresh = createTokenRefreshScheduler({
 318      refreshBufferMs: cfg.token_refresh_buffer_ms,
 319      getAccessToken: async () => {
 320        // Unconditionally refresh OAuth before calling /bridge — getAccessToken()
 321        // returns expired tokens as non-null strings (doesn't check expiresAt),
 322        // so truthiness doesn't mean valid. Pass the stale token to onAuth401
 323        // so handleOAuth401Error's keychain-comparison can detect parallel refresh.
 324        const stale = getAccessToken()
 325        if (onAuth401) await onAuth401(stale ?? '')
 326        return getAccessToken() ?? stale
 327      },
 328      onRefresh: (sid, oauthToken) => {
 329        void (async () => {
 330          // Laptop wake: overdue proactive timer + SSE 401 fire ~simultaneously.
 331          // Claim the flag BEFORE the /bridge fetch so the other path skips
 332          // entirely — prevents double epoch bump (each /bridge call bumps; if
 333          // both fetch, the first rebuild gets a stale epoch and 409s).
 334          if (authRecoveryInFlight || tornDown) {
 335            logForDebugging(
 336              '[remote-bridge] Recovery already in flight, skipping proactive refresh',
 337            )
 338            return
 339          }
 340          authRecoveryInFlight = true
 341          try {
 342            const fresh = await withRetry(
 343              () =>
 344                fetchRemoteCredentials(
 345                  sid,
 346                  baseUrl,
 347                  oauthToken,
 348                  cfg.http_timeout_ms,
 349                ),
 350              'fetchRemoteCredentials (proactive)',
 351              cfg,
 352            )
 353            if (!fresh || tornDown) return
 354            await rebuildTransport(fresh, 'proactive_refresh')
 355            logForDebugging(
 356              '[remote-bridge] Transport rebuilt (proactive refresh)',
 357            )
 358          } catch (err) {
 359            logForDebugging(
 360              `[remote-bridge] Proactive refresh rebuild failed: ${errorMessage(err)}`,
 361              { level: 'error' },
 362            )
 363            logForDiagnosticsNoPII(
 364              'error',
 365              'bridge_repl_v2_proactive_refresh_failed',
 366            )
 367            if (!tornDown) {
 368              onStateChange?.('failed', `Refresh failed: ${errorMessage(err)}`)
 369            }
 370          } finally {
 371            authRecoveryInFlight = false
 372          }
 373        })()
 374      },
 375      label: 'remote',
 376    })
 377    refresh.scheduleFromExpiresIn(sessionId, credentials.expires_in)
 378  
 379    // ── 6. Wire callbacks (extracted so transport-rebuild can re-wire) ──────
 380    function wireTransportCallbacks(): void {
 381      transport.setOnConnect(() => {
 382        clearTimeout(connectDeadline)
 383        logForDebugging('[remote-bridge] v2 transport connected')
 384        logForDiagnosticsNoPII('info', 'bridge_repl_v2_transport_connected')
 385        logEvent('tengu_bridge_repl_ws_connected', {
 386          v2: true,
 387          cause:
 388            connectCause as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
 389        })
 390  
 391        if (!initialFlushDone && initialMessages && initialMessages.length > 0) {
 392          initialFlushDone = true
 393          // Capture current transport — if 401/teardown happens mid-flush,
 394          // the stale .finally() must not drain the gate or signal connected.
 395          // (Same guard pattern as replBridge.ts:1119.)
 396          const flushTransport = transport
 397          void flushHistory(initialMessages)
 398            .catch(e =>
 399              logForDebugging(`[remote-bridge] flushHistory failed: ${e}`),
 400            )
 401            .finally(() => {
 402              // authRecoveryInFlight catches the v1-vs-v2 asymmetry: v1 nulls
 403              // transport synchronously in setOnClose (replBridge.ts:1175), so
 404              // transport !== flushTransport trips immediately. v2 doesn't null —
 405              // transport reassigned only at rebuildTransport:346, 3 awaits deep.
 406              // authRecoveryInFlight is set synchronously at rebuildTransport entry.
 407              if (
 408                transport !== flushTransport ||
 409                tornDown ||
 410                authRecoveryInFlight
 411              ) {
 412                return
 413              }
 414              drainFlushGate()
 415              onStateChange?.('connected')
 416            })
 417        } else if (!flushGate.active) {
 418          onStateChange?.('connected')
 419        }
 420      })
 421  
 422      transport.setOnData((data: string) => {
 423        handleIngressMessage(
 424          data,
 425          recentPostedUUIDs,
 426          recentInboundUUIDs,
 427          onInboundMessage,
 428          // Remote client answered the permission prompt — the turn resumes.
 429          // Without this the server stays on requires_action until the next
 430          // user message or turn-end result.
 431          onPermissionResponse
 432            ? res => {
 433                transport.reportState('running')
 434                onPermissionResponse(res)
 435              }
 436            : undefined,
 437          req =>
 438            handleServerControlRequest(req, {
 439              transport,
 440              sessionId,
 441              onInterrupt,
 442              onSetModel,
 443              onSetMaxThinkingTokens,
 444              onSetPermissionMode,
 445              outboundOnly,
 446            }),
 447        )
 448      })
 449  
 450      transport.setOnClose((code?: number) => {
 451        clearTimeout(connectDeadline)
 452        if (tornDown) return
 453        logForDebugging(`[remote-bridge] v2 transport closed (code=${code})`)
 454        logEvent('tengu_bridge_repl_ws_closed', { code, v2: true })
 455        // onClose fires only for TERMINAL failures: 401 (JWT invalid),
 456        // 4090 (CCR epoch mismatch), 4091 (CCR init failed), or SSE 10-min
 457        // reconnect budget exhausted. Transient disconnects are handled
 458        // transparently inside SSETransport. 401 we can recover from (fetch
 459        // fresh JWT, rebuild transport); all other codes are dead-ends.
 460        if (code === 401 && !authRecoveryInFlight) {
 461          void recoverFromAuthFailure()
 462          return
 463        }
 464        onStateChange?.('failed', `Transport closed (code ${code})`)
 465      })
 466    }
 467  
 468    // ── 7. Transport rebuild (shared by proactive refresh + 401 recovery) ──
 469    // Every /bridge call bumps epoch server-side. Both refresh paths must
 470    // rebuild the transport with the new epoch — a JWT-only swap leaves the
 471    // old CCRClient heartbeating stale epoch → 409. SSE resumes from the old
 472    // transport's high-water-mark seq-num so no server-side replay.
 473    // Caller MUST set authRecoveryInFlight = true before calling (synchronously,
 474    // before any await) and clear it in a finally. This function doesn't manage
 475    // the flag — moving it here would be too late to prevent a double /bridge
 476    // fetch, and each fetch bumps epoch.
 477    async function rebuildTransport(
 478      fresh: RemoteCredentials,
 479      cause: Exclude<ConnectCause, 'initial'>,
 480    ): Promise<void> {
 481      connectCause = cause
 482      // Queue writes during rebuild — once /bridge returns, the old transport's
 483      // epoch is stale and its next write/heartbeat 409s. Without this gate,
 484      // writeMessages adds UUIDs to recentPostedUUIDs then writeBatch silently
 485      // no-ops (closed uploader after 409) → permanent silent message loss.
 486      flushGate.start()
 487      try {
 488        const seq = transport.getLastSequenceNum()
 489        transport.close()
 490        transport = await createV2ReplTransport({
 491          sessionUrl: buildCCRv2SdkUrl(fresh.api_base_url, sessionId),
 492          ingressToken: fresh.worker_jwt,
 493          sessionId,
 494          epoch: fresh.worker_epoch,
 495          heartbeatIntervalMs: cfg.heartbeat_interval_ms,
 496          heartbeatJitterFraction: cfg.heartbeat_jitter_fraction,
 497          initialSequenceNum: seq,
 498          getAuthToken: () => fresh.worker_jwt,
 499          outboundOnly,
 500        })
 501        if (tornDown) {
 502          // Teardown fired during the async createV2ReplTransport window.
 503          // Don't wire/connect/schedule — we'd re-arm timers after cancelAll()
 504          // and fire onInboundMessage into a torn-down bridge.
 505          transport.close()
 506          return
 507        }
 508        wireTransportCallbacks()
 509        transport.connect()
 510        connectDeadline = setTimeout(
 511          onConnectTimeout,
 512          cfg.connect_timeout_ms,
 513          connectCause,
 514        )
 515        refresh.scheduleFromExpiresIn(sessionId, fresh.expires_in)
 516        // Drain queued writes into the new uploader. Runs before
 517        // ccr.initialize() resolves (transport.connect() is fire-and-forget),
 518        // but the uploader serializes behind the initial PUT /worker. If
 519        // init fails (4091), events drop — but only recentPostedUUIDs
 520        // (per-instance) is populated, so re-enabling the bridge re-flushes.
 521        drainFlushGate()
 522      } finally {
 523        // End the gate on failure paths too — drainFlushGate already ended
 524        // it on success. Queued messages are dropped (transport still dead).
 525        flushGate.drop()
 526      }
 527    }
 528  
 529    // ── 8. 401 recovery (OAuth refresh + rebuild) ───────────────────────────
 530    async function recoverFromAuthFailure(): Promise<void> {
 531      // setOnClose already guards `!authRecoveryInFlight` but that check and
 532      // this set must be atomic against onRefresh — claim synchronously before
 533      // any await. Laptop wake fires both paths ~simultaneously.
 534      if (authRecoveryInFlight) return
 535      authRecoveryInFlight = true
 536      onStateChange?.('reconnecting', 'JWT expired — refreshing')
 537      logForDebugging('[remote-bridge] 401 on SSE — attempting JWT refresh')
 538      try {
 539        // Unconditionally try OAuth refresh — getAccessToken() returns expired
 540        // tokens as non-null strings, so !oauthToken doesn't catch expiry.
 541        // Pass the stale token so handleOAuth401Error's keychain-comparison
 542        // can detect if another tab already refreshed.
 543        const stale = getAccessToken()
 544        if (onAuth401) await onAuth401(stale ?? '')
 545        const oauthToken = getAccessToken() ?? stale
 546        if (!oauthToken || tornDown) {
 547          if (!tornDown) {
 548            onStateChange?.('failed', 'JWT refresh failed: no OAuth token')
 549          }
 550          return
 551        }
 552  
 553        const fresh = await withRetry(
 554          () =>
 555            fetchRemoteCredentials(
 556              sessionId,
 557              baseUrl,
 558              oauthToken,
 559              cfg.http_timeout_ms,
 560            ),
 561          'fetchRemoteCredentials (recovery)',
 562          cfg,
 563        )
 564        if (!fresh || tornDown) {
 565          if (!tornDown) {
 566            onStateChange?.('failed', 'JWT refresh failed after 401')
 567          }
 568          return
 569        }
 570        // If 401 interrupted the initial flush, writeBatch may have silently
 571        // no-op'd on the closed uploader (ccr.close() ran in the SSE wrapper
 572        // before our setOnClose callback). Reset so the new onConnect re-flushes.
 573        // (v1 scopes initialFlushDone inside the per-transport closure at
 574        // replBridge.ts:1027 so it resets naturally; v2 has it at outer scope.)
 575        initialFlushDone = false
 576        await rebuildTransport(fresh, 'auth_401_recovery')
 577        logForDebugging('[remote-bridge] Transport rebuilt after 401')
 578      } catch (err) {
 579        logForDebugging(
 580          `[remote-bridge] 401 recovery failed: ${errorMessage(err)}`,
 581          { level: 'error' },
 582        )
 583        logForDiagnosticsNoPII('error', 'bridge_repl_v2_jwt_refresh_failed')
 584        if (!tornDown) {
 585          onStateChange?.('failed', `JWT refresh failed: ${errorMessage(err)}`)
 586        }
 587      } finally {
 588        authRecoveryInFlight = false
 589      }
 590    }
 591  
 592    wireTransportCallbacks()
 593  
 594    // Start flushGate BEFORE connect so writeMessages() during handshake
 595    // queues instead of racing the history POST.
 596    if (initialMessages && initialMessages.length > 0) {
 597      flushGate.start()
 598    }
 599    transport.connect()
 600    connectDeadline = setTimeout(
 601      onConnectTimeout,
 602      cfg.connect_timeout_ms,
 603      connectCause,
 604    )
 605  
 606    // ── 8. History flush + drain helpers ────────────────────────────────────
 607    function drainFlushGate(): void {
 608      const msgs = flushGate.end()
 609      if (msgs.length === 0) return
 610      for (const msg of msgs) recentPostedUUIDs.add(msg.uuid)
 611      const events = toSDKMessages(msgs).map(m => ({
 612        ...m,
 613        session_id: sessionId,
 614      }))
 615      if (msgs.some(m => m.type === 'user')) {
 616        transport.reportState('running')
 617      }
 618      logForDebugging(
 619        `[remote-bridge] Drained ${msgs.length} queued message(s) after flush`,
 620      )
 621      void transport.writeBatch(events)
 622    }
 623  
 624    async function flushHistory(msgs: Message[]): Promise<void> {
 625      // v2 always creates a fresh server session (unconditional createCodeSession
 626      // above) — no session reuse, no double-post risk. Unlike v1, we do NOT
 627      // filter by previouslyFlushedUUIDs: that set persists across REPL enable/
 628      // disable cycles (useRef), so it would wrongly suppress history on re-enable.
 629      const eligible = msgs.filter(isEligibleBridgeMessage)
 630      const capped =
 631        initialHistoryCap > 0 && eligible.length > initialHistoryCap
 632          ? eligible.slice(-initialHistoryCap)
 633          : eligible
 634      if (capped.length < eligible.length) {
 635        logForDebugging(
 636          `[remote-bridge] Capped initial flush: ${eligible.length} -> ${capped.length} (cap=${initialHistoryCap})`,
 637        )
 638      }
 639      const events = toSDKMessages(capped).map(m => ({
 640        ...m,
 641        session_id: sessionId,
 642      }))
 643      if (events.length === 0) return
 644      // Mid-turn init: if Remote Control is enabled while a query is running,
 645      // the last eligible message is a user prompt or tool_result (both 'user'
 646      // type). Without this the init PUT's 'idle' sticks until the next user-
 647      // type message forwards via writeMessages — which for a pure-text turn
 648      // is never (only assistant chunks stream post-init). Check eligible (pre-
 649      // cap), not capped: the cap may truncate to a user message even when the
 650      // actual trailing message is assistant.
 651      if (eligible.at(-1)?.type === 'user') {
 652        transport.reportState('running')
 653      }
 654      logForDebugging(`[remote-bridge] Flushing ${events.length} history events`)
 655      await transport.writeBatch(events)
 656    }
 657  
 658    // ── 9. Teardown ───────────────────────────────────────────────────────────
 659    // On SIGINT/SIGTERM/⁠/exit, gracefulShutdown races runCleanupFunctions()
 660    // against a 2s cap before forceExit kills the process. Budget accordingly:
 661    //   - archive: teardown_archive_timeout_ms (default 1500, cap 2000)
 662    //   - result write: fire-and-forget, archive latency covers the drain
 663    //   - 401 retry: only if first archive 401s, shares the same budget
 664    async function teardown(): Promise<void> {
 665      if (tornDown) return
 666      tornDown = true
 667      refresh.cancelAll()
 668      clearTimeout(connectDeadline)
 669      flushGate.drop()
 670  
 671      // Fire the result message before archive — transport.write() only awaits
 672      // enqueue (SerialBatchEventUploader resolves once buffered, drain is
 673      // async). Archiving before close() gives the uploader's drain loop a
 674      // window (typical archive ≈ 100-500ms) to POST the result without an
 675      // explicit sleep. close() sets closed=true which interrupts drain at the
 676      // next while-check, so close-before-archive drops the result.
 677      transport.reportState('idle')
 678      void transport.write(makeResultMessage(sessionId))
 679  
 680      let token = getAccessToken()
 681      let status = await archiveSession(
 682        sessionId,
 683        baseUrl,
 684        token,
 685        orgUUID,
 686        cfg.teardown_archive_timeout_ms,
 687      )
 688  
 689      // Token is usually fresh (refresh scheduler runs 5min before expiry) but
 690      // laptop-wake past the refresh window leaves getAccessToken() returning a
 691      // stale string. Retry once on 401 — onAuth401 (= handleOAuth401Error)
 692      // clears keychain cache + force-refreshes. No proactive refresh on the
 693      // happy path: handleOAuth401Error force-refreshes even valid tokens,
 694      // which would waste budget 99% of the time. try/catch mirrors
 695      // recoverFromAuthFailure: keychain reads can throw (macOS locked after
 696      // wake); an uncaught throw here would skip transport.close + telemetry.
 697      if (status === 401 && onAuth401) {
 698        try {
 699          await onAuth401(token ?? '')
 700          token = getAccessToken()
 701          status = await archiveSession(
 702            sessionId,
 703            baseUrl,
 704            token,
 705            orgUUID,
 706            cfg.teardown_archive_timeout_ms,
 707          )
 708        } catch (err) {
 709          logForDebugging(
 710            `[remote-bridge] Teardown 401 retry threw: ${errorMessage(err)}`,
 711            { level: 'error' },
 712          )
 713        }
 714      }
 715  
 716      transport.close()
 717  
 718      const archiveStatus: ArchiveTelemetryStatus =
 719        status === 'no_token'
 720          ? 'skipped_no_token'
 721          : status === 'timeout' || status === 'error'
 722            ? 'network_error'
 723            : status >= 500
 724              ? 'server_5xx'
 725              : status >= 400
 726                ? 'server_4xx'
 727                : 'ok'
 728  
 729      logForDebugging(`[remote-bridge] Torn down (archive=${status})`)
 730      logForDiagnosticsNoPII('info', 'bridge_repl_v2_teardown')
 731      logEvent(
 732        feature('CCR_MIRROR') && outboundOnly
 733          ? 'tengu_ccr_mirror_teardown'
 734          : 'tengu_bridge_repl_teardown',
 735        {
 736          v2: true,
 737          archive_status:
 738            archiveStatus as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
 739          archive_ok: typeof status === 'number' && status < 400,
 740          archive_http_status: typeof status === 'number' ? status : undefined,
 741          archive_timeout: status === 'timeout',
 742          archive_no_token: status === 'no_token',
 743        },
 744      )
 745    }
 746    const unregister = registerCleanup(teardown)
 747  
 748    if (feature('CCR_MIRROR') && outboundOnly) {
 749      logEvent('tengu_ccr_mirror_started', {
 750        v2: true,
 751        expires_in_s: credentials.expires_in,
 752      })
 753    } else {
 754      logEvent('tengu_bridge_repl_started', {
 755        has_initial_messages: !!(initialMessages && initialMessages.length > 0),
 756        v2: true,
 757        expires_in_s: credentials.expires_in,
 758        inProtectedNamespace: isInProtectedNamespace(),
 759      })
 760    }
 761  
 762    // ── 10. Handle ──────────────────────────────────────────────────────────
 763    return {
 764      bridgeSessionId: sessionId,
 765      environmentId: '',
 766      sessionIngressUrl: credentials.api_base_url,
 767      writeMessages(messages) {
 768        const filtered = messages.filter(
 769          m =>
 770            isEligibleBridgeMessage(m) &&
 771            !initialMessageUUIDs.has(m.uuid) &&
 772            !recentPostedUUIDs.has(m.uuid),
 773        )
 774        if (filtered.length === 0) return
 775  
 776        // Fire onUserMessage for title derivation. Scan before the flushGate
 777        // check — prompts are title-worthy even if they queue. Keeps calling
 778        // on every title-worthy message until the callback returns true; the
 779        // caller owns the policy (derive at 1st and 3rd, skip if explicit).
 780        if (!userMessageCallbackDone) {
 781          for (const m of filtered) {
 782            const text = extractTitleText(m)
 783            if (text !== undefined && onUserMessage?.(text, sessionId)) {
 784              userMessageCallbackDone = true
 785              break
 786            }
 787          }
 788        }
 789  
 790        if (flushGate.enqueue(...filtered)) {
 791          logForDebugging(
 792            `[remote-bridge] Queued ${filtered.length} message(s) during flush`,
 793          )
 794          return
 795        }
 796  
 797        for (const msg of filtered) recentPostedUUIDs.add(msg.uuid)
 798        const events = toSDKMessages(filtered).map(m => ({
 799          ...m,
 800          session_id: sessionId,
 801        }))
 802        // v2 does not derive worker_status from events server-side (unlike v1
 803        // session-ingress session_status_updater.go). Push it from here so the
 804        // CCR web session list shows Running instead of stuck on Idle. A user
 805        // message in the batch marks turn start. CCRClient.reportState dedupes
 806        // consecutive same-state pushes.
 807        if (filtered.some(m => m.type === 'user')) {
 808          transport.reportState('running')
 809        }
 810        logForDebugging(`[remote-bridge] Sending ${filtered.length} message(s)`)
 811        void transport.writeBatch(events)
 812      },
 813      writeSdkMessages(messages: SDKMessage[]) {
 814        const filtered = messages.filter(
 815          m => !m.uuid || !recentPostedUUIDs.has(m.uuid),
 816        )
 817        if (filtered.length === 0) return
 818        for (const msg of filtered) {
 819          if (msg.uuid) recentPostedUUIDs.add(msg.uuid)
 820        }
 821        const events = filtered.map(m => ({ ...m, session_id: sessionId }))
 822        void transport.writeBatch(events)
 823      },
 824      sendControlRequest(request: SDKControlRequest) {
 825        if (authRecoveryInFlight) {
 826          logForDebugging(
 827            `[remote-bridge] Dropping control_request during 401 recovery: ${request.request_id}`,
 828          )
 829          return
 830        }
 831        const event = { ...request, session_id: sessionId }
 832        if (request.request.subtype === 'can_use_tool') {
 833          transport.reportState('requires_action')
 834        }
 835        void transport.write(event)
 836        logForDebugging(
 837          `[remote-bridge] Sent control_request request_id=${request.request_id}`,
 838        )
 839      },
 840      sendControlResponse(response: SDKControlResponse) {
 841        if (authRecoveryInFlight) {
 842          logForDebugging(
 843            '[remote-bridge] Dropping control_response during 401 recovery',
 844          )
 845          return
 846        }
 847        const event = { ...response, session_id: sessionId }
 848        transport.reportState('running')
 849        void transport.write(event)
 850        logForDebugging('[remote-bridge] Sent control_response')
 851      },
 852      sendControlCancelRequest(requestId: string) {
 853        if (authRecoveryInFlight) {
 854          logForDebugging(
 855            `[remote-bridge] Dropping control_cancel_request during 401 recovery: ${requestId}`,
 856          )
 857          return
 858        }
 859        const event = {
 860          type: 'control_cancel_request' as const,
 861          request_id: requestId,
 862          session_id: sessionId,
 863        }
 864        // Hook/classifier/channel/recheck resolved the permission locally —
 865        // interactiveHandler calls only cancelRequest (no sendResponse) on
 866        // those paths, so without this the server stays on requires_action.
 867        transport.reportState('running')
 868        void transport.write(event)
 869        logForDebugging(
 870          `[remote-bridge] Sent control_cancel_request request_id=${requestId}`,
 871        )
 872      },
 873      sendResult() {
 874        if (authRecoveryInFlight) {
 875          logForDebugging('[remote-bridge] Dropping result during 401 recovery')
 876          return
 877        }
 878        transport.reportState('idle')
 879        void transport.write(makeResultMessage(sessionId))
 880        logForDebugging(`[remote-bridge] Sent result`)
 881      },
 882      async teardown() {
 883        unregister()
 884        await teardown()
 885      },
 886    }
 887  }
 888  
 889  // ─── Session API (v2 /code/sessions, no env) ─────────────────────────────────
 890  
 891  /** Retry an async init call with exponential backoff + jitter. */
 892  async function withRetry<T>(
 893    fn: () => Promise<T | null>,
 894    label: string,
 895    cfg: EnvLessBridgeConfig,
 896  ): Promise<T | null> {
 897    const max = cfg.init_retry_max_attempts
 898    for (let attempt = 1; attempt <= max; attempt++) {
 899      const result = await fn()
 900      if (result !== null) return result
 901      if (attempt < max) {
 902        const base = cfg.init_retry_base_delay_ms * 2 ** (attempt - 1)
 903        const jitter =
 904          base * cfg.init_retry_jitter_fraction * (2 * Math.random() - 1)
 905        const delay = Math.min(base + jitter, cfg.init_retry_max_delay_ms)
 906        logForDebugging(
 907          `[remote-bridge] ${label} failed (attempt ${attempt}/${max}), retrying in ${Math.round(delay)}ms`,
 908        )
 909        await sleep(delay)
 910      }
 911    }
 912    return null
 913  }
 914  
 915  // Moved to codeSessionApi.ts so the SDK /bridge subpath can bundle them
 916  // without pulling in this file's heavy CLI tree (analytics, transport).
 917  export {
 918    createCodeSession,
 919    type RemoteCredentials,
 920  } from './codeSessionApi.js'
 921  import {
 922    createCodeSession,
 923    fetchRemoteCredentials as fetchRemoteCredentialsRaw,
 924    type RemoteCredentials,
 925  } from './codeSessionApi.js'
 926  import { getBridgeBaseUrlOverride } from './bridgeConfig.js'
 927  
 928  // CLI-side wrapper that applies the CLAUDE_BRIDGE_BASE_URL dev override and
 929  // injects the trusted-device token (both are env/GrowthBook reads that the
 930  // SDK-facing codeSessionApi.ts export must stay free of).
 931  export async function fetchRemoteCredentials(
 932    sessionId: string,
 933    baseUrl: string,
 934    accessToken: string,
 935    timeoutMs: number,
 936  ): Promise<RemoteCredentials | null> {
 937    const creds = await fetchRemoteCredentialsRaw(
 938      sessionId,
 939      baseUrl,
 940      accessToken,
 941      timeoutMs,
 942      getTrustedDeviceToken(),
 943    )
 944    if (!creds) return null
 945    return getBridgeBaseUrlOverride()
 946      ? { ...creds, api_base_url: baseUrl }
 947      : creds
 948  }
 949  
 950  type ArchiveStatus = number | 'timeout' | 'error' | 'no_token'
 951  
 952  // Single categorical for BQ `GROUP BY archive_status`. The booleans on
 953  // _teardown predate this and are redundant with it (except archive_timeout,
 954  // which distinguishes ECONNABORTED from other network errors — both map to
 955  // 'network_error' here since the dominant cause in a 1.5s window is timeout).
 956  type ArchiveTelemetryStatus =
 957    | 'ok'
 958    | 'skipped_no_token'
 959    | 'network_error'
 960    | 'server_4xx'
 961    | 'server_5xx'
 962  
 963  async function archiveSession(
 964    sessionId: string,
 965    baseUrl: string,
 966    accessToken: string | undefined,
 967    orgUUID: string,
 968    timeoutMs: number,
 969  ): Promise<ArchiveStatus> {
 970    if (!accessToken) return 'no_token'
 971    // Archive lives at the compat layer (/v1/sessions/*, not /v1/code/sessions).
 972    // compat.parseSessionID only accepts TagSession (session_*), so retag cse_*.
 973    // anthropic-beta + x-organization-uuid are required — without them the
 974    // compat gateway 404s before reaching the handler.
 975    //
 976    // Unlike bridgeMain.ts (which caches compatId in sessionCompatIds to keep
 977    // in-memory titledSessions/logger keys consistent across a mid-session
 978    // gate flip), this compatId is only a server URL path segment — no
 979    // in-memory state. Fresh compute matches whatever the server currently
 980    // validates: if the gate is OFF, the server has been updated to accept
 981    // cse_* and we correctly send it.
 982    const compatId = toCompatSessionId(sessionId)
 983    try {
 984      const response = await axios.post(
 985        `${baseUrl}/v1/sessions/${compatId}/archive`,
 986        {},
 987        {
 988          headers: {
 989            ...oauthHeaders(accessToken),
 990            'anthropic-beta': 'ccr-byoc-2025-07-29',
 991            'x-organization-uuid': orgUUID,
 992          },
 993          timeout: timeoutMs,
 994          validateStatus: () => true,
 995        },
 996      )
 997      logForDebugging(
 998        `[remote-bridge] Archive ${compatId} status=${response.status}`,
 999      )
1000      return response.status
1001    } catch (err) {
1002      const msg = errorMessage(err)
1003      logForDebugging(`[remote-bridge] Archive failed: ${msg}`)
1004      return axios.isAxiosError(err) && err.code === 'ECONNABORTED'
1005        ? 'timeout'
1006        : 'error'
1007    }
1008  }