/ bridge / replBridge.ts
replBridge.ts
   1  // biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered
   2  import { randomUUID } from 'crypto'
   3  import {
   4    createBridgeApiClient,
   5    BridgeFatalError,
   6    isExpiredErrorType,
   7    isSuppressible403,
   8  } from './bridgeApi.js'
   9  import type { BridgeConfig, BridgeApiClient } from './types.js'
  10  import { logForDebugging } from '../utils/debug.js'
  11  import { logForDiagnosticsNoPII } from '../utils/diagLogs.js'
  12  import {
  13    type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
  14    logEvent,
  15  } from '../services/analytics/index.js'
  16  import { registerCleanup } from '../utils/cleanupRegistry.js'
  17  import {
  18    handleIngressMessage,
  19    handleServerControlRequest,
  20    makeResultMessage,
  21    isEligibleBridgeMessage,
  22    extractTitleText,
  23    BoundedUUIDSet,
  24  } from './bridgeMessaging.js'
  25  import {
  26    decodeWorkSecret,
  27    buildSdkUrl,
  28    buildCCRv2SdkUrl,
  29    sameSessionId,
  30  } from './workSecret.js'
  31  import { toCompatSessionId, toInfraSessionId } from './sessionIdCompat.js'
  32  import { updateSessionBridgeId } from '../utils/concurrentSessions.js'
  33  import { getTrustedDeviceToken } from './trustedDevice.js'
  34  import { HybridTransport } from '../cli/transports/HybridTransport.js'
  35  import {
  36    type ReplBridgeTransport,
  37    createV1ReplTransport,
  38    createV2ReplTransport,
  39  } from './replBridgeTransport.js'
  40  import { updateSessionIngressAuthToken } from '../utils/sessionIngressAuth.js'
  41  import { isEnvTruthy, isInProtectedNamespace } from '../utils/envUtils.js'
  42  import { validateBridgeId } from './bridgeApi.js'
  43  import {
  44    describeAxiosError,
  45    extractHttpStatus,
  46    logBridgeSkip,
  47  } from './debugUtils.js'
  48  import type { Message } from '../types/message.js'
  49  import type { SDKMessage } from '../entrypoints/agentSdkTypes.js'
  50  import type { PermissionMode } from '../utils/permissions/PermissionMode.js'
  51  import type {
  52    SDKControlRequest,
  53    SDKControlResponse,
  54  } from '../entrypoints/sdk/controlTypes.js'
  55  import { createCapacityWake, type CapacitySignal } from './capacityWake.js'
  56  import { FlushGate } from './flushGate.js'
  57  import {
  58    DEFAULT_POLL_CONFIG,
  59    type PollIntervalConfig,
  60  } from './pollConfigDefaults.js'
  61  import { errorMessage } from '../utils/errors.js'
  62  import { sleep } from '../utils/sleep.js'
  63  import {
  64    wrapApiForFaultInjection,
  65    registerBridgeDebugHandle,
  66    clearBridgeDebugHandle,
  67    injectBridgeFault,
  68  } from './bridgeDebug.js'
  69  
  70  export type ReplBridgeHandle = {
  71    bridgeSessionId: string
  72    environmentId: string
  73    sessionIngressUrl: string
  74    writeMessages(messages: Message[]): void
  75    writeSdkMessages(messages: SDKMessage[]): void
  76    sendControlRequest(request: SDKControlRequest): void
  77    sendControlResponse(response: SDKControlResponse): void
  78    sendControlCancelRequest(requestId: string): void
  79    sendResult(): void
  80    teardown(): Promise<void>
  81  }
  82  
  83  export type BridgeState = 'ready' | 'connected' | 'reconnecting' | 'failed'
  84  
  85  /**
  86   * Explicit-param input to initBridgeCore. Everything initReplBridge reads
  87   * from bootstrap state (cwd, session ID, git, OAuth) becomes a field here.
  88   * A daemon caller (Agent SDK, PR 4) that never runs main.tsx fills these
  89   * in itself.
  90   */
  91  export type BridgeCoreParams = {
  92    dir: string
  93    machineName: string
  94    branch: string
  95    gitRepoUrl: string | null
  96    title: string
  97    baseUrl: string
  98    sessionIngressUrl: string
  99    /**
 100     * Opaque string sent as metadata.worker_type. Use BridgeWorkerType for
 101     * the two CLI-originated values; daemon callers may send any string the
 102     * backend recognizes (it's just a filter key on the web side).
 103     */
 104    workerType: string
 105    getAccessToken: () => string | undefined
 106    /**
 107     * POST /v1/sessions. Injected because `createSession.ts` lazy-loads
 108     * `auth.ts`/`model.ts`/`oauth/client.ts` and `bun --outfile` inlines
 109     * dynamic imports — the lazy-load doesn't help, the whole REPL tree ends
 110     * up in the Agent SDK bundle.
 111     *
 112     * REPL wrapper passes `createBridgeSession` from `createSession.ts`.
 113     * Daemon wrapper passes `createBridgeSessionLean` from `sessionApi.ts`
 114     * (HTTP-only, orgUUID+model supplied by the daemon caller).
 115     *
 116     * Receives `gitRepoUrl`+`branch` so the REPL wrapper can build the git
 117     * source/outcome for claude.ai's session card. Daemon ignores them.
 118     */
 119    createSession: (opts: {
 120      environmentId: string
 121      title: string
 122      gitRepoUrl: string | null
 123      branch: string
 124      signal: AbortSignal
 125    }) => Promise<string | null>
 126    /**
 127     * POST /v1/sessions/{id}/archive. Same injection rationale. Best-effort;
 128     * the callback MUST NOT throw.
 129     */
 130    archiveSession: (sessionId: string) => Promise<void>
 131    /**
 132     * Invoked on reconnect-after-env-lost to refresh the title. REPL wrapper
 133     * reads session storage (picks up /rename); daemon returns the static
 134     * title. Defaults to () => title.
 135     */
 136    getCurrentTitle?: () => string
 137    /**
 138     * Converts internal Message[] → SDKMessage[] for writeMessages() and the
 139     * initial-flush/drain paths. REPL wrapper passes the real toSDKMessages
 140     * from utils/messages/mappers.ts. Daemon callers that only use
 141     * writeSdkMessages() and pass no initialMessages can omit this — those
 142     * code paths are unreachable.
 143     *
 144     * Injected rather than imported because mappers.ts transitively pulls in
 145     * src/commands.ts via messages.ts → api.ts → prompts.ts, dragging the
 146     * entire command registry + React tree into the Agent SDK bundle.
 147     */
 148    toSDKMessages?: (messages: Message[]) => SDKMessage[]
 149    /**
 150     * OAuth 401 refresh handler passed to createBridgeApiClient. REPL wrapper
 151     * passes handleOAuth401Error; daemon passes its AuthManager's handler.
 152     * Injected because utils/auth.ts transitively pulls in the command
 153     * registry via config.ts → file.ts → permissions/filesystem.ts →
 154     * sessionStorage.ts → commands.ts.
 155     */
 156    onAuth401?: (staleAccessToken: string) => Promise<boolean>
 157    /**
 158     * Poll interval config getter for the work-poll heartbeat loop. REPL
 159     * wrapper passes the GrowthBook-backed getPollIntervalConfig (allows ops
 160     * to live-tune poll rates fleet-wide). Daemon passes a static config
 161     * with a 60s heartbeat (5× headroom under the 300s work-lease TTL).
 162     * Injected because growthbook.ts transitively pulls in the command
 163     * registry via the same config.ts chain.
 164     */
 165    getPollIntervalConfig?: () => PollIntervalConfig
 166    /**
 167     * Max initial messages to replay on connect. REPL wrapper reads from the
 168     * tengu_bridge_initial_history_cap GrowthBook flag. Daemon passes no
 169     * initialMessages so this is never read. Default 200 matches the flag
 170     * default.
 171     */
 172    initialHistoryCap?: number
 173    // Same REPL-flush machinery as InitBridgeOptions — daemon omits these.
 174    initialMessages?: Message[]
 175    previouslyFlushedUUIDs?: Set<string>
 176    onInboundMessage?: (msg: SDKMessage) => void
 177    onPermissionResponse?: (response: SDKControlResponse) => void
 178    onInterrupt?: () => void
 179    onSetModel?: (model: string | undefined) => void
 180    onSetMaxThinkingTokens?: (maxTokens: number | null) => void
 181    /**
 182     * Returns a policy verdict so this module can emit an error control_response
 183     * without importing the policy checks itself (bootstrap-isolation constraint).
 184     * The callback must guard `auto` (isAutoModeGateEnabled) and
 185     * `bypassPermissions` (isBypassPermissionsModeDisabled AND
 186     * isBypassPermissionsModeAvailable) BEFORE calling transitionPermissionMode —
 187     * that function's internal auto-gate check is a defensive throw, not a
 188     * graceful guard, and its side-effect order is setAutoModeActive(true) then
 189     * throw, which corrupts the 3-way invariant documented in src/CLAUDE.md if
 190     * the callback lets the throw escape here.
 191     */
 192    onSetPermissionMode?: (
 193      mode: PermissionMode,
 194    ) => { ok: true } | { ok: false; error: string }
 195    onStateChange?: (state: BridgeState, detail?: string) => void
 196    /**
 197     * Fires on each real user message to flow through writeMessages() until
 198     * the callback returns true (done). Mirrors remoteBridgeCore.ts's
 199     * onUserMessage so the REPL bridge can derive a session title from early
 200     * prompts when none was set at init time (e.g. user runs /remote-control
 201     * on an empty conversation, then types). Tool-result wrappers, meta
 202     * messages, and display-tag-only messages are skipped. Receives
 203     * currentSessionId so the wrapper can PATCH the title without a closure
 204     * dance to reach the not-yet-returned handle. The caller owns the
 205     * derive-at-count-1-and-3 policy; the transport just keeps calling until
 206     * told to stop. Not fired for the writeSdkMessages daemon path (daemon
 207     * sets its own title at init). Distinct from SessionSpawnOpts's
 208     * onFirstUserMessage (spawn-bridge, PR #21250), which stays fire-once.
 209     */
 210    onUserMessage?: (text: string, sessionId: string) => boolean
 211    /** See InitBridgeOptions.perpetual. */
 212    perpetual?: boolean
 213    /**
 214     * Seeds lastTransportSequenceNum — the SSE event-stream high-water mark
 215     * that's carried across transport swaps within one process. Daemon callers
 216     * pass the value they persisted at shutdown so the FIRST SSE connect of a
 217     * fresh process sends from_sequence_num and the server doesn't replay full
 218     * history. REPL callers omit (fresh session each run → 0 is correct).
 219     */
 220    initialSSESequenceNum?: number
 221  }
 222  
 223  /**
 224   * Superset of ReplBridgeHandle. Adds getSSESequenceNum for daemon callers
 225   * that persist the SSE seq-num across process restarts and pass it back as
 226   * initialSSESequenceNum on the next start.
 227   */
 228  export type BridgeCoreHandle = ReplBridgeHandle & {
 229    /**
 230     * Current SSE sequence-number high-water mark. Updates as transports
 231     * swap. Daemon callers persist this on shutdown and pass it back as
 232     * initialSSESequenceNum on next start.
 233     */
 234    getSSESequenceNum(): number
 235  }
 236  
 237  /**
 238   * Poll error recovery constants. When the work poll starts failing (e.g.
 239   * server 500s), we use exponential backoff and give up after this timeout.
 240   * This is deliberately long — the server is the authority on when a session
 241   * is truly dead. As long as the server accepts our poll, we keep waiting
 242   * for it to re-dispatch the work item.
 243   */
 244  const POLL_ERROR_INITIAL_DELAY_MS = 2_000
 245  const POLL_ERROR_MAX_DELAY_MS = 60_000
 246  const POLL_ERROR_GIVE_UP_MS = 15 * 60 * 1000
 247  
 248  // Monotonically increasing counter for distinguishing init calls in logs
 249  let initSequence = 0
 250  
 251  /**
 252   * Bootstrap-free core: env registration → session creation → poll loop →
 253   * ingress WS → teardown. Reads nothing from bootstrap/state or
 254   * sessionStorage — all context comes from params. Caller (initReplBridge
 255   * below, or a daemon in PR 4) has already passed entitlement gates and
 256   * gathered git/auth/title.
 257   *
 258   * Returns null on registration or session-creation failure.
 259   */
 260  export async function initBridgeCore(
 261    params: BridgeCoreParams,
 262  ): Promise<BridgeCoreHandle | null> {
 263    const {
 264      dir,
 265      machineName,
 266      branch,
 267      gitRepoUrl,
 268      title,
 269      baseUrl,
 270      sessionIngressUrl,
 271      workerType,
 272      getAccessToken,
 273      createSession,
 274      archiveSession,
 275      getCurrentTitle = () => title,
 276      toSDKMessages = () => {
 277        throw new Error(
 278          'BridgeCoreParams.toSDKMessages not provided. Pass it if you use writeMessages() or initialMessages — daemon callers that only use writeSdkMessages() never hit this path.',
 279        )
 280      },
 281      onAuth401,
 282      getPollIntervalConfig = () => DEFAULT_POLL_CONFIG,
 283      initialHistoryCap = 200,
 284      initialMessages,
 285      previouslyFlushedUUIDs,
 286      onInboundMessage,
 287      onPermissionResponse,
 288      onInterrupt,
 289      onSetModel,
 290      onSetMaxThinkingTokens,
 291      onSetPermissionMode,
 292      onStateChange,
 293      onUserMessage,
 294      perpetual,
 295      initialSSESequenceNum = 0,
 296    } = params
 297  
 298    const seq = ++initSequence
 299  
 300    // bridgePointer import hoisted: perpetual mode reads it before register;
 301    // non-perpetual writes it after session create; both use clear at teardown.
 302    const { writeBridgePointer, clearBridgePointer, readBridgePointer } =
 303      await import('./bridgePointer.js')
 304  
 305    // Perpetual mode: read the crash-recovery pointer and treat it as prior
 306    // state. The pointer is written unconditionally after session create
 307    // (crash-recovery for all sessions); perpetual mode just skips the
 308    // teardown clear so it survives clean exits too. Only reuse 'repl'
 309    // pointers — a crashed standalone bridge (`claude remote-control`)
 310    // writes source:'standalone' with a different workerType.
 311    const rawPrior = perpetual ? await readBridgePointer(dir) : null
 312    const prior = rawPrior?.source === 'repl' ? rawPrior : null
 313  
 314    logForDebugging(
 315      `[bridge:repl] initBridgeCore #${seq} starting (initialMessages=${initialMessages?.length ?? 0}${prior ? ` perpetual prior=env:${prior.environmentId}` : ''})`,
 316    )
 317  
 318    // 5. Register bridge environment
 319    const rawApi = createBridgeApiClient({
 320      baseUrl,
 321      getAccessToken,
 322      runnerVersion: MACRO.VERSION,
 323      onDebug: logForDebugging,
 324      onAuth401,
 325      getTrustedDeviceToken,
 326    })
 327    // Ant-only: interpose so /bridge-kick can inject poll/register/heartbeat
 328    // failures. Zero cost in external builds (rawApi passes through unchanged).
 329    const api =
 330      process.env.USER_TYPE === 'ant' ? wrapApiForFaultInjection(rawApi) : rawApi
 331  
 332    const bridgeConfig: BridgeConfig = {
 333      dir,
 334      machineName,
 335      branch,
 336      gitRepoUrl,
 337      maxSessions: 1,
 338      spawnMode: 'single-session',
 339      verbose: false,
 340      sandbox: false,
 341      bridgeId: randomUUID(),
 342      workerType,
 343      environmentId: randomUUID(),
 344      reuseEnvironmentId: prior?.environmentId,
 345      apiBaseUrl: baseUrl,
 346      sessionIngressUrl,
 347    }
 348  
 349    let environmentId: string
 350    let environmentSecret: string
 351    try {
 352      const reg = await api.registerBridgeEnvironment(bridgeConfig)
 353      environmentId = reg.environment_id
 354      environmentSecret = reg.environment_secret
 355    } catch (err) {
 356      logBridgeSkip(
 357        'registration_failed',
 358        `[bridge:repl] Environment registration failed: ${errorMessage(err)}`,
 359      )
 360      // Stale pointer may be the cause (expired/deleted env) — clear it so
 361      // the next start doesn't retry the same dead ID.
 362      if (prior) {
 363        await clearBridgePointer(dir)
 364      }
 365      onStateChange?.('failed', errorMessage(err))
 366      return null
 367    }
 368  
 369    logForDebugging(`[bridge:repl] Environment registered: ${environmentId}`)
 370    logForDiagnosticsNoPII('info', 'bridge_repl_env_registered')
 371    logEvent('tengu_bridge_repl_env_registered', {})
 372  
 373    /**
 374     * Reconnect-in-place: if the just-registered environmentId matches what
 375     * was requested, call reconnectSession to force-stop stale workers and
 376     * re-queue the session. Used at init (perpetual mode — env is alive but
 377     * idle after clean teardown) and in doReconnect() Strategy 1 (env lost
 378     * then resurrected). Returns true on success; caller falls back to
 379     * fresh session creation on false.
 380     */
 381    async function tryReconnectInPlace(
 382      requestedEnvId: string,
 383      sessionId: string,
 384    ): Promise<boolean> {
 385      if (environmentId !== requestedEnvId) {
 386        logForDebugging(
 387          `[bridge:repl] Env mismatch (requested ${requestedEnvId}, got ${environmentId}) — cannot reconnect in place`,
 388        )
 389        return false
 390      }
 391      // The pointer stores what createBridgeSession returned (session_*,
 392      // compat/convert.go:41). /bridge/reconnect is an environments-layer
 393      // endpoint — once the server's ccr_v2_compat_enabled gate is on it
 394      // looks sessions up by their infra tag (cse_*) and returns "Session
 395      // not found" for the session_* costume. We don't know the gate state
 396      // pre-poll, so try both; the re-tag is a no-op if the ID is already
 397      // cse_* (doReconnect Strategy 1 path — currentSessionId never mutates
 398      // to cse_* but future-proof the check).
 399      const infraId = toInfraSessionId(sessionId)
 400      const candidates =
 401        infraId === sessionId ? [sessionId] : [sessionId, infraId]
 402      for (const id of candidates) {
 403        try {
 404          await api.reconnectSession(environmentId, id)
 405          logForDebugging(
 406            `[bridge:repl] Reconnected session ${id} in place on env ${environmentId}`,
 407          )
 408          return true
 409        } catch (err) {
 410          logForDebugging(
 411            `[bridge:repl] reconnectSession(${id}) failed: ${errorMessage(err)}`,
 412          )
 413        }
 414      }
 415      logForDebugging(
 416        '[bridge:repl] reconnectSession exhausted — falling through to fresh session',
 417      )
 418      return false
 419    }
 420  
 421    // Perpetual init: env is alive but has no queued work after clean
 422    // teardown. reconnectSession re-queues it. doReconnect() has the same
 423    // call but only fires on poll 404 (env dead);
 424    // here the env is alive but idle.
 425    const reusedPriorSession = prior
 426      ? await tryReconnectInPlace(prior.environmentId, prior.sessionId)
 427      : false
 428    if (prior && !reusedPriorSession) {
 429      await clearBridgePointer(dir)
 430    }
 431  
 432    // 6. Create session on the bridge. Initial messages are NOT included as
 433    // session creation events because those use STREAM_ONLY persistence and
 434    // are published before the CCR UI subscribes, so they get lost. Instead,
 435    // initial messages are flushed via the ingress WebSocket once it connects.
 436  
 437    // Mutable session ID — updated when the environment+session pair is
 438    // re-created after a connection loss.
 439    let currentSessionId: string
 440  
 441  
 442    if (reusedPriorSession && prior) {
 443      currentSessionId = prior.sessionId
 444      logForDebugging(
 445        `[bridge:repl] Perpetual session reused: ${currentSessionId}`,
 446      )
 447      // Server already has all initialMessages from the prior CLI run. Mark
 448      // them as previously-flushed so the initial flush filter excludes them
 449      // (previouslyFlushedUUIDs is a fresh Set on every CLI start). Duplicate
 450      // UUIDs cause the server to kill the WebSocket.
 451      if (initialMessages && previouslyFlushedUUIDs) {
 452        for (const msg of initialMessages) {
 453          previouslyFlushedUUIDs.add(msg.uuid)
 454        }
 455      }
 456    } else {
 457      const createdSessionId = await createSession({
 458        environmentId,
 459        title,
 460        gitRepoUrl,
 461        branch,
 462        signal: AbortSignal.timeout(15_000),
 463      })
 464  
 465      if (!createdSessionId) {
 466        logForDebugging(
 467          '[bridge:repl] Session creation failed, deregistering environment',
 468        )
 469        logEvent('tengu_bridge_repl_session_failed', {})
 470        await api.deregisterEnvironment(environmentId).catch(() => {})
 471        onStateChange?.('failed', 'Session creation failed')
 472        return null
 473      }
 474  
 475      currentSessionId = createdSessionId
 476      logForDebugging(`[bridge:repl] Session created: ${currentSessionId}`)
 477    }
 478  
 479    // Crash-recovery pointer: written now so a kill -9 at any point after
 480    // this leaves a recoverable trail. Cleared in teardown (non-perpetual)
 481    // or left alone (perpetual mode — pointer survives clean exit too).
 482    // `claude remote-control --continue` from the same directory will detect
 483    // it and offer to resume.
 484    await writeBridgePointer(dir, {
 485      sessionId: currentSessionId,
 486      environmentId,
 487      source: 'repl',
 488    })
 489    logForDiagnosticsNoPII('info', 'bridge_repl_session_created')
 490    logEvent('tengu_bridge_repl_started', {
 491      has_initial_messages: !!(initialMessages && initialMessages.length > 0),
 492      inProtectedNamespace: isInProtectedNamespace(),
 493    })
 494  
 495    // UUIDs of initial messages. Used for dedup in writeMessages to avoid
 496    // re-sending messages that were already flushed on WebSocket open.
 497    const initialMessageUUIDs = new Set<string>()
 498    if (initialMessages) {
 499      for (const msg of initialMessages) {
 500        initialMessageUUIDs.add(msg.uuid)
 501      }
 502    }
 503  
 504    // Bounded ring buffer of UUIDs for messages we've already sent to the
 505    // server via the ingress WebSocket. Serves two purposes:
 506    //  1. Echo filtering — ignore our own messages bouncing back on the WS.
 507    //  2. Secondary dedup in writeMessages — catch race conditions where
 508    //     the hook's index-based tracking isn't sufficient.
 509    //
 510    // Seeded with initialMessageUUIDs so that when the server echoes back
 511    // the initial conversation context over the ingress WebSocket, those
 512    // messages are recognized as echoes and not re-injected into the REPL.
 513    //
 514    // Capacity of 2000 covers well over any realistic echo window (echoes
 515    // arrive within milliseconds) and any messages that might be re-encountered
 516    // after compaction. The hook's lastWrittenIndexRef is the primary dedup;
 517    // this is a safety net.
 518    const recentPostedUUIDs = new BoundedUUIDSet(2000)
 519    for (const uuid of initialMessageUUIDs) {
 520      recentPostedUUIDs.add(uuid)
 521    }
 522  
 523    // Bounded set of INBOUND prompt UUIDs we've already forwarded to the REPL.
 524    // Defensive dedup for when the server re-delivers prompts (seq-num
 525    // negotiation failure, server edge cases, transport swap races). The
 526    // seq-num carryover below is the primary fix; this is the safety net.
 527    const recentInboundUUIDs = new BoundedUUIDSet(2000)
 528  
 529    // 7. Start poll loop for work items — this is what makes the session
 530    // "live" on claude.ai. When a user types there, the backend dispatches
 531    // a work item to our environment. We poll for it, get the ingress token,
 532    // and connect the ingress WebSocket.
 533    //
 534    // The poll loop keeps running: when work arrives it connects the ingress
 535    // WebSocket, and if the WebSocket drops unexpectedly (code != 1000) it
 536    // resumes polling to get a fresh ingress token and reconnect.
 537    const pollController = new AbortController()
 538    // Adapter over either HybridTransport (v1: WS reads + POST writes to
 539    // Session-Ingress) or SSETransport+CCRClient (v2: SSE reads + POST
 540    // writes to CCR /worker/*). The v1/v2 choice is made in onWorkReceived:
 541    // server-driven via secret.use_code_sessions, with CLAUDE_BRIDGE_USE_CCR_V2
 542    // as an ant-dev override.
 543    let transport: ReplBridgeTransport | null = null
 544    // Bumped on every onWorkReceived. Captured in createV2ReplTransport's .then()
 545    // closure to detect stale resolutions: if two calls race while transport is
 546    // null, both registerWorker() (bumping server epoch), and whichever resolves
 547    // SECOND is the correct one — but the transport !== null check gets this
 548    // backwards (first-to-resolve installs, second discards). The generation
 549    // counter catches it independent of transport state.
 550    let v2Generation = 0
 551    // SSE sequence-number high-water mark carried across transport swaps.
 552    // Without this, each new SSETransport starts at 0, sends no
 553    // from_sequence_num / Last-Event-ID on its first connect, and the server
 554    // replays the entire session event history — every prompt ever sent
 555    // re-delivered as fresh inbound messages on every onWorkReceived.
 556    //
 557    // Seed only when we actually reconnected the prior session. If
 558    // `reusedPriorSession` is false we fell through to `createSession()` —
 559    // the caller's persisted seq-num belongs to a dead session and applying
 560    // it to the fresh stream (starting at 1) silently drops events. Same
 561    // hazard as doReconnect Strategy 2; same fix as the reset there.
 562    let lastTransportSequenceNum = reusedPriorSession ? initialSSESequenceNum : 0
 563    // Track the current work ID so teardown can call stopWork
 564    let currentWorkId: string | null = null
 565    // Session ingress JWT for the current work item — used for heartbeat auth.
 566    let currentIngressToken: string | null = null
 567    // Signal to wake the at-capacity sleep early when the transport is lost,
 568    // so the poll loop immediately switches back to fast polling for new work.
 569    const capacityWake = createCapacityWake(pollController.signal)
 570    const wakePollLoop = capacityWake.wake
 571    const capacitySignal = capacityWake.signal
 572    // Gates message writes during the initial flush to prevent ordering
 573    // races where new messages arrive at the server interleaved with history.
 574    const flushGate = new FlushGate<Message>()
 575  
 576    // Latch for onUserMessage — flips true when the callback returns true
 577    // (policy says "done deriving"). If no callback, skip scanning entirely
 578    // (daemon path — no title derivation needed).
 579    let userMessageCallbackDone = !onUserMessage
 580  
 581    // Shared counter for environment re-creations, used by both
 582    // onEnvironmentLost and the abnormal-close handler.
 583    const MAX_ENVIRONMENT_RECREATIONS = 3
 584    let environmentRecreations = 0
 585    let reconnectPromise: Promise<boolean> | null = null
 586  
 587    /**
 588     * Recover from onEnvironmentLost (poll returned 404 — env was reaped
 589     * server-side). Tries two strategies in order:
 590     *
 591     *   1. Reconnect-in-place: idempotent re-register with reuseEnvironmentId
 592     *      → if the backend returns the same env ID, call reconnectSession()
 593     *      to re-queue the existing session. currentSessionId stays the same;
 594     *      the URL on the user's phone stays valid; previouslyFlushedUUIDs is
 595     *      preserved so history isn't re-sent.
 596     *
 597     *   2. Fresh session fallback: if the backend returns a different env ID
 598     *      (original TTL-expired, e.g. laptop slept >4h) or reconnectSession()
 599     *      throws, archive the old session and create a new one on the
 600     *      now-registered env. Old behavior before #20460 primitives landed.
 601     *
 602     * Uses a promise-based reentrancy guard so concurrent callers share the
 603     * same reconnection attempt.
 604     */
 605    async function reconnectEnvironmentWithSession(): Promise<boolean> {
 606      if (reconnectPromise) {
 607        return reconnectPromise
 608      }
 609      reconnectPromise = doReconnect()
 610      try {
 611        return await reconnectPromise
 612      } finally {
 613        reconnectPromise = null
 614      }
 615    }
 616  
 617    async function doReconnect(): Promise<boolean> {
 618      environmentRecreations++
 619      // Invalidate any in-flight v2 handshake — the environment is being
 620      // recreated, so a stale transport arriving post-reconnect would be
 621      // pointed at a dead session.
 622      v2Generation++
 623      logForDebugging(
 624        `[bridge:repl] Reconnecting after env lost (attempt ${environmentRecreations}/${MAX_ENVIRONMENT_RECREATIONS})`,
 625      )
 626  
 627      if (environmentRecreations > MAX_ENVIRONMENT_RECREATIONS) {
 628        logForDebugging(
 629          `[bridge:repl] Environment reconnect limit reached (${MAX_ENVIRONMENT_RECREATIONS}), giving up`,
 630        )
 631        return false
 632      }
 633  
 634      // Close the stale transport. Capture seq BEFORE close — if Strategy 1
 635      // (tryReconnectInPlace) succeeds we keep the SAME session, and the
 636      // next transport must resume where this one left off, not replay from
 637      // the last transport-swap checkpoint.
 638      if (transport) {
 639        const seq = transport.getLastSequenceNum()
 640        if (seq > lastTransportSequenceNum) {
 641          lastTransportSequenceNum = seq
 642        }
 643        transport.close()
 644        transport = null
 645      }
 646      // Transport is gone — wake the poll loop out of its at-capacity
 647      // heartbeat sleep so it can fast-poll for re-dispatched work.
 648      wakePollLoop()
 649      // Reset flush gate so writeMessages() hits the !transport guard
 650      // instead of silently queuing into a dead buffer.
 651      flushGate.drop()
 652  
 653      // Release the current work item (force=false — we may want the session
 654      // back). Best-effort: the env is probably gone, so this likely 404s.
 655      if (currentWorkId) {
 656        const workIdBeingCleared = currentWorkId
 657        await api
 658          .stopWork(environmentId, workIdBeingCleared, false)
 659          .catch(() => {})
 660        // When doReconnect runs concurrently with the poll loop (ws_closed
 661        // handler case — void-called, unlike the awaited onEnvironmentLost
 662        // path), onWorkReceived can fire during the stopWork await and set
 663        // a fresh currentWorkId. If it did, the poll loop has already
 664        // recovered on its own — defer to it rather than proceeding to
 665        // archiveSession, which would destroy the session its new
 666        // transport is connected to.
 667        if (currentWorkId !== workIdBeingCleared) {
 668          logForDebugging(
 669            '[bridge:repl] Poll loop recovered during stopWork await — deferring to it',
 670          )
 671          environmentRecreations = 0
 672          return true
 673        }
 674        currentWorkId = null
 675        currentIngressToken = null
 676      }
 677  
 678      // Bail out if teardown started while we were awaiting
 679      if (pollController.signal.aborted) {
 680        logForDebugging('[bridge:repl] Reconnect aborted by teardown')
 681        return false
 682      }
 683  
 684      // Strategy 1: idempotent re-register with the server-issued env ID.
 685      // If the backend resurrects the same env (fresh secret), we can
 686      // reconnect the existing session. If it hands back a different ID, the
 687      // original env is truly gone and we fall through to a fresh session.
 688      const requestedEnvId = environmentId
 689      bridgeConfig.reuseEnvironmentId = requestedEnvId
 690      try {
 691        const reg = await api.registerBridgeEnvironment(bridgeConfig)
 692        environmentId = reg.environment_id
 693        environmentSecret = reg.environment_secret
 694      } catch (err) {
 695        bridgeConfig.reuseEnvironmentId = undefined
 696        logForDebugging(
 697          `[bridge:repl] Environment re-registration failed: ${errorMessage(err)}`,
 698        )
 699        return false
 700      }
 701      // Clear before any await — a stale value would poison the next fresh
 702      // registration if doReconnect runs again.
 703      bridgeConfig.reuseEnvironmentId = undefined
 704  
 705      logForDebugging(
 706        `[bridge:repl] Re-registered: requested=${requestedEnvId} got=${environmentId}`,
 707      )
 708  
 709      // Bail out if teardown started while we were registering
 710      if (pollController.signal.aborted) {
 711        logForDebugging(
 712          '[bridge:repl] Reconnect aborted after env registration, cleaning up',
 713        )
 714        await api.deregisterEnvironment(environmentId).catch(() => {})
 715        return false
 716      }
 717  
 718      // Same race as above, narrower window: poll loop may have set up a
 719      // transport during the registerBridgeEnvironment await. Bail before
 720      // tryReconnectInPlace/archiveSession kill it server-side.
 721      if (transport !== null) {
 722        logForDebugging(
 723          '[bridge:repl] Poll loop recovered during registerBridgeEnvironment await — deferring to it',
 724        )
 725        environmentRecreations = 0
 726        return true
 727      }
 728  
 729      // Strategy 1: same helper as perpetual init. currentSessionId stays
 730      // the same on success; URL on mobile/web stays valid;
 731      // previouslyFlushedUUIDs preserved (no re-flush).
 732      if (await tryReconnectInPlace(requestedEnvId, currentSessionId)) {
 733        logEvent('tengu_bridge_repl_reconnected_in_place', {})
 734        environmentRecreations = 0
 735        return true
 736      }
 737      // Env differs → TTL-expired/reaped; or reconnect failed.
 738      // Don't deregister — we have a fresh secret for this env either way.
 739      if (environmentId !== requestedEnvId) {
 740        logEvent('tengu_bridge_repl_env_expired_fresh_session', {})
 741      }
 742  
 743      // Strategy 2: fresh session on the now-registered environment.
 744      // Archive the old session first — it's orphaned (bound to a dead env,
 745      // or reconnectSession rejected it). Don't deregister the env — we just
 746      // got a fresh secret for it and are about to use it.
 747      await archiveSession(currentSessionId)
 748  
 749      // Bail out if teardown started while we were archiving
 750      if (pollController.signal.aborted) {
 751        logForDebugging(
 752          '[bridge:repl] Reconnect aborted after archive, cleaning up',
 753        )
 754        await api.deregisterEnvironment(environmentId).catch(() => {})
 755        return false
 756      }
 757  
 758      // Re-read the current title in case the user renamed the session.
 759      // REPL wrapper reads session storage; daemon wrapper returns the
 760      // original title (nothing to refresh).
 761      const currentTitle = getCurrentTitle()
 762  
 763      // Create a new session on the now-registered environment
 764      const newSessionId = await createSession({
 765        environmentId,
 766        title: currentTitle,
 767        gitRepoUrl,
 768        branch,
 769        signal: AbortSignal.timeout(15_000),
 770      })
 771  
 772      if (!newSessionId) {
 773        logForDebugging(
 774          '[bridge:repl] Session creation failed during reconnection',
 775        )
 776        return false
 777      }
 778  
 779      // Bail out if teardown started during session creation (up to 15s)
 780      if (pollController.signal.aborted) {
 781        logForDebugging(
 782          '[bridge:repl] Reconnect aborted after session creation, cleaning up',
 783        )
 784        await archiveSession(newSessionId)
 785        return false
 786      }
 787  
 788      currentSessionId = newSessionId
 789      // Re-publish to the PID file so peer dedup (peerRegistry.ts) picks up the
 790      // new ID — setReplBridgeHandle only fires at init/teardown, not reconnect.
 791      void updateSessionBridgeId(toCompatSessionId(newSessionId)).catch(() => {})
 792      // Reset per-session transport state IMMEDIATELY after the session swap,
 793      // before any await. If this runs after `await writeBridgePointer` below,
 794      // there's a window where handle.bridgeSessionId already returns session B
 795      // but getSSESequenceNum() still returns session A's seq — a daemon
 796      // persistState() in that window writes {bridgeSessionId: B, seq: OLD_A},
 797      // which PASSES the session-ID validation check and defeats it entirely.
 798      //
 799      // The SSE seq-num is scoped to the session's event stream — carrying it
 800      // over leaves the transport's lastSequenceNum stuck high (seq only
 801      // advances when received > last), and its next internal reconnect would
 802      // send from_sequence_num=OLD_SEQ against a stream starting at 1 → all
 803      // events in the gap silently dropped. Inbound UUID dedup is also
 804      // session-scoped.
 805      lastTransportSequenceNum = 0
 806      recentInboundUUIDs.clear()
 807      // Title derivation is session-scoped too: if the user typed during the
 808      // createSession await above, the callback fired against the OLD archived
 809      // session ID (PATCH lost) and the new session got `currentTitle` captured
 810      // BEFORE they typed. Reset so the next prompt can re-derive. Self-
 811      // correcting: if the caller's policy is already done (explicit title or
 812      // count ≥ 3), it returns true on the first post-reset call and re-latches.
 813      userMessageCallbackDone = !onUserMessage
 814      logForDebugging(`[bridge:repl] Re-created session: ${currentSessionId}`)
 815  
 816      // Rewrite the crash-recovery pointer with the new IDs so a crash after
 817      // this point resumes the right session. (The reconnect-in-place path
 818      // above doesn't touch the pointer — same session, same env.)
 819      await writeBridgePointer(dir, {
 820        sessionId: currentSessionId,
 821        environmentId,
 822        source: 'repl',
 823      })
 824  
 825      // Clear flushed UUIDs so initial messages are re-sent to the new session.
 826      // UUIDs are scoped per-session on the server, so re-flushing is safe.
 827      previouslyFlushedUUIDs?.clear()
 828  
 829  
 830      // Reset the counter so independent reconnections hours apart don't
 831      // exhaust the limit — it guards against rapid consecutive failures,
 832      // not lifetime total.
 833      environmentRecreations = 0
 834  
 835      return true
 836    }
 837  
 838    // Helper: get the current OAuth access token for session ingress auth.
 839    // Unlike the JWT path, OAuth tokens are refreshed by the standard OAuth
 840    // flow — no proactive scheduler needed.
 841    function getOAuthToken(): string | undefined {
 842      return getAccessToken()
 843    }
 844  
 845    // Drain any messages that were queued during the initial flush.
 846    // Called after writeBatch completes (or fails) so queued messages
 847    // are sent in order after the historical messages.
 848    function drainFlushGate(): void {
 849      const msgs = flushGate.end()
 850      if (msgs.length === 0) return
 851      if (!transport) {
 852        logForDebugging(
 853          `[bridge:repl] Cannot drain ${msgs.length} pending message(s): no transport`,
 854        )
 855        return
 856      }
 857      for (const msg of msgs) {
 858        recentPostedUUIDs.add(msg.uuid)
 859      }
 860      const sdkMessages = toSDKMessages(msgs)
 861      const events = sdkMessages.map(sdkMsg => ({
 862        ...sdkMsg,
 863        session_id: currentSessionId,
 864      }))
 865      logForDebugging(
 866        `[bridge:repl] Drained ${msgs.length} pending message(s) after flush`,
 867      )
 868      void transport.writeBatch(events)
 869    }
 870  
 871    // Teardown reference — set after definition below. All callers are async
 872    // callbacks that run after assignment, so the reference is always valid.
 873    let doTeardownImpl: (() => Promise<void>) | null = null
 874    function triggerTeardown(): void {
 875      void doTeardownImpl?.()
 876    }
 877  
 878    /**
 879     * Body of the transport's setOnClose callback, hoisted to initBridgeCore
 880     * scope so /bridge-kick can fire it directly. setOnClose wraps this with
 881     * a stale-transport guard; debugFireClose calls it bare.
 882     *
 883     * With autoReconnect:true, this only fires on: clean close (1000),
 884     * permanent server rejection (4001/1002/4003), or 10-min budget
 885     * exhaustion. Transient drops are retried internally by the transport.
 886     */
 887    function handleTransportPermanentClose(closeCode: number | undefined): void {
 888      logForDebugging(
 889        `[bridge:repl] Transport permanently closed: code=${closeCode}`,
 890      )
 891      logEvent('tengu_bridge_repl_ws_closed', {
 892        code: closeCode,
 893      })
 894      // Capture SSE seq high-water mark before nulling. When called from
 895      // setOnClose the guard guarantees transport !== null; when fired from
 896      // /bridge-kick it may already be null (e.g. fired twice) — skip.
 897      if (transport) {
 898        const closedSeq = transport.getLastSequenceNum()
 899        if (closedSeq > lastTransportSequenceNum) {
 900          lastTransportSequenceNum = closedSeq
 901        }
 902        transport = null
 903      }
 904      // Transport is gone — wake the poll loop out of its at-capacity
 905      // heartbeat sleep so it's fast-polling by the time the reconnect
 906      // below completes and the server re-queues work.
 907      wakePollLoop()
 908      // Reset flush state so writeMessages() hits the !transport guard
 909      // (with a warning log) instead of silently queuing into a buffer
 910      // that will never be drained. Unlike onWorkReceived (which
 911      // preserves pending messages for the new transport), onClose is
 912      // a permanent close — no new transport will drain these.
 913      const dropped = flushGate.drop()
 914      if (dropped > 0) {
 915        logForDebugging(
 916          `[bridge:repl] Dropping ${dropped} pending message(s) on transport close (code=${closeCode})`,
 917          { level: 'warn' },
 918        )
 919      }
 920  
 921      if (closeCode === 1000) {
 922        // Clean close — session ended normally. Tear down the bridge.
 923        onStateChange?.('failed', 'session ended')
 924        pollController.abort()
 925        triggerTeardown()
 926        return
 927      }
 928  
 929      // Transport reconnect budget exhausted or permanent server
 930      // rejection. By this point the env has usually been reaped
 931      // server-side (BQ 2026-03-12: ~98% of ws_closed never recover
 932      // via poll alone). stopWork(force=false) can't re-dispatch work
 933      // from an archived env; reconnectEnvironmentWithSession can
 934      // re-activate it via POST /bridge/reconnect, or fall through
 935      // to a fresh session if the env is truly gone. The poll loop
 936      // (already woken above) picks up the re-queued work once
 937      // doReconnect completes.
 938      onStateChange?.(
 939        'reconnecting',
 940        `Remote Control connection lost (code ${closeCode})`,
 941      )
 942      logForDebugging(
 943        `[bridge:repl] Transport reconnect budget exhausted (code=${closeCode}), attempting env reconnect`,
 944      )
 945      void reconnectEnvironmentWithSession().then(success => {
 946        if (success) return
 947        // doReconnect has four abort-check return-false sites for
 948        // teardown-in-progress. Don't pollute the BQ failure signal
 949        // or double-teardown when the user just quit.
 950        if (pollController.signal.aborted) return
 951        // doReconnect returns false (never throws) on genuine failure.
 952        // The dangerous case: registerBridgeEnvironment succeeded (so
 953        // environmentId now points at a fresh valid env) but
 954        // createSession failed — poll loop would poll a sessionless
 955        // env getting null work with no errors, never hitting any
 956        // give-up path. Tear down explicitly.
 957        logForDebugging(
 958          '[bridge:repl] reconnectEnvironmentWithSession resolved false — tearing down',
 959        )
 960        logEvent('tengu_bridge_repl_reconnect_failed', {
 961          close_code: closeCode,
 962        })
 963        onStateChange?.('failed', 'reconnection failed')
 964        triggerTeardown()
 965      })
 966    }
 967  
 968    // Ant-only: SIGUSR2 → force doReconnect() for manual testing. Skips the
 969    // ~30s poll wait — fire-and-observe in the debug log immediately.
 970    // Windows has no USR signals; `process.on` would throw there.
 971    let sigusr2Handler: (() => void) | undefined
 972    if (process.env.USER_TYPE === 'ant' && process.platform !== 'win32') {
 973      sigusr2Handler = () => {
 974        logForDebugging(
 975          '[bridge:repl] SIGUSR2 received — forcing doReconnect() for testing',
 976        )
 977        void reconnectEnvironmentWithSession()
 978      }
 979      process.on('SIGUSR2', sigusr2Handler)
 980    }
 981  
 982    // Ant-only: /bridge-kick fault injection. handleTransportPermanentClose
 983    // is defined below and assigned into this slot so the slash command can
 984    // invoke it directly — the real setOnClose callback is buried inside
 985    // wireTransport which is itself inside onWorkReceived.
 986    let debugFireClose: ((code: number) => void) | null = null
 987    if (process.env.USER_TYPE === 'ant') {
 988      registerBridgeDebugHandle({
 989        fireClose: code => {
 990          if (!debugFireClose) {
 991            logForDebugging('[bridge:debug] fireClose: no transport wired yet')
 992            return
 993          }
 994          logForDebugging(`[bridge:debug] fireClose(${code}) — injecting`)
 995          debugFireClose(code)
 996        },
 997        forceReconnect: () => {
 998          logForDebugging('[bridge:debug] forceReconnect — injecting')
 999          void reconnectEnvironmentWithSession()
1000        },
1001        injectFault: injectBridgeFault,
1002        wakePollLoop,
1003        describe: () =>
1004          `env=${environmentId} session=${currentSessionId} transport=${transport?.getStateLabel() ?? 'null'} workId=${currentWorkId ?? 'null'}`,
1005      })
1006    }
1007  
1008    const pollOpts = {
1009      api,
1010      getCredentials: () => ({ environmentId, environmentSecret }),
1011      signal: pollController.signal,
1012      getPollIntervalConfig,
1013      onStateChange,
1014      getWsState: () => transport?.getStateLabel() ?? 'null',
1015      // REPL bridge is single-session: having any transport == at capacity.
1016      // No need to check isConnectedStatus() — even while the transport is
1017      // auto-reconnecting internally (up to 10 min), poll is heartbeat-only.
1018      isAtCapacity: () => transport !== null,
1019      capacitySignal,
1020      onFatalError: triggerTeardown,
1021      getHeartbeatInfo: () => {
1022        if (!currentWorkId || !currentIngressToken) {
1023          return null
1024        }
1025        return {
1026          environmentId,
1027          workId: currentWorkId,
1028          sessionToken: currentIngressToken,
1029        }
1030      },
1031      // Work-item JWT expired (or work gone). The transport is useless —
1032      // SSE reconnects and CCR writes use the same stale token. Without
1033      // this callback the poll loop would do a 10-min at-capacity backoff,
1034      // during which the work lease (300s TTL) expires and the server stops
1035      // forwarding prompts → ~25-min dead window observed in daemon logs.
1036      // Kill the transport + work state so isAtCapacity()=false; the loop
1037      // fast-polls and picks up the server's re-dispatched work in seconds.
1038      onHeartbeatFatal: (err: BridgeFatalError) => {
1039        logForDebugging(
1040          `[bridge:repl] heartbeatWork fatal (status=${err.status}) — tearing down work item for fast re-dispatch`,
1041        )
1042        if (transport) {
1043          const seq = transport.getLastSequenceNum()
1044          if (seq > lastTransportSequenceNum) {
1045            lastTransportSequenceNum = seq
1046          }
1047          transport.close()
1048          transport = null
1049        }
1050        flushGate.drop()
1051        // force=false → server re-queues. Likely already expired, but
1052        // idempotent and makes re-dispatch immediate if not.
1053        if (currentWorkId) {
1054          void api
1055            .stopWork(environmentId, currentWorkId, false)
1056            .catch((e: unknown) => {
1057              logForDebugging(
1058                `[bridge:repl] stopWork after heartbeat fatal: ${errorMessage(e)}`,
1059              )
1060            })
1061        }
1062        currentWorkId = null
1063        currentIngressToken = null
1064        wakePollLoop()
1065        onStateChange?.(
1066          'reconnecting',
1067          'Work item lease expired, fetching fresh token',
1068        )
1069      },
1070      async onEnvironmentLost() {
1071        const success = await reconnectEnvironmentWithSession()
1072        if (!success) {
1073          return null
1074        }
1075        return { environmentId, environmentSecret }
1076      },
1077      onWorkReceived: (
1078        workSessionId: string,
1079        ingressToken: string,
1080        workId: string,
1081        serverUseCcrV2: boolean,
1082      ) => {
1083        // When new work arrives while a transport is already open, the
1084        // server has decided to re-dispatch (e.g. token rotation, server
1085        // restart). Close the existing transport and reconnect — discarding
1086        // the work causes a stuck 'reconnecting' state if the old WS dies
1087        // shortly after (the server won't re-dispatch a work item it
1088        // already delivered).
1089        // ingressToken (JWT) is stored for heartbeat auth (both v1 and v2).
1090        // Transport auth diverges — see the v1/v2 split below.
1091        if (transport?.isConnectedStatus()) {
1092          logForDebugging(
1093            `[bridge:repl] Work received while transport connected, replacing with fresh token (workId=${workId})`,
1094          )
1095        }
1096  
1097        logForDebugging(
1098          `[bridge:repl] Work received: workId=${workId} workSessionId=${workSessionId} currentSessionId=${currentSessionId} match=${sameSessionId(workSessionId, currentSessionId)}`,
1099        )
1100  
1101        // Refresh the crash-recovery pointer's mtime. Staleness checks file
1102        // mtime (not embedded timestamp) so this re-write bumps the clock —
1103        // a 5h+ session that crashes still has a fresh pointer. Fires once
1104        // per work dispatch (infrequent — bounded by user message rate).
1105        void writeBridgePointer(dir, {
1106          sessionId: currentSessionId,
1107          environmentId,
1108          source: 'repl',
1109        })
1110  
1111        // Reject foreign session IDs — the server shouldn't assign sessions
1112        // from other environments. Since we create env+session as a pair,
1113        // a mismatch indicates an unexpected server-side reassignment.
1114        //
1115        // Compare by underlying UUID, not by tagged-ID prefix. When CCR
1116        // v2's compat layer serves the session, createBridgeSession gets
1117        // session_* from the v1-facing API (compat/convert.go:41) but the
1118        // infrastructure layer delivers cse_* in the work queue
1119        // (container_manager.go:129). Same UUID, different tag.
1120        if (!sameSessionId(workSessionId, currentSessionId)) {
1121          logForDebugging(
1122            `[bridge:repl] Rejecting foreign session: expected=${currentSessionId} got=${workSessionId}`,
1123          )
1124          return
1125        }
1126  
1127        currentWorkId = workId
1128        currentIngressToken = ingressToken
1129  
1130        // Server decides per-session (secret.use_code_sessions from the work
1131        // secret, threaded through runWorkPollLoop). The env var is an ant-dev
1132        // override for forcing v2 before the server flag is on for your user —
1133        // requires ccr_v2_compat_enabled server-side or registerWorker 404s.
1134        //
1135        // Kept separate from CLAUDE_CODE_USE_CCR_V2 (the child-SDK transport
1136        // selector set by sessionRunner/environment-manager) to avoid the
1137        // inheritance hazard in spawn mode where the parent's orchestrator
1138        // var would leak into a v1 child.
1139        const useCcrV2 =
1140          serverUseCcrV2 || isEnvTruthy(process.env.CLAUDE_BRIDGE_USE_CCR_V2)
1141  
1142        // Auth is the one place v1 and v2 diverge hard:
1143        //
1144        // - v1 (Session-Ingress): accepts OAuth OR JWT. We prefer OAuth
1145        //   because the standard OAuth refresh flow handles expiry — no
1146        //   separate JWT refresh scheduler needed.
1147        //
1148        // - v2 (CCR /worker/*): REQUIRES the JWT. register_worker.go:32
1149        //   validates the session_id claim, which OAuth tokens don't carry.
1150        //   The JWT from the work secret has both that claim and the worker
1151        //   role (environment_auth.py:856). JWT refresh: when it expires the
1152        //   server re-dispatches work with a fresh one, and onWorkReceived
1153        //   fires again. createV2ReplTransport stores it via
1154        //   updateSessionIngressAuthToken() before touching the network.
1155        let v1OauthToken: string | undefined
1156        if (!useCcrV2) {
1157          v1OauthToken = getOAuthToken()
1158          if (!v1OauthToken) {
1159            logForDebugging(
1160              '[bridge:repl] No OAuth token available for session ingress, skipping work',
1161            )
1162            return
1163          }
1164          updateSessionIngressAuthToken(v1OauthToken)
1165        }
1166        logEvent('tengu_bridge_repl_work_received', {})
1167  
1168        // Close the previous transport. Nullify BEFORE calling close() so
1169        // the close callback doesn't treat the programmatic close as
1170        // "session ended normally" and trigger a full teardown.
1171        if (transport) {
1172          const oldTransport = transport
1173          transport = null
1174          // Capture the SSE sequence high-water mark so the next transport
1175          // resumes the stream instead of replaying from seq 0. Use max() —
1176          // a transport that died early (never received any frames) would
1177          // otherwise reset a non-zero mark back to 0.
1178          const oldSeq = oldTransport.getLastSequenceNum()
1179          if (oldSeq > lastTransportSequenceNum) {
1180            lastTransportSequenceNum = oldSeq
1181          }
1182          oldTransport.close()
1183        }
1184        // Reset flush state — the old flush (if any) is no longer relevant.
1185        // Preserve pending messages so they're drained after the new
1186        // transport's flush completes (the hook has already advanced its
1187        // lastWrittenIndex and won't re-send them).
1188        flushGate.deactivate()
1189  
1190        // Closure adapter over the shared handleServerControlRequest —
1191        // captures transport/currentSessionId so the transport.setOnData
1192        // callback below doesn't need to thread them through.
1193        const onServerControlRequest = (request: SDKControlRequest): void =>
1194          handleServerControlRequest(request, {
1195            transport,
1196            sessionId: currentSessionId,
1197            onInterrupt,
1198            onSetModel,
1199            onSetMaxThinkingTokens,
1200            onSetPermissionMode,
1201          })
1202  
1203        let initialFlushDone = false
1204  
1205        // Wire callbacks onto a freshly constructed transport and connect.
1206        // Extracted so the (sync) v1 and (async) v2 construction paths can
1207        // share the identical callback + flush machinery.
1208        const wireTransport = (newTransport: ReplBridgeTransport): void => {
1209          transport = newTransport
1210  
1211          newTransport.setOnConnect(() => {
1212            // Guard: if transport was replaced by a newer onWorkReceived call
1213            // while the WS was connecting, ignore this stale callback.
1214            if (transport !== newTransport) return
1215  
1216            logForDebugging('[bridge:repl] Ingress transport connected')
1217            logEvent('tengu_bridge_repl_ws_connected', {})
1218  
1219            // Update the env var with the latest OAuth token so POST writes
1220            // (which read via getSessionIngressAuthToken()) use a fresh token.
1221            // v2 skips this — createV2ReplTransport already stored the JWT,
1222            // and overwriting it with OAuth would break subsequent /worker/*
1223            // requests (session_id claim check).
1224            if (!useCcrV2) {
1225              const freshToken = getOAuthToken()
1226              if (freshToken) {
1227                updateSessionIngressAuthToken(freshToken)
1228              }
1229            }
1230  
1231            // Reset teardownStarted so future teardowns are not blocked.
1232            teardownStarted = false
1233  
1234            // Flush initial messages only on first connect, not on every
1235            // WS reconnection. Re-flushing would cause duplicate messages.
1236            // IMPORTANT: onStateChange('connected') is deferred until the
1237            // flush completes. This prevents writeMessages() from sending
1238            // new messages that could arrive at the server interleaved with
1239            // the historical messages, and delays the web UI from showing
1240            // the session as active until history is persisted.
1241            if (
1242              !initialFlushDone &&
1243              initialMessages &&
1244              initialMessages.length > 0
1245            ) {
1246              initialFlushDone = true
1247  
1248              // Cap the initial flush to the most recent N messages. The full
1249              // history is UI-only (model doesn't see it) and large replays cause
1250              // slow session-ingress persistence (each event is a threadstore write)
1251              // plus elevated Firestore pressure. A 0 or negative cap disables it.
1252              const historyCap = initialHistoryCap
1253              const eligibleMessages = initialMessages.filter(
1254                m =>
1255                  isEligibleBridgeMessage(m) &&
1256                  !previouslyFlushedUUIDs?.has(m.uuid),
1257              )
1258              const cappedMessages =
1259                historyCap > 0 && eligibleMessages.length > historyCap
1260                  ? eligibleMessages.slice(-historyCap)
1261                  : eligibleMessages
1262              if (cappedMessages.length < eligibleMessages.length) {
1263                logForDebugging(
1264                  `[bridge:repl] Capped initial flush: ${eligibleMessages.length} -> ${cappedMessages.length} (cap=${historyCap})`,
1265                )
1266                logEvent('tengu_bridge_repl_history_capped', {
1267                  eligible_count: eligibleMessages.length,
1268                  capped_count: cappedMessages.length,
1269                })
1270              }
1271              const sdkMessages = toSDKMessages(cappedMessages)
1272              if (sdkMessages.length > 0) {
1273                logForDebugging(
1274                  `[bridge:repl] Flushing ${sdkMessages.length} initial message(s) via transport`,
1275                )
1276                const events = sdkMessages.map(sdkMsg => ({
1277                  ...sdkMsg,
1278                  session_id: currentSessionId,
1279                }))
1280                const dropsBefore = newTransport.droppedBatchCount
1281                void newTransport
1282                  .writeBatch(events)
1283                  .then(() => {
1284                    // If any batch was dropped during this flush (SI down for
1285                    // maxConsecutiveFailures attempts), flush() still resolved
1286                    // normally but the events were NOT delivered. Don't mark
1287                    // UUIDs as flushed — keep them eligible for re-send on the
1288                    // next onWorkReceived (JWT refresh re-dispatch, line ~1144).
1289                    if (newTransport.droppedBatchCount > dropsBefore) {
1290                      logForDebugging(
1291                        `[bridge:repl] Initial flush dropped ${newTransport.droppedBatchCount - dropsBefore} batch(es) — not marking ${sdkMessages.length} UUID(s) as flushed`,
1292                      )
1293                      return
1294                    }
1295                    if (previouslyFlushedUUIDs) {
1296                      for (const sdkMsg of sdkMessages) {
1297                        if (sdkMsg.uuid) {
1298                          previouslyFlushedUUIDs.add(sdkMsg.uuid)
1299                        }
1300                      }
1301                    }
1302                  })
1303                  .catch(e =>
1304                    logForDebugging(`[bridge:repl] Initial flush failed: ${e}`),
1305                  )
1306                  .finally(() => {
1307                    // Guard: if transport was replaced during the flush,
1308                    // don't signal connected or drain — the new transport
1309                    // owns the lifecycle now.
1310                    if (transport !== newTransport) return
1311                    drainFlushGate()
1312                    onStateChange?.('connected')
1313                  })
1314              } else {
1315                // All initial messages were already flushed (filtered by
1316                // previouslyFlushedUUIDs). No flush POST needed — clear
1317                // the flag and signal connected immediately. This is the
1318                // first connect for this transport (inside !initialFlushDone),
1319                // so no flush POST is in-flight — the flag was set before
1320                // connect() and must be cleared here.
1321                drainFlushGate()
1322                onStateChange?.('connected')
1323              }
1324            } else if (!flushGate.active) {
1325              // No initial messages or already flushed on first connect.
1326              // WS auto-reconnect path — only signal connected if no flush
1327              // POST is in-flight. If one is, .finally() owns the lifecycle.
1328              onStateChange?.('connected')
1329            }
1330          })
1331  
1332          newTransport.setOnData(data => {
1333            handleIngressMessage(
1334              data,
1335              recentPostedUUIDs,
1336              recentInboundUUIDs,
1337              onInboundMessage,
1338              onPermissionResponse,
1339              onServerControlRequest,
1340            )
1341          })
1342  
1343          // Body lives at initBridgeCore scope so /bridge-kick can call it
1344          // directly via debugFireClose. All referenced closures (transport,
1345          // wakePollLoop, flushGate, reconnectEnvironmentWithSession, etc.)
1346          // are already at that scope. The only lexical dependency on
1347          // wireTransport was `newTransport.getLastSequenceNum()` — but after
1348          // the guard below passes we know transport === newTransport.
1349          debugFireClose = handleTransportPermanentClose
1350          newTransport.setOnClose(closeCode => {
1351            // Guard: if transport was replaced, ignore stale close.
1352            if (transport !== newTransport) return
1353            handleTransportPermanentClose(closeCode)
1354          })
1355  
1356          // Start the flush gate before connect() to cover the WS handshake
1357          // window. Between transport assignment and setOnConnect firing,
1358          // writeMessages() could send messages via HTTP POST before the
1359          // initial flush starts. Starting the gate here ensures those
1360          // calls are queued. If there are no initial messages, the gate
1361          // stays inactive.
1362          if (
1363            !initialFlushDone &&
1364            initialMessages &&
1365            initialMessages.length > 0
1366          ) {
1367            flushGate.start()
1368          }
1369  
1370          newTransport.connect()
1371        } // end wireTransport
1372  
1373        // Bump unconditionally — ANY new transport (v1 or v2) invalidates an
1374        // in-flight v2 handshake. Also bumped in doReconnect().
1375        v2Generation++
1376  
1377        if (useCcrV2) {
1378          // workSessionId is the cse_* form (infrastructure-layer ID from the
1379          // work queue), which is what /v1/code/sessions/{id}/worker/* wants.
1380          // The session_* form (currentSessionId) is NOT usable here —
1381          // handler/convert.go:30 validates TagCodeSession.
1382          const sessionUrl = buildCCRv2SdkUrl(baseUrl, workSessionId)
1383          const thisGen = v2Generation
1384          logForDebugging(
1385            `[bridge:repl] CCR v2: sessionUrl=${sessionUrl} session=${workSessionId} gen=${thisGen}`,
1386          )
1387          void createV2ReplTransport({
1388            sessionUrl,
1389            ingressToken,
1390            sessionId: workSessionId,
1391            initialSequenceNum: lastTransportSequenceNum,
1392          }).then(
1393            t => {
1394              // Teardown started while registerWorker was in flight. Teardown
1395              // saw transport === null and skipped close(); installing now
1396              // would leak CCRClient heartbeat timers and reset
1397              // teardownStarted via wireTransport's side effects.
1398              if (pollController.signal.aborted) {
1399                t.close()
1400                return
1401              }
1402              // onWorkReceived may have fired again while registerWorker()
1403              // was in flight (server re-dispatch with a fresh JWT). The
1404              // transport !== null check alone gets the race wrong when BOTH
1405              // attempts saw transport === null — it keeps the first resolver
1406              // (stale epoch) and discards the second (correct epoch). The
1407              // generation check catches it regardless of transport state.
1408              if (thisGen !== v2Generation) {
1409                logForDebugging(
1410                  `[bridge:repl] CCR v2: discarding stale handshake gen=${thisGen} current=${v2Generation}`,
1411                )
1412                t.close()
1413                return
1414              }
1415              wireTransport(t)
1416            },
1417            (err: unknown) => {
1418              logForDebugging(
1419                `[bridge:repl] CCR v2: createV2ReplTransport failed: ${errorMessage(err)}`,
1420                { level: 'error' },
1421              )
1422              logEvent('tengu_bridge_repl_ccr_v2_init_failed', {})
1423              // If a newer attempt is in flight or already succeeded, don't
1424              // touch its work item — our failure is irrelevant.
1425              if (thisGen !== v2Generation) return
1426              // Release the work item so the server re-dispatches immediately
1427              // instead of waiting for its own timeout. currentWorkId was set
1428              // above; without this, the session looks stuck to the user.
1429              if (currentWorkId) {
1430                void api
1431                  .stopWork(environmentId, currentWorkId, false)
1432                  .catch((e: unknown) => {
1433                    logForDebugging(
1434                      `[bridge:repl] stopWork after v2 init failure: ${errorMessage(e)}`,
1435                    )
1436                  })
1437                currentWorkId = null
1438                currentIngressToken = null
1439              }
1440              wakePollLoop()
1441            },
1442          )
1443        } else {
1444          // v1: HybridTransport (WS reads + POST writes to Session-Ingress).
1445          // autoReconnect is true (default) — when the WS dies, the transport
1446          // reconnects automatically with exponential backoff. POST writes
1447          // continue during reconnection (they use getSessionIngressAuthToken()
1448          // independently of WS state). The poll loop remains as a secondary
1449          // fallback if the reconnect budget is exhausted (10 min).
1450          //
1451          // Auth: uses OAuth tokens directly instead of the JWT from the work
1452          // secret. refreshHeaders picks up the latest OAuth token on each
1453          // WS reconnect attempt.
1454          const wsUrl = buildSdkUrl(sessionIngressUrl, workSessionId)
1455          logForDebugging(`[bridge:repl] Ingress URL: ${wsUrl}`)
1456          logForDebugging(
1457            `[bridge:repl] Creating HybridTransport: session=${workSessionId}`,
1458          )
1459          // v1OauthToken was validated non-null above (we'd have returned early).
1460          const oauthToken = v1OauthToken ?? ''
1461          wireTransport(
1462            createV1ReplTransport(
1463              new HybridTransport(
1464                new URL(wsUrl),
1465                {
1466                  Authorization: `Bearer ${oauthToken}`,
1467                  'anthropic-version': '2023-06-01',
1468                },
1469                workSessionId,
1470                () => ({
1471                  Authorization: `Bearer ${getOAuthToken() ?? oauthToken}`,
1472                  'anthropic-version': '2023-06-01',
1473                }),
1474                // Cap retries so a persistently-failing session-ingress can't
1475                // pin the uploader drain loop for the lifetime of the bridge.
1476                // 50 attempts ≈ 20 min (15s POST timeout + 8s backoff + jitter
1477                // per cycle at steady state). Bridge-only — 1P keeps indefinite.
1478                {
1479                  maxConsecutiveFailures: 50,
1480                  isBridge: true,
1481                  onBatchDropped: () => {
1482                    onStateChange?.(
1483                      'reconnecting',
1484                      'Lost sync with Remote Control — events could not be delivered',
1485                    )
1486                    // SI has been down ~20 min. Wake the poll loop so that when
1487                    // SI recovers, next poll → onWorkReceived → fresh transport
1488                    // → initial flush succeeds → onStateChange('connected') at
1489                    // ~line 1420. Without this, state stays 'reconnecting' even
1490                    // after SI recovers — daemon.ts:437 denies all permissions,
1491                    // useReplBridge.ts:311 keeps replBridgeSessionActive=false.
1492                    // If the env was archived during the outage, poll 404 →
1493                    // onEnvironmentLost recovery path handles it.
1494                    wakePollLoop()
1495                  },
1496                },
1497              ),
1498            ),
1499          )
1500        }
1501      },
1502    }
1503    void startWorkPollLoop(pollOpts)
1504  
1505    // Perpetual mode: hourly mtime refresh of the crash-recovery pointer.
1506    // The onWorkReceived refresh only fires per user prompt — a
1507    // daemon idle for >4h would have a stale pointer, and the next restart
1508    // would clear it (readBridgePointer TTL check) → fresh session. The
1509    // standalone bridge (bridgeMain.ts) has an identical hourly timer.
1510    const pointerRefreshTimer = perpetual
1511      ? setInterval(() => {
1512          // doReconnect() reassigns currentSessionId/environmentId non-
1513          // atomically (env at ~:634, session at ~:719, awaits in between).
1514          // If this timer fires in that window, its fire-and-forget write can
1515          // race with (and overwrite) doReconnect's own pointer write at ~:740,
1516          // leaving the pointer at the now-archived old session. doReconnect
1517          // writes the pointer itself, so skipping here is free.
1518          if (reconnectPromise) return
1519          void writeBridgePointer(dir, {
1520            sessionId: currentSessionId,
1521            environmentId,
1522            source: 'repl',
1523          })
1524        }, 60 * 60_000)
1525      : null
1526    pointerRefreshTimer?.unref?.()
1527  
1528    // Push a silent keep_alive frame on a fixed interval so upstream proxies
1529    // and the session-ingress layer don't GC an otherwise-idle remote control
1530    // session. The keep_alive type is filtered before reaching any client UI
1531    // (Query.ts drops it; web/iOS/Android never see it in their message loop).
1532    // Interval comes from GrowthBook (tengu_bridge_poll_interval_config
1533    // session_keepalive_interval_v2_ms, default 120s); 0 = disabled.
1534    const keepAliveIntervalMs =
1535      getPollIntervalConfig().session_keepalive_interval_v2_ms
1536    const keepAliveTimer =
1537      keepAliveIntervalMs > 0
1538        ? setInterval(() => {
1539            if (!transport) return
1540            logForDebugging('[bridge:repl] keep_alive sent')
1541            void transport.write({ type: 'keep_alive' }).catch((err: unknown) => {
1542              logForDebugging(
1543                `[bridge:repl] keep_alive write failed: ${errorMessage(err)}`,
1544              )
1545            })
1546          }, keepAliveIntervalMs)
1547        : null
1548    keepAliveTimer?.unref?.()
1549  
1550    // Shared teardown sequence used by both cleanup registration and
1551    // the explicit teardown() method on the returned handle.
1552    let teardownStarted = false
1553    doTeardownImpl = async (): Promise<void> => {
1554      if (teardownStarted) {
1555        logForDebugging(
1556          `[bridge:repl] Teardown already in progress, skipping duplicate call env=${environmentId} session=${currentSessionId}`,
1557        )
1558        return
1559      }
1560      teardownStarted = true
1561      const teardownStart = Date.now()
1562      logForDebugging(
1563        `[bridge:repl] Teardown starting: env=${environmentId} session=${currentSessionId} workId=${currentWorkId ?? 'none'} transportState=${transport?.getStateLabel() ?? 'null'}`,
1564      )
1565  
1566      if (pointerRefreshTimer !== null) {
1567        clearInterval(pointerRefreshTimer)
1568      }
1569      if (keepAliveTimer !== null) {
1570        clearInterval(keepAliveTimer)
1571      }
1572      if (sigusr2Handler) {
1573        process.off('SIGUSR2', sigusr2Handler)
1574      }
1575      if (process.env.USER_TYPE === 'ant') {
1576        clearBridgeDebugHandle()
1577        debugFireClose = null
1578      }
1579      pollController.abort()
1580      logForDebugging('[bridge:repl] Teardown: poll loop aborted')
1581  
1582      // Capture the live transport's seq BEFORE close() — close() is sync
1583      // (just aborts the SSE fetch) and does NOT invoke onClose, so the
1584      // setOnClose capture path never runs for explicit teardown.
1585      // Without this, getSSESequenceNum() after teardown returns the stale
1586      // lastTransportSequenceNum (captured at the last transport swap), and
1587      // daemon callers persisting that value lose all events since then.
1588      if (transport) {
1589        const finalSeq = transport.getLastSequenceNum()
1590        if (finalSeq > lastTransportSequenceNum) {
1591          lastTransportSequenceNum = finalSeq
1592        }
1593      }
1594  
1595      if (perpetual) {
1596        // Perpetual teardown is LOCAL-ONLY — do not send result, do not call
1597        // stopWork, do not close the transport. All of those signal the
1598        // server (and any mobile/attach subscribers) that the session is
1599        // ending. Instead: stop polling, let the socket die with the
1600        // process; the backend times the work-item lease back to pending on
1601        // its own (TTL 300s). Next daemon start reads the pointer and
1602        // reconnectSession re-queues work.
1603        transport = null
1604        flushGate.drop()
1605        // Refresh the pointer mtime so that sessions lasting longer than
1606        // BRIDGE_POINTER_TTL_MS (4h) don't appear stale on next start.
1607        await writeBridgePointer(dir, {
1608          sessionId: currentSessionId,
1609          environmentId,
1610          source: 'repl',
1611        })
1612        logForDebugging(
1613          `[bridge:repl] Teardown (perpetual): leaving env=${environmentId} session=${currentSessionId} alive on server, duration=${Date.now() - teardownStart}ms`,
1614        )
1615        return
1616      }
1617  
1618      // Fire the result message, then archive, THEN close. transport.write()
1619      // only enqueues (SerialBatchEventUploader resolves on buffer-add); the
1620      // stopWork/archive latency (~200-500ms) is the drain window for the
1621      // result POST. Closing BEFORE archive meant relying on HybridTransport's
1622      // void-ed 3s grace period, which nothing awaits — forceExit can kill the
1623      // socket mid-POST. Same reorder as remoteBridgeCore.ts teardown (#22803).
1624      const teardownTransport = transport
1625      transport = null
1626      flushGate.drop()
1627      if (teardownTransport) {
1628        void teardownTransport.write(makeResultMessage(currentSessionId))
1629      }
1630  
1631      const stopWorkP = currentWorkId
1632        ? api
1633            .stopWork(environmentId, currentWorkId, true)
1634            .then(() => {
1635              logForDebugging('[bridge:repl] Teardown: stopWork completed')
1636            })
1637            .catch((err: unknown) => {
1638              logForDebugging(
1639                `[bridge:repl] Teardown stopWork failed: ${errorMessage(err)}`,
1640              )
1641            })
1642        : Promise.resolve()
1643  
1644      // Run stopWork and archiveSession in parallel. gracefulShutdown.ts:407
1645      // races runCleanupFunctions() against 2s (NOT the 5s outer failsafe),
1646      // so archive is capped at 1.5s at the injection site to stay under budget.
1647      // archiveSession is contractually no-throw; the injected implementations
1648      // log their own success/failure internally.
1649      await Promise.all([stopWorkP, archiveSession(currentSessionId)])
1650  
1651      teardownTransport?.close()
1652      logForDebugging('[bridge:repl] Teardown: transport closed')
1653  
1654      await api.deregisterEnvironment(environmentId).catch((err: unknown) => {
1655        logForDebugging(
1656          `[bridge:repl] Teardown deregister failed: ${errorMessage(err)}`,
1657        )
1658      })
1659  
1660      // Clear the crash-recovery pointer — explicit disconnect or clean REPL
1661      // exit means the user is done with this session. Crash/kill-9 never
1662      // reaches this line, leaving the pointer for next-launch recovery.
1663      await clearBridgePointer(dir)
1664  
1665      logForDebugging(
1666        `[bridge:repl] Teardown complete: env=${environmentId} duration=${Date.now() - teardownStart}ms`,
1667      )
1668    }
1669  
1670    // 8. Register cleanup for graceful shutdown
1671    const unregister = registerCleanup(() => doTeardownImpl?.())
1672  
1673    logForDebugging(
1674      `[bridge:repl] Ready: env=${environmentId} session=${currentSessionId}`,
1675    )
1676    onStateChange?.('ready')
1677  
1678    return {
1679      get bridgeSessionId() {
1680        return currentSessionId
1681      },
1682      get environmentId() {
1683        return environmentId
1684      },
1685      getSSESequenceNum() {
1686        // lastTransportSequenceNum only updates when a transport is CLOSED
1687        // (captured at swap/onClose). During normal operation the CURRENT
1688        // transport's live seq isn't reflected there. Merge both so callers
1689        // (e.g. daemon persistState()) get the actual high-water mark.
1690        const live = transport?.getLastSequenceNum() ?? 0
1691        return Math.max(lastTransportSequenceNum, live)
1692      },
1693      sessionIngressUrl,
1694      writeMessages(messages) {
1695        // Filter to user/assistant messages that haven't already been sent.
1696        // Two layers of dedup:
1697        //  - initialMessageUUIDs: messages sent as session creation events
1698        //  - recentPostedUUIDs: messages recently sent via POST
1699        const filtered = messages.filter(
1700          m =>
1701            isEligibleBridgeMessage(m) &&
1702            !initialMessageUUIDs.has(m.uuid) &&
1703            !recentPostedUUIDs.has(m.uuid),
1704        )
1705        if (filtered.length === 0) return
1706  
1707        // Fire onUserMessage for title derivation. Scan before the flushGate
1708        // check — prompts are title-worthy even if they queue behind the
1709        // initial history flush. Keeps calling on every title-worthy message
1710        // until the callback returns true; the caller owns the policy.
1711        if (!userMessageCallbackDone) {
1712          for (const m of filtered) {
1713            const text = extractTitleText(m)
1714            if (text !== undefined && onUserMessage?.(text, currentSessionId)) {
1715              userMessageCallbackDone = true
1716              break
1717            }
1718          }
1719        }
1720  
1721        // Queue messages while the initial flush is in progress to prevent
1722        // them from arriving at the server interleaved with history.
1723        if (flushGate.enqueue(...filtered)) {
1724          logForDebugging(
1725            `[bridge:repl] Queued ${filtered.length} message(s) during initial flush`,
1726          )
1727          return
1728        }
1729  
1730        if (!transport) {
1731          const types = filtered.map(m => m.type).join(',')
1732          logForDebugging(
1733            `[bridge:repl] Transport not configured, dropping ${filtered.length} message(s) [${types}] for session=${currentSessionId}`,
1734            { level: 'warn' },
1735          )
1736          return
1737        }
1738  
1739        // Track in the bounded ring buffer for echo filtering and dedup.
1740        for (const msg of filtered) {
1741          recentPostedUUIDs.add(msg.uuid)
1742        }
1743  
1744        logForDebugging(
1745          `[bridge:repl] Sending ${filtered.length} message(s) via transport`,
1746        )
1747  
1748        // Convert to SDK format and send via HTTP POST (HybridTransport).
1749        // The web UI receives them via the subscribe WebSocket.
1750        const sdkMessages = toSDKMessages(filtered)
1751        const events = sdkMessages.map(sdkMsg => ({
1752          ...sdkMsg,
1753          session_id: currentSessionId,
1754        }))
1755        void transport.writeBatch(events)
1756      },
1757      writeSdkMessages(messages) {
1758        // Daemon path: query() already yields SDKMessage, skip conversion.
1759        // Still run echo dedup (server bounces writes back on the WS).
1760        // No initialMessageUUIDs filter — daemon has no initial messages.
1761        // No flushGate — daemon never starts it (no initial flush).
1762        const filtered = messages.filter(
1763          m => !m.uuid || !recentPostedUUIDs.has(m.uuid),
1764        )
1765        if (filtered.length === 0) return
1766        if (!transport) {
1767          logForDebugging(
1768            `[bridge:repl] Transport not configured, dropping ${filtered.length} SDK message(s) for session=${currentSessionId}`,
1769            { level: 'warn' },
1770          )
1771          return
1772        }
1773        for (const msg of filtered) {
1774          if (msg.uuid) recentPostedUUIDs.add(msg.uuid)
1775        }
1776        const events = filtered.map(m => ({ ...m, session_id: currentSessionId }))
1777        void transport.writeBatch(events)
1778      },
1779      sendControlRequest(request: SDKControlRequest) {
1780        if (!transport) {
1781          logForDebugging(
1782            '[bridge:repl] Transport not configured, skipping control_request',
1783          )
1784          return
1785        }
1786        const event = { ...request, session_id: currentSessionId }
1787        void transport.write(event)
1788        logForDebugging(
1789          `[bridge:repl] Sent control_request request_id=${request.request_id}`,
1790        )
1791      },
1792      sendControlResponse(response: SDKControlResponse) {
1793        if (!transport) {
1794          logForDebugging(
1795            '[bridge:repl] Transport not configured, skipping control_response',
1796          )
1797          return
1798        }
1799        const event = { ...response, session_id: currentSessionId }
1800        void transport.write(event)
1801        logForDebugging('[bridge:repl] Sent control_response')
1802      },
1803      sendControlCancelRequest(requestId: string) {
1804        if (!transport) {
1805          logForDebugging(
1806            '[bridge:repl] Transport not configured, skipping control_cancel_request',
1807          )
1808          return
1809        }
1810        const event = {
1811          type: 'control_cancel_request' as const,
1812          request_id: requestId,
1813          session_id: currentSessionId,
1814        }
1815        void transport.write(event)
1816        logForDebugging(
1817          `[bridge:repl] Sent control_cancel_request request_id=${requestId}`,
1818        )
1819      },
1820      sendResult() {
1821        if (!transport) {
1822          logForDebugging(
1823            `[bridge:repl] sendResult: skipping, transport not configured session=${currentSessionId}`,
1824          )
1825          return
1826        }
1827        void transport.write(makeResultMessage(currentSessionId))
1828        logForDebugging(
1829          `[bridge:repl] Sent result for session=${currentSessionId}`,
1830        )
1831      },
1832      async teardown() {
1833        unregister()
1834        await doTeardownImpl?.()
1835        logForDebugging('[bridge:repl] Torn down')
1836        logEvent('tengu_bridge_repl_teardown', {})
1837      },
1838    }
1839  }
1840  
1841  /**
1842   * Persistent poll loop for work items. Runs in the background for the
1843   * lifetime of the bridge connection.
1844   *
1845   * When a work item arrives, acknowledges it and calls onWorkReceived
1846   * with the session ID and ingress token (which connects the ingress
1847   * WebSocket). Then continues polling — the server will dispatch a new
1848   * work item if the ingress WebSocket drops, allowing automatic
1849   * reconnection without tearing down the bridge.
1850   */
1851  async function startWorkPollLoop({
1852    api,
1853    getCredentials,
1854    signal,
1855    onStateChange,
1856    onWorkReceived,
1857    onEnvironmentLost,
1858    getWsState,
1859    isAtCapacity,
1860    capacitySignal,
1861    onFatalError,
1862    getPollIntervalConfig = () => DEFAULT_POLL_CONFIG,
1863    getHeartbeatInfo,
1864    onHeartbeatFatal,
1865  }: {
1866    api: BridgeApiClient
1867    getCredentials: () => { environmentId: string; environmentSecret: string }
1868    signal: AbortSignal
1869    onStateChange?: (state: BridgeState, detail?: string) => void
1870    onWorkReceived: (
1871      sessionId: string,
1872      ingressToken: string,
1873      workId: string,
1874      useCodeSessions: boolean,
1875    ) => void
1876    /** Called when the environment has been deleted. Returns new credentials or null. */
1877    onEnvironmentLost?: () => Promise<{
1878      environmentId: string
1879      environmentSecret: string
1880    } | null>
1881    /** Returns the current WebSocket readyState label for diagnostic logging. */
1882    getWsState?: () => string
1883    /**
1884     * Returns true when the caller cannot accept new work (transport already
1885     * connected). When true, the loop polls at the configured at-capacity
1886     * interval as a heartbeat only. Server-side BRIDGE_LAST_POLL_TTL is
1887     * 4 hours — anything shorter than that is sufficient for liveness.
1888     */
1889    isAtCapacity?: () => boolean
1890    /**
1891     * Produces a signal that aborts when capacity frees up (transport lost),
1892     * merged with the loop signal. Used to interrupt the at-capacity sleep
1893     * so recovery polling starts immediately.
1894     */
1895    capacitySignal?: () => CapacitySignal
1896    /** Called on unrecoverable errors (e.g. server-side expiry) to trigger full teardown. */
1897    onFatalError?: () => void
1898    /** Poll interval config getter — defaults to DEFAULT_POLL_CONFIG. */
1899    getPollIntervalConfig?: () => PollIntervalConfig
1900    /**
1901     * Returns the current work ID and session ingress token for heartbeat.
1902     * When null, heartbeat is not possible (no active work item).
1903     */
1904    getHeartbeatInfo?: () => {
1905      environmentId: string
1906      workId: string
1907      sessionToken: string
1908    } | null
1909    /**
1910     * Called when heartbeatWork throws BridgeFatalError (401/403/404/410 —
1911     * JWT expired or work item gone). Caller should tear down the transport
1912     * + work state so isAtCapacity() flips to false and the loop fast-polls
1913     * for the server's re-dispatched work item. When provided, the loop
1914     * SKIPS the at-capacity backoff sleep (which would otherwise cause a
1915     * ~10-minute dead window before recovery). When omitted, falls back to
1916     * the backoff sleep to avoid a tight poll+heartbeat loop.
1917     */
1918    onHeartbeatFatal?: (err: BridgeFatalError) => void
1919  }): Promise<void> {
1920    const MAX_ENVIRONMENT_RECREATIONS = 3
1921  
1922    logForDebugging(
1923      `[bridge:repl] Starting work poll loop for env=${getCredentials().environmentId}`,
1924    )
1925  
1926    let consecutiveErrors = 0
1927    let firstErrorTime: number | null = null
1928    let lastPollErrorTime: number | null = null
1929    let environmentRecreations = 0
1930    // Set when the at-capacity sleep overruns its deadline by a large margin
1931    // (process suspension). Consumed at the top of the next iteration to
1932    // force one fast-poll cycle — isAtCapacity() is `transport !== null`,
1933    // which stays true while the transport auto-reconnects, so the poll
1934    // loop would otherwise go straight back to a 10-minute sleep on a
1935    // transport that may be pointed at a dead socket.
1936    let suspensionDetected = false
1937  
1938    while (!signal.aborted) {
1939      // Capture credentials outside try so the catch block can detect
1940      // whether a concurrent reconnection replaced the environment.
1941      const { environmentId: envId, environmentSecret: envSecret } =
1942        getCredentials()
1943      const pollConfig = getPollIntervalConfig()
1944      try {
1945        const work = await api.pollForWork(
1946          envId,
1947          envSecret,
1948          signal,
1949          pollConfig.reclaim_older_than_ms,
1950        )
1951  
1952        // A successful poll proves the env is genuinely healthy — reset the
1953        // env-loss counter so events hours apart each start fresh. Outside
1954        // the state-change guard below because onEnvLost's success path
1955        // already emits 'ready'; emitting again here would be a duplicate.
1956        // (onEnvLost returning creds does NOT reset this — that would break
1957        // oscillation protection when the new env immediately dies.)
1958        environmentRecreations = 0
1959  
1960        // Reset error tracking on successful poll
1961        if (consecutiveErrors > 0) {
1962          logForDebugging(
1963            `[bridge:repl] Poll recovered after ${consecutiveErrors} consecutive error(s)`,
1964          )
1965          consecutiveErrors = 0
1966          firstErrorTime = null
1967          lastPollErrorTime = null
1968          onStateChange?.('ready')
1969        }
1970  
1971        if (!work) {
1972          // Read-and-clear: after a detected suspension, skip the at-capacity
1973          // branch exactly once. The pollForWork above already refreshed the
1974          // server's BRIDGE_LAST_POLL_TTL; this fast cycle gives any
1975          // re-dispatched work item a chance to land before we go back under.
1976          const skipAtCapacityOnce = suspensionDetected
1977          suspensionDetected = false
1978          if (isAtCapacity?.() && capacitySignal && !skipAtCapacityOnce) {
1979            const atCapMs = pollConfig.poll_interval_ms_at_capacity
1980            // Heartbeat loops WITHOUT polling. When at-capacity polling is also
1981            // enabled (atCapMs > 0), the loop tracks a deadline and breaks out
1982            // to poll at that interval — heartbeat and poll compose instead of
1983            // one suppressing the other. Breaks out when:
1984            //   - Poll deadline reached (atCapMs > 0 only)
1985            //   - Auth fails (JWT expired → poll refreshes tokens)
1986            //   - Capacity wake fires (transport lost → poll for new work)
1987            //   - Heartbeat config disabled (GrowthBook update)
1988            //   - Loop aborted (shutdown)
1989            if (
1990              pollConfig.non_exclusive_heartbeat_interval_ms > 0 &&
1991              getHeartbeatInfo
1992            ) {
1993              logEvent('tengu_bridge_heartbeat_mode_entered', {
1994                heartbeat_interval_ms:
1995                  pollConfig.non_exclusive_heartbeat_interval_ms,
1996              })
1997              // Deadline computed once at entry — GB updates to atCapMs don't
1998              // shift an in-flight deadline (next entry picks up the new value).
1999              const pollDeadline = atCapMs > 0 ? Date.now() + atCapMs : null
2000              let needsBackoff = false
2001              let hbCycles = 0
2002              while (
2003                !signal.aborted &&
2004                isAtCapacity() &&
2005                (pollDeadline === null || Date.now() < pollDeadline)
2006              ) {
2007                const hbConfig = getPollIntervalConfig()
2008                if (hbConfig.non_exclusive_heartbeat_interval_ms <= 0) break
2009  
2010                const info = getHeartbeatInfo()
2011                if (!info) break
2012  
2013                // Capture capacity signal BEFORE the async heartbeat call so
2014                // a transport loss during the HTTP request is caught by the
2015                // subsequent sleep.
2016                const cap = capacitySignal()
2017  
2018                try {
2019                  await api.heartbeatWork(
2020                    info.environmentId,
2021                    info.workId,
2022                    info.sessionToken,
2023                  )
2024                } catch (err) {
2025                  logForDebugging(
2026                    `[bridge:repl:heartbeat] Failed: ${errorMessage(err)}`,
2027                  )
2028                  if (err instanceof BridgeFatalError) {
2029                    cap.cleanup()
2030                    logEvent('tengu_bridge_heartbeat_error', {
2031                      status:
2032                        err.status as unknown as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2033                      error_type: (err.status === 401 || err.status === 403
2034                        ? 'auth_failed'
2035                        : 'fatal') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2036                    })
2037                    // JWT expired (401/403) or work item gone (404/410).
2038                    // Either way the current transport is dead — SSE
2039                    // reconnects and CCR writes will fail on the same
2040                    // stale token. If the caller gave us a recovery hook,
2041                    // tear down work state and skip backoff: isAtCapacity()
2042                    // flips to false, next outer-loop iteration fast-polls
2043                    // for the server's re-dispatched work item. Without
2044                    // the hook, backoff to avoid tight poll+heartbeat loop.
2045                    if (onHeartbeatFatal) {
2046                      onHeartbeatFatal(err)
2047                      logForDebugging(
2048                        `[bridge:repl:heartbeat] Fatal (status=${err.status}), work state cleared — fast-polling for re-dispatch`,
2049                      )
2050                    } else {
2051                      needsBackoff = true
2052                    }
2053                    break
2054                  }
2055                }
2056  
2057                hbCycles++
2058                await sleep(
2059                  hbConfig.non_exclusive_heartbeat_interval_ms,
2060                  cap.signal,
2061                )
2062                cap.cleanup()
2063              }
2064  
2065              const exitReason = needsBackoff
2066                ? 'error'
2067                : signal.aborted
2068                  ? 'shutdown'
2069                  : !isAtCapacity()
2070                    ? 'capacity_changed'
2071                    : pollDeadline !== null && Date.now() >= pollDeadline
2072                      ? 'poll_due'
2073                      : 'config_disabled'
2074              logEvent('tengu_bridge_heartbeat_mode_exited', {
2075                reason:
2076                  exitReason as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2077                heartbeat_cycles: hbCycles,
2078              })
2079  
2080              // On auth_failed or fatal, backoff before polling to avoid a
2081              // tight poll+heartbeat loop. Fall through to the shared sleep
2082              // below — it's the same capacitySignal-wrapped sleep the legacy
2083              // path uses, and both need the suspension-overrun check.
2084              if (!needsBackoff) {
2085                if (exitReason === 'poll_due') {
2086                  // bridgeApi throttles empty-poll logs (EMPTY_POLL_LOG_INTERVAL=100)
2087                  // so the once-per-10min poll_due poll is invisible at counter=2.
2088                  // Log it here so verification runs see both endpoints in the debug log.
2089                  logForDebugging(
2090                    `[bridge:repl] Heartbeat poll_due after ${hbCycles} cycles — falling through to pollForWork`,
2091                  )
2092                }
2093                continue
2094              }
2095            }
2096            // At-capacity sleep — reached by both the legacy path (heartbeat
2097            // disabled) and the heartbeat-backoff path (needsBackoff=true).
2098            // Merged so the suspension detector covers both; previously the
2099            // backoff path had no overrun check and could go straight back
2100            // under for 10 min after a laptop wake. Use atCapMs when enabled,
2101            // else the heartbeat interval as a floor (guaranteed > 0 on the
2102            // backoff path) so heartbeat-only configs don't tight-loop.
2103            const sleepMs =
2104              atCapMs > 0
2105                ? atCapMs
2106                : pollConfig.non_exclusive_heartbeat_interval_ms
2107            if (sleepMs > 0) {
2108              const cap = capacitySignal()
2109              const sleepStart = Date.now()
2110              await sleep(sleepMs, cap.signal)
2111              cap.cleanup()
2112              // Process-suspension detector. A setTimeout overshooting its
2113              // deadline by 60s means the process was suspended (laptop lid,
2114              // SIGSTOP, VM pause) — even a pathological GC pause is seconds,
2115              // not minutes. Early aborts (wakePollLoop → cap.signal) produce
2116              // overrun < 0 and fall through. Note: this only catches sleeps
2117              // that outlast their deadline; WebSocketTransport's ping
2118              // interval (10s granularity) is the primary detector for shorter
2119              // suspensions. This is the backstop for when that detector isn't
2120              // running (transport mid-reconnect, interval stopped).
2121              const overrun = Date.now() - sleepStart - sleepMs
2122              if (overrun > 60_000) {
2123                logForDebugging(
2124                  `[bridge:repl] At-capacity sleep overran by ${Math.round(overrun / 1000)}s — process suspension detected, forcing one fast-poll cycle`,
2125                )
2126                logEvent('tengu_bridge_repl_suspension_detected', {
2127                  overrun_ms: overrun,
2128                })
2129                suspensionDetected = true
2130              }
2131            }
2132          } else {
2133            await sleep(pollConfig.poll_interval_ms_not_at_capacity, signal)
2134          }
2135          continue
2136        }
2137  
2138        // Decode before type dispatch — need the JWT for the explicit ack.
2139        let secret
2140        try {
2141          secret = decodeWorkSecret(work.secret)
2142        } catch (err) {
2143          logForDebugging(
2144            `[bridge:repl] Failed to decode work secret: ${errorMessage(err)}`,
2145          )
2146          logEvent('tengu_bridge_repl_work_secret_failed', {})
2147          // Can't ack (needs the JWT we failed to decode). stopWork uses OAuth.
2148          // Prevents XAUTOCLAIM re-delivering this poisoned item every cycle.
2149          await api.stopWork(envId, work.id, false).catch(() => {})
2150          continue
2151        }
2152  
2153        // Explicitly acknowledge to prevent redelivery. Non-fatal on failure:
2154        // server re-delivers, and the onWorkReceived callback handles dedup.
2155        logForDebugging(`[bridge:repl] Acknowledging workId=${work.id}`)
2156        try {
2157          await api.acknowledgeWork(envId, work.id, secret.session_ingress_token)
2158        } catch (err) {
2159          logForDebugging(
2160            `[bridge:repl] Acknowledge failed workId=${work.id}: ${errorMessage(err)}`,
2161          )
2162        }
2163  
2164        if (work.data.type === 'healthcheck') {
2165          logForDebugging('[bridge:repl] Healthcheck received')
2166          continue
2167        }
2168  
2169        if (work.data.type === 'session') {
2170          const workSessionId = work.data.id
2171          try {
2172            validateBridgeId(workSessionId, 'session_id')
2173          } catch {
2174            logForDebugging(
2175              `[bridge:repl] Invalid session_id in work: ${workSessionId}`,
2176            )
2177            continue
2178          }
2179  
2180          onWorkReceived(
2181            workSessionId,
2182            secret.session_ingress_token,
2183            work.id,
2184            secret.use_code_sessions === true,
2185          )
2186          logForDebugging('[bridge:repl] Work accepted, continuing poll loop')
2187        }
2188      } catch (err) {
2189        if (signal.aborted) break
2190  
2191        // Detect permanent "environment deleted" error — no amount of
2192        // retrying will recover. Re-register a new environment instead.
2193        // Checked BEFORE the generic BridgeFatalError bail. pollForWork uses
2194        // validateStatus: s => s < 500, so 404 is always wrapped into a
2195        // BridgeFatalError by handleErrorStatus() — never an axios-shaped
2196        // error. The poll endpoint's only path param is the env ID; 404
2197        // unambiguously means env-gone (no-work is a 200 with null body).
2198        // The server sends error.type='not_found_error' (standard Anthropic
2199        // API shape), not a bridge-specific string — but status===404 is
2200        // the real signal and survives body-shape changes.
2201        if (
2202          err instanceof BridgeFatalError &&
2203          err.status === 404 &&
2204          onEnvironmentLost
2205        ) {
2206          // If credentials have already been refreshed by a concurrent
2207          // reconnection (e.g. WS close handler), the stale poll's error
2208          // is expected — skip onEnvironmentLost and retry with fresh creds.
2209          const currentEnvId = getCredentials().environmentId
2210          if (envId !== currentEnvId) {
2211            logForDebugging(
2212              `[bridge:repl] Stale poll error for old env=${envId}, current env=${currentEnvId} — skipping onEnvironmentLost`,
2213            )
2214            consecutiveErrors = 0
2215            firstErrorTime = null
2216            continue
2217          }
2218  
2219          environmentRecreations++
2220          logForDebugging(
2221            `[bridge:repl] Environment deleted, attempting re-registration (attempt ${environmentRecreations}/${MAX_ENVIRONMENT_RECREATIONS})`,
2222          )
2223          logEvent('tengu_bridge_repl_env_lost', {
2224            attempt: environmentRecreations,
2225          } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
2226  
2227          if (environmentRecreations > MAX_ENVIRONMENT_RECREATIONS) {
2228            logForDebugging(
2229              `[bridge:repl] Environment re-registration limit reached (${MAX_ENVIRONMENT_RECREATIONS}), giving up`,
2230            )
2231            onStateChange?.(
2232              'failed',
2233              'Environment deleted and re-registration limit reached',
2234            )
2235            onFatalError?.()
2236            break
2237          }
2238  
2239          onStateChange?.('reconnecting', 'environment lost, recreating session')
2240          const newCreds = await onEnvironmentLost()
2241          // doReconnect() makes several sequential network calls (1-5s).
2242          // If the user triggered teardown during that window, its internal
2243          // abort checks return false — but we need to re-check here to
2244          // avoid emitting a spurious 'failed' + onFatalError() during
2245          // graceful shutdown.
2246          if (signal.aborted) break
2247          if (newCreds) {
2248            // Credentials are updated in the outer scope via
2249            // reconnectEnvironmentWithSession — getCredentials() will
2250            // return the fresh values on the next poll iteration.
2251            // Do NOT reset environmentRecreations here — onEnvLost returning
2252            // creds only proves we tried to fix it, not that the env is
2253            // healthy. A successful poll (above) is the reset point; if the
2254            // new env immediately dies again we still want the limit to fire.
2255            consecutiveErrors = 0
2256            firstErrorTime = null
2257            onStateChange?.('ready')
2258            logForDebugging(
2259              `[bridge:repl] Re-registered environment: ${newCreds.environmentId}`,
2260            )
2261            continue
2262          }
2263  
2264          onStateChange?.(
2265            'failed',
2266            'Environment deleted and re-registration failed',
2267          )
2268          onFatalError?.()
2269          break
2270        }
2271  
2272        // Fatal errors (401/403/404/410) — no point retrying
2273        if (err instanceof BridgeFatalError) {
2274          const isExpiry = isExpiredErrorType(err.errorType)
2275          const isSuppressible = isSuppressible403(err)
2276          logForDebugging(
2277            `[bridge:repl] Fatal poll error: ${err.message} (status=${err.status}, type=${err.errorType ?? 'unknown'})${isSuppressible ? ' (suppressed)' : ''}`,
2278          )
2279          logEvent('tengu_bridge_repl_fatal_error', {
2280            status: err.status,
2281            error_type:
2282              err.errorType as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2283          })
2284          logForDiagnosticsNoPII(
2285            isExpiry ? 'info' : 'error',
2286            'bridge_repl_fatal_error',
2287            { status: err.status, error_type: err.errorType },
2288          )
2289          // Cosmetic 403 errors (e.g., external_poll_sessions scope,
2290          // environments:manage permission) — suppress user-visible error
2291          // but always trigger teardown so cleanup runs.
2292          if (!isSuppressible) {
2293            onStateChange?.(
2294              'failed',
2295              isExpiry
2296                ? 'session expired · /remote-control to reconnect'
2297                : err.message,
2298            )
2299          }
2300          // Always trigger teardown — matches bridgeMain.ts where fatalExit=true
2301          // is unconditional and post-loop cleanup always runs.
2302          onFatalError?.()
2303          break
2304        }
2305  
2306        const now = Date.now()
2307  
2308        // Detect system sleep/wake: if the gap since the last poll error
2309        // greatly exceeds the max backoff delay, the machine likely slept.
2310        // Reset error tracking so we retry with a fresh budget instead of
2311        // immediately giving up.
2312        if (
2313          lastPollErrorTime !== null &&
2314          now - lastPollErrorTime > POLL_ERROR_MAX_DELAY_MS * 2
2315        ) {
2316          logForDebugging(
2317            `[bridge:repl] Detected system sleep (${Math.round((now - lastPollErrorTime) / 1000)}s gap), resetting poll error budget`,
2318          )
2319          logForDiagnosticsNoPII('info', 'bridge_repl_poll_sleep_detected', {
2320            gapMs: now - lastPollErrorTime,
2321          })
2322          consecutiveErrors = 0
2323          firstErrorTime = null
2324        }
2325        lastPollErrorTime = now
2326  
2327        consecutiveErrors++
2328        if (firstErrorTime === null) {
2329          firstErrorTime = now
2330        }
2331        const elapsed = now - firstErrorTime
2332        const httpStatus = extractHttpStatus(err)
2333        const errMsg = describeAxiosError(err)
2334        const wsLabel = getWsState?.() ?? 'unknown'
2335  
2336        logForDebugging(
2337          `[bridge:repl] Poll error (attempt ${consecutiveErrors}, elapsed ${Math.round(elapsed / 1000)}s, ws=${wsLabel}): ${errMsg}`,
2338        )
2339        logEvent('tengu_bridge_repl_poll_error', {
2340          status: httpStatus,
2341          consecutiveErrors,
2342          elapsedMs: elapsed,
2343        } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
2344  
2345        // Only transition to 'reconnecting' on the first error — stay
2346        // there until a successful poll (avoid flickering the UI state).
2347        if (consecutiveErrors === 1) {
2348          onStateChange?.('reconnecting', errMsg)
2349        }
2350  
2351        // Give up after continuous failures
2352        if (elapsed >= POLL_ERROR_GIVE_UP_MS) {
2353          logForDebugging(
2354            `[bridge:repl] Poll failures exceeded ${POLL_ERROR_GIVE_UP_MS / 1000}s (${consecutiveErrors} errors), giving up`,
2355          )
2356          logForDiagnosticsNoPII('info', 'bridge_repl_poll_give_up')
2357          logEvent('tengu_bridge_repl_poll_give_up', {
2358            consecutiveErrors,
2359            elapsedMs: elapsed,
2360            lastStatus: httpStatus,
2361          } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
2362          onStateChange?.('failed', 'connection to server lost')
2363          break
2364        }
2365  
2366        // Exponential backoff: 2s → 4s → 8s → 16s → 32s → 60s (cap)
2367        const backoff = Math.min(
2368          POLL_ERROR_INITIAL_DELAY_MS * 2 ** (consecutiveErrors - 1),
2369          POLL_ERROR_MAX_DELAY_MS,
2370        )
2371        // The poll_due heartbeat-loop exit leaves a healthy lease exposed to
2372        // this backoff path. Heartbeat before each sleep so /poll outages
2373        // (the VerifyEnvironmentSecretAuth DB path heartbeat was introduced to
2374        // avoid) don't kill the 300s lease TTL.
2375        if (getPollIntervalConfig().non_exclusive_heartbeat_interval_ms > 0) {
2376          const info = getHeartbeatInfo?.()
2377          if (info) {
2378            try {
2379              await api.heartbeatWork(
2380                info.environmentId,
2381                info.workId,
2382                info.sessionToken,
2383              )
2384            } catch {
2385              // Best-effort — if heartbeat also fails the lease dies, same as
2386              // pre-poll_due behavior (where the only heartbeat-loop exits were
2387              // ones where the lease was already dying).
2388            }
2389          }
2390        }
2391        await sleep(backoff, signal)
2392      }
2393    }
2394  
2395    logForDebugging(
2396      `[bridge:repl] Work poll loop ended (aborted=${signal.aborted}) env=${getCredentials().environmentId}`,
2397    )
2398  }
2399  
2400  // Exported for testing only
2401  export {
2402    startWorkPollLoop as _startWorkPollLoopForTesting,
2403    POLL_ERROR_INITIAL_DELAY_MS as _POLL_ERROR_INITIAL_DELAY_MS_ForTesting,
2404    POLL_ERROR_MAX_DELAY_MS as _POLL_ERROR_MAX_DELAY_MS_ForTesting,
2405    POLL_ERROR_GIVE_UP_MS as _POLL_ERROR_GIVE_UP_MS_ForTesting,
2406  }