// src/cli/remoteIO.ts
  1  import type { StdoutMessage } from 'src/entrypoints/sdk/controlTypes.js'
  2  import { PassThrough } from 'stream'
  3  import { URL } from 'url'
  4  import { getSessionId } from '../bootstrap/state.js'
  5  import { getPollIntervalConfig } from '../bridge/pollConfig.js'
  6  import { registerCleanup } from '../utils/cleanupRegistry.js'
  7  import { setCommandLifecycleListener } from '../utils/commandLifecycle.js'
  8  import { isDebugMode, logForDebugging } from '../utils/debug.js'
  9  import { logForDiagnosticsNoPII } from '../utils/diagLogs.js'
 10  import { isEnvTruthy } from '../utils/envUtils.js'
 11  import { errorMessage } from '../utils/errors.js'
 12  import { gracefulShutdown } from '../utils/gracefulShutdown.js'
 13  import { logError } from '../utils/log.js'
 14  import { writeToStdout } from '../utils/process.js'
 15  import { getSessionIngressAuthToken } from '../utils/sessionIngressAuth.js'
 16  import {
 17    setSessionMetadataChangedListener,
 18    setSessionStateChangedListener,
 19  } from '../utils/sessionState.js'
 20  import {
 21    setInternalEventReader,
 22    setInternalEventWriter,
 23  } from '../utils/sessionStorage.js'
 24  import { ndjsonSafeStringify } from './ndjsonSafeStringify.js'
 25  import { StructuredIO } from './structuredIO.js'
 26  import { CCRClient, CCRInitError } from './transports/ccrClient.js'
 27  import { SSETransport } from './transports/SSETransport.js'
 28  import type { Transport } from './transports/Transport.js'
 29  import { getTransportForUrl } from './transports/transportUtils.js'
 30  
 31  /**
 32   * Bidirectional streaming for SDK mode with session tracking
 33   * Supports WebSocket transport
 34   */
 35  export class RemoteIO extends StructuredIO {
 36    private url: URL
 37    private transport: Transport
 38    private inputStream: PassThrough
 39    private readonly isBridge: boolean = false
 40    private readonly isDebug: boolean = false
 41    private ccrClient: CCRClient | null = null
 42    private keepAliveTimer: ReturnType<typeof setInterval> | null = null
 43  
 44    constructor(
 45      streamUrl: string,
 46      initialPrompt?: AsyncIterable<string>,
 47      replayUserMessages?: boolean,
 48    ) {
 49      const inputStream = new PassThrough({ encoding: 'utf8' })
 50      super(inputStream, replayUserMessages)
 51      this.inputStream = inputStream
 52      this.url = new URL(streamUrl)
 53  
 54      // Prepare headers with session token if available
 55      const headers: Record<string, string> = {}
 56      const sessionToken = getSessionIngressAuthToken()
 57      if (sessionToken) {
 58        headers['Authorization'] = `Bearer ${sessionToken}`
 59      } else {
 60        logForDebugging('[remote-io] No session ingress token available', {
 61          level: 'error',
 62        })
 63      }
 64  
 65      // Add environment runner version if available (set by Environment Manager)
 66      const erVersion = process.env.CLAUDE_CODE_ENVIRONMENT_RUNNER_VERSION
 67      if (erVersion) {
 68        headers['x-environment-runner-version'] = erVersion
 69      }
 70  
 71      // Provide a callback that re-reads the session token dynamically.
 72      // When the parent process refreshes the token (via token file or env var),
 73      // the transport can pick it up on reconnection.
 74      const refreshHeaders = (): Record<string, string> => {
 75        const h: Record<string, string> = {}
 76        const freshToken = getSessionIngressAuthToken()
 77        if (freshToken) {
 78          h['Authorization'] = `Bearer ${freshToken}`
 79        }
 80        const freshErVersion = process.env.CLAUDE_CODE_ENVIRONMENT_RUNNER_VERSION
 81        if (freshErVersion) {
 82          h['x-environment-runner-version'] = freshErVersion
 83        }
 84        return h
 85      }
 86  
 87      // Get appropriate transport based on URL protocol
 88      this.transport = getTransportForUrl(
 89        this.url,
 90        headers,
 91        getSessionId(),
 92        refreshHeaders,
 93      )
 94  
 95      // Set up data callback
 96      this.isBridge = process.env.CLAUDE_CODE_ENVIRONMENT_KIND === 'bridge'
 97      this.isDebug = isDebugMode()
 98      this.transport.setOnData((data: string) => {
 99        this.inputStream.write(data)
100        if (this.isBridge && this.isDebug) {
101          writeToStdout(data.endsWith('\n') ? data : data + '\n')
102        }
103      })
104  
105      // Set up close callback to handle connection failures
106      this.transport.setOnClose(() => {
107        // End the input stream to trigger graceful shutdown
108        this.inputStream.end()
109      })
110  
111      // Initialize CCR v2 client (heartbeats, epoch, state reporting, event writes).
112      // The CCRClient constructor wires the SSE received-ack handler
113      // synchronously, so new CCRClient() MUST run before transport.connect() —
114      // otherwise early SSE frames hit an unwired onEventCallback and their
115      // 'received' delivery acks are silently dropped.
116      if (isEnvTruthy(process.env.CLAUDE_CODE_USE_CCR_V2)) {
117        // CCR v2 is SSE+POST by definition. getTransportForUrl returns
118        // SSETransport under the same env var, but the two checks live in
119        // different files — assert the invariant so a future decoupling
120        // fails loudly here instead of confusingly inside CCRClient.
121        if (!(this.transport instanceof SSETransport)) {
122          throw new Error(
123            'CCR v2 requires SSETransport; check getTransportForUrl',
124          )
125        }
126        this.ccrClient = new CCRClient(this.transport, this.url)
127        const init = this.ccrClient.initialize()
128        this.restoredWorkerState = init.catch(() => null)
129        init.catch((error: unknown) => {
130          logForDiagnosticsNoPII('error', 'cli_worker_lifecycle_init_failed', {
131            reason: error instanceof CCRInitError ? error.reason : 'unknown',
132          })
133          logError(
134            new Error(`CCRClient initialization failed: ${errorMessage(error)}`),
135          )
136          void gracefulShutdown(1, 'other')
137        })
138        registerCleanup(async () => this.ccrClient?.close())
139  
140        // Register internal event writer for transcript persistence.
141        // When set, sessionStorage writes transcript messages as CCR v2
142        // internal events instead of v1 Session Ingress.
143        setInternalEventWriter((eventType, payload, options) =>
144          this.ccrClient!.writeInternalEvent(eventType, payload, options),
145        )
146  
147        // Register internal event readers for session resume.
148        // When set, hydrateFromCCRv2InternalEvents() can fetch foreground
149        // and subagent internal events to reconstruct conversation state.
150        setInternalEventReader(
151          () => this.ccrClient!.readInternalEvents(),
152          () => this.ccrClient!.readSubagentInternalEvents(),
153        )
154  
155        const LIFECYCLE_TO_DELIVERY = {
156          started: 'processing',
157          completed: 'processed',
158        } as const
159        setCommandLifecycleListener((uuid, state) => {
160          this.ccrClient?.reportDelivery(uuid, LIFECYCLE_TO_DELIVERY[state])
161        })
162        setSessionStateChangedListener((state, details) => {
163          this.ccrClient?.reportState(state, details)
164        })
165        setSessionMetadataChangedListener(metadata => {
166          this.ccrClient?.reportMetadata(metadata)
167        })
168      }
169  
170      // Start connection only after all callbacks are wired (setOnData above,
171      // setOnEvent inside new CCRClient() when CCR v2 is enabled).
172      void this.transport.connect()
173  
174      // Push a silent keep_alive frame on a fixed interval so upstream
175      // proxies and the session-ingress layer don't GC an otherwise-idle
176      // remote control session. The keep_alive type is filtered before
177      // reaching any client UI (Query.ts drops it; structuredIO.ts drops it;
178      // web/iOS/Android never see it in their message loop). Interval comes
179      // from GrowthBook (tengu_bridge_poll_interval_config
180      // session_keepalive_interval_v2_ms, default 120s); 0 = disabled.
181      // Bridge-only: fixes Envoy idle timeout on bridge-topology sessions
182      // (#21931). byoc workers ran without this before #21931 and do not
183      // need it — different network path.
184      const keepAliveIntervalMs =
185        getPollIntervalConfig().session_keepalive_interval_v2_ms
186      if (this.isBridge && keepAliveIntervalMs > 0) {
187        this.keepAliveTimer = setInterval(() => {
188          logForDebugging('[remote-io] keep_alive sent')
189          void this.write({ type: 'keep_alive' }).catch(err => {
190            logForDebugging(
191              `[remote-io] keep_alive write failed: ${errorMessage(err)}`,
192            )
193          })
194        }, keepAliveIntervalMs)
195        this.keepAliveTimer.unref?.()
196      }
197  
198      // Register for graceful shutdown cleanup
199      registerCleanup(async () => this.close())
200  
201      // If initial prompt is provided, send it through the input stream
202      if (initialPrompt) {
203        // Convert the initial prompt to the input stream format.
204        // Chunks from stdin may already contain trailing newlines, so strip
205        // them before appending our own to avoid double-newline issues that
206        // cause structuredIO to parse empty lines. String() handles both
207        // string chunks and Buffer objects from process.stdin.
208        const stream = this.inputStream
209        void (async () => {
210          for await (const chunk of initialPrompt) {
211            stream.write(String(chunk).replace(/\n$/, '') + '\n')
212          }
213        })()
214      }
215    }
216  
217    override flushInternalEvents(): Promise<void> {
218      return this.ccrClient?.flushInternalEvents() ?? Promise.resolve()
219    }
220  
221    override get internalEventsPending(): number {
222      return this.ccrClient?.internalEventsPending ?? 0
223    }
224  
225    /**
226     * Send output to the transport.
227     * In bridge mode, control_request messages are always echoed to stdout so the
228     * bridge parent can detect permission requests. Other messages are echoed only
229     * in debug mode.
230     */
231    async write(message: StdoutMessage): Promise<void> {
232      if (this.ccrClient) {
233        await this.ccrClient.writeEvent(message)
234      } else {
235        await this.transport.write(message)
236      }
237      if (this.isBridge) {
238        if (message.type === 'control_request' || this.isDebug) {
239          writeToStdout(ndjsonSafeStringify(message) + '\n')
240        }
241      }
242    }
243  
244    /**
245     * Clean up connections gracefully
246     */
247    close(): void {
248      if (this.keepAliveTimer) {
249        clearInterval(this.keepAliveTimer)
250        this.keepAliveTimer = null
251      }
252      this.transport.close()
253      this.inputStream.end()
254    }
255  }