/ bridge / sessionRunner.ts
sessionRunner.ts
import { type ChildProcess, spawn } from 'child_process'
import { createWriteStream, type WriteStream } from 'fs'
import { tmpdir } from 'os'
import { dirname, extname, join } from 'path'
import { createInterface } from 'readline'
import { jsonParse, jsonStringify } from '../utils/slowOperations.js'
import { debugTruncate } from './debugUtils.js'
import type {
  SessionActivity,
  SessionDoneStatus,
  SessionHandle,
  SessionSpawner,
  SessionSpawnOpts,
} from './types.js'
 15  
/**
 * Capacity of the per-session activity ring buffer: once this many activities
 * are held, the oldest one is dropped before a new one is appended.
 */
const MAX_ACTIVITIES = 10
/**
 * Capacity of the per-session ring buffer holding the most recent lines the
 * child process wrote to stderr.
 */
const MAX_STDERR_LINES = 10
 18  
 19  /**
 20   * Sanitize a session ID for use in file names.
 21   * Strips any characters that could cause path traversal (e.g. `../`, `/`)
 22   * or other filesystem issues, replacing them with underscores.
 23   */
 24  export function safeFilenameId(id: string): string {
 25    return id.replace(/[^a-zA-Z0-9_-]/g, '_')
 26  }
 27  
/**
 * A control_request emitted by the child CLI when it needs permission to
 * execute a **specific** tool invocation (not a general capability check).
 * The bridge forwards this to the server so the user can approve/deny.
 *
 * NOTE: the stdout handler in this file only checks `type` and
 * `request.subtype` before casting a parsed NDJSON line to this type; the
 * remaining fields are not validated at runtime here.
 */
export type PermissionRequest = {
  type: 'control_request'
  request_id: string
  request: {
    /** Per-invocation permission check — "may I run this tool with these inputs?" */
    subtype: 'can_use_tool'
    tool_name: string
    input: Record<string, unknown>
    tool_use_id: string
  }
}
 44  
/** Dependencies and settings shared by every session a spawner launches. */
type SessionSpawnerDeps = {
  /** Executable passed to `spawn()` for each session. */
  execPath: string
  /**
   * Arguments that must precede the CLI flags when spawning. Empty for
   * compiled binaries (where execPath is the claude binary itself); contains
   * the script path (process.argv[1]) for npm installs where execPath is the
   * node runtime. Without this, node sees --sdk-url as a node option and
   * exits with "bad option: --sdk-url" (see anthropics/claude-code#28334).
   */
  scriptArgs: string[]
  /** Base environment for the child; session-specific variables are layered on top. */
  env: NodeJS.ProcessEnv
  /**
   * When true: passes `--verbose` to the child, echoes the child's stdout and
   * stderr lines to the bridge's stderr, and enables an auto-generated debug
   * log path when `debugFile` is not set.
   */
  verbose: boolean
  /** When true, sets `CLAUDE_CODE_FORCE_SANDBOX=1` in the child environment. */
  sandbox: boolean
  /**
   * Base path for the child's debug log. A sanitized session ID is added to
   * the name per session, and the NDJSON transcript file is written to the
   * same directory.
   */
  debugFile?: string
  /** Forwarded to the child as `--permission-mode <value>` when set. */
  permissionMode?: string
  /** Sink for the bridge's own diagnostic messages. */
  onDebug: (msg: string) => void
  /** Called once for every activity extracted from the child's stdout. */
  onActivity?: (sessionId: string, activity: SessionActivity) => void
  /**
   * Called for each `control_request` line whose `request.subtype` is
   * `can_use_tool`. `accessToken` is the token the session was spawned with
   * (`opts.accessToken`).
   */
  onPermissionRequest?: (
    sessionId: string,
    request: PermissionRequest,
    accessToken: string,
  ) => void
}
 68  
 69  /** Map tool names to human-readable verbs for the status display. */
 70  const TOOL_VERBS: Record<string, string> = {
 71    Read: 'Reading',
 72    Write: 'Writing',
 73    Edit: 'Editing',
 74    MultiEdit: 'Editing',
 75    Bash: 'Running',
 76    Glob: 'Searching',
 77    Grep: 'Searching',
 78    WebFetch: 'Fetching',
 79    WebSearch: 'Searching',
 80    Task: 'Running task',
 81    FileReadTool: 'Reading',
 82    FileWriteTool: 'Writing',
 83    FileEditTool: 'Editing',
 84    GlobTool: 'Searching',
 85    GrepTool: 'Searching',
 86    BashTool: 'Running',
 87    NotebookEditTool: 'Editing notebook',
 88    LSP: 'LSP',
 89  }
 90  
 91  function toolSummary(name: string, input: Record<string, unknown>): string {
 92    const verb = TOOL_VERBS[name] ?? name
 93    const target =
 94      (input.file_path as string) ??
 95      (input.filePath as string) ??
 96      (input.pattern as string) ??
 97      (input.command as string | undefined)?.slice(0, 60) ??
 98      (input.url as string) ??
 99      (input.query as string) ??
100      ''
101    if (target) {
102      return `${verb} ${target}`
103    }
104    return verb
105  }
106  
/**
 * Derive status-display activities from one NDJSON line of child stdout.
 *
 * Only two message types produce activities:
 * - `assistant`: one `tool_start` activity per `tool_use` content block and
 *   one `text` activity per non-empty `text` block.
 * - `result`: a `result` activity for subtype `success`, an `error` activity
 *   for any other truthy subtype.
 *
 * The line is untrusted process output, so every field is narrowed at
 * runtime before use; fields with an unexpected type fall back to the same
 * defaults used for missing fields.
 *
 * @param line - A single line read from the child's stdout.
 * @param sessionId - Session ID, used only in debug messages.
 * @param onDebug - Sink for debug messages describing each activity.
 * @returns The activities found on this line; empty for non-JSON lines,
 *   non-object JSON, and message types that carry no activity.
 */
function extractActivities(
  line: string,
  sessionId: string,
  onDebug: (msg: string) => void,
): SessionActivity[] {
  let parsed: unknown
  try {
    parsed = jsonParse(line)
  } catch {
    // Not JSON: nothing to extract from this line.
    return []
  }

  if (!parsed || typeof parsed !== 'object') {
    return []
  }

  const msg = parsed as Record<string, unknown>
  const activities: SessionActivity[] = []
  const now = Date.now()

  switch (msg.type) {
    case 'assistant': {
      const message = msg.message
      if (!message || typeof message !== 'object') break
      const content = (message as Record<string, unknown>).content
      if (!Array.isArray(content)) break

      for (const block of content as unknown[]) {
        if (!block || typeof block !== 'object') continue
        const b = block as Record<string, unknown>

        if (b.type === 'tool_use') {
          const name = typeof b.name === 'string' ? b.name : 'Tool'
          const rawInput = b.input
          // Only a plain JSON object is a usable tool input; anything else
          // (missing, string, number, array) is treated as empty.
          const input: Record<string, unknown> =
            rawInput !== null &&
            typeof rawInput === 'object' &&
            !Array.isArray(rawInput)
              ? (rawInput as Record<string, unknown>)
              : {}
          const summary = toolSummary(name, input)
          activities.push({
            type: 'tool_start',
            summary,
            timestamp: now,
          })
          onDebug(
            `[bridge:activity] sessionId=${sessionId} tool_use name=${name} ${inputPreview(input)}`,
          )
        } else if (b.type === 'text') {
          const text = typeof b.text === 'string' ? b.text : ''
          if (text.length > 0) {
            activities.push({
              type: 'text',
              summary: text.slice(0, 80),
              timestamp: now,
            })
            onDebug(
              `[bridge:activity] sessionId=${sessionId} text "${text.slice(0, 100)}"`,
            )
          }
        }
      }
      break
    }
    case 'result': {
      const subtype = msg.subtype
      if (subtype === 'success') {
        activities.push({
          type: 'result',
          summary: 'Session completed',
          timestamp: now,
        })
        onDebug(
          `[bridge:activity] sessionId=${sessionId} result subtype=success`,
        )
      } else if (subtype) {
        const subtypeLabel = String(subtype)
        // Use the first reported error only when `errors` really is an array
        // whose first element is a string; otherwise describe the subtype.
        const firstError: unknown = Array.isArray(msg.errors)
          ? msg.errors[0]
          : undefined
        const errorSummary =
          typeof firstError === 'string'
            ? firstError
            : `Error: ${subtypeLabel}`
        activities.push({
          type: 'error',
          summary: errorSummary,
          timestamp: now,
        })
        onDebug(
          `[bridge:activity] sessionId=${sessionId} result subtype=${subtypeLabel} error="${errorSummary}"`,
        )
      } else {
        onDebug(
          `[bridge:activity] sessionId=${sessionId} result subtype=undefined`,
        )
      }
      break
    }
    default:
      break
  }

  return activities
}
201  
/**
 * Extract plain text from a replayed SDKUserMessage NDJSON line. Returns the
 * trimmed text if this looks like a real human-authored message, otherwise
 * undefined so the caller keeps waiting for the first real message.
 *
 * The text is taken from `message.content` when that is a string, or from
 * the first content block of type `text` when it is an array. A value that
 * is not a string is ignored rather than trimmed, because the message comes
 * from parsed process output and its field types are not guaranteed.
 *
 * @param msg - Parsed NDJSON object whose `type` is `user`.
 * @returns The trimmed, non-empty message text, or `undefined`.
 */
function extractUserMessageText(
  msg: Record<string, unknown>,
): string | undefined {
  // Skip tool-result user messages (wrapped subagent results) and synthetic
  // caveat messages — neither is human-authored.
  if (msg.parent_tool_use_id != null || msg.isSynthetic || msg.isReplay)
    return undefined

  const message = msg.message
  if (!message || typeof message !== 'object') return undefined
  const content = (message as Record<string, unknown>).content

  let text: unknown
  if (typeof content === 'string') {
    text = content
  } else if (Array.isArray(content)) {
    // Only the first text block is considered, even if it has no usable text.
    const firstTextBlock: unknown = content.find(
      (block: unknown) =>
        block !== null &&
        typeof block === 'object' &&
        (block as Record<string, unknown>).type === 'text',
    )
    text = (firstTextBlock as Record<string, unknown> | undefined)?.text
  }

  // Guard before trim(): calling it on a number/object would throw.
  if (typeof text !== 'string') return undefined
  const trimmed = text.trim()
  return trimmed ? trimmed : undefined
}
235  
/**
 * Render a compact `key="value"` preview of a tool input for debug logs.
 *
 * Only string-valued fields are shown. At most the first three such fields
 * (in property order) are included, each value cut to 100 characters, and
 * the pieces are joined with single spaces.
 *
 * @param input - Tool input object.
 * @returns The preview string; empty when no field holds a string.
 */
function inputPreview(input: Record<string, unknown>): string {
  const stringFields = Object.entries(input).filter(
    (entry): entry is [string, string] => typeof entry[1] === 'string',
  )
  return stringFields
    .slice(0, 3)
    .map(([field, text]) => `${field}="${text.slice(0, 100)}"`)
    .join(' ')
}
247  
/**
 * Create a {@link SessionSpawner} that runs each session as a child CLI
 * process.
 *
 * For every `spawn()` call the spawner:
 * - resolves an optional per-session debug log path and NDJSON transcript;
 * - launches `deps.execPath` in `dir` with stream-json stdin/stdout and the
 *   session's SDK URL, ID and access token;
 * - keeps the last {@link MAX_STDERR_LINES} stderr lines and the last
 *   {@link MAX_ACTIVITIES} activities parsed from stdout;
 * - forwards `can_use_tool` control requests and the first human-authored
 *   user message to the configured callbacks;
 * - returns a {@link SessionHandle} for killing the child, writing to its
 *   stdin, refreshing its access token, and awaiting its exit status.
 *
 * @param deps - Shared settings and callbacks used for every spawned session.
 * @returns A spawner whose `spawn(opts, dir)` starts one child process.
 */
export function createSessionSpawner(deps: SessionSpawnerDeps): SessionSpawner {
  return {
    spawn(opts: SessionSpawnOpts, dir: string): SessionHandle {
      // Debug file resolution:
      // 1. If deps.debugFile is provided, use it with session ID suffix for uniqueness
      // 2. If verbose or ant build, auto-generate a temp file path
      // 3. Otherwise, no debug file
      const safeId = safeFilenameId(opts.sessionId)
      let debugFile: string | undefined
      if (deps.debugFile) {
        // extname() looks only at the final path component, so dots in
        // directory names (e.g. "../logs/debug") are never mistaken for an
        // extension. The endsWith() guard covers paths with a trailing
        // separator, where the extension is not at the end of the string.
        const ext = extname(deps.debugFile)
        if (ext && deps.debugFile.endsWith(ext)) {
          const stem = deps.debugFile.slice(0, -ext.length)
          debugFile = `${stem}-${safeId}${ext}`
        } else {
          debugFile = `${deps.debugFile}-${safeId}`
        }
      } else if (deps.verbose || process.env.USER_TYPE === 'ant') {
        debugFile = join(tmpdir(), 'claude', `bridge-session-${safeId}.log`)
      }

      // Transcript file: write raw NDJSON lines for post-hoc analysis.
      // Placed alongside the debug file when one is configured.
      let transcriptStream: WriteStream | null = null
      let transcriptPath: string | undefined
      if (deps.debugFile) {
        transcriptPath = join(
          dirname(deps.debugFile),
          `bridge-transcript-${safeId}.jsonl`,
        )
        transcriptStream = createWriteStream(transcriptPath, { flags: 'a' })
        transcriptStream.on('error', err => {
          deps.onDebug(
            `[bridge:session] Transcript write error: ${err.message}`,
          )
          // Stop writing to a broken transcript; the session itself continues.
          transcriptStream = null
        })
        deps.onDebug(`[bridge:session] Transcript log: ${transcriptPath}`)
      }

      /** End the transcript stream once; safe to call repeatedly. */
      const closeTranscript = (): void => {
        if (transcriptStream) {
          transcriptStream.end()
          transcriptStream = null
        }
      }

      const args = [
        ...deps.scriptArgs,
        '--print',
        '--sdk-url',
        opts.sdkUrl,
        '--session-id',
        opts.sessionId,
        '--input-format',
        'stream-json',
        '--output-format',
        'stream-json',
        '--replay-user-messages',
        ...(deps.verbose ? ['--verbose'] : []),
        ...(debugFile ? ['--debug-file', debugFile] : []),
        ...(deps.permissionMode
          ? ['--permission-mode', deps.permissionMode]
          : []),
      ]

      const env: NodeJS.ProcessEnv = {
        ...deps.env,
        // Strip the bridge's OAuth token so the child CC process uses
        // the session access token for inference instead.
        CLAUDE_CODE_OAUTH_TOKEN: undefined,
        CLAUDE_CODE_ENVIRONMENT_KIND: 'bridge',
        ...(deps.sandbox && { CLAUDE_CODE_FORCE_SANDBOX: '1' }),
        CLAUDE_CODE_SESSION_ACCESS_TOKEN: opts.accessToken,
        // v1: HybridTransport (WS reads + POST writes) to Session-Ingress.
        // Harmless in v2 mode — transportUtils checks CLAUDE_CODE_USE_CCR_V2 first.
        CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2: '1',
        // v2: SSETransport + CCRClient to CCR's /v1/code/sessions/* endpoints.
        // Same env vars environment-manager sets in the container path.
        ...(opts.useCcrV2 && {
          CLAUDE_CODE_USE_CCR_V2: '1',
          CLAUDE_CODE_WORKER_EPOCH: String(opts.workerEpoch),
        }),
      }

      deps.onDebug(
        `[bridge:session] Spawning sessionId=${opts.sessionId} sdkUrl=${opts.sdkUrl} accessToken=${opts.accessToken ? 'present' : 'MISSING'}`,
      )
      deps.onDebug(`[bridge:session] Child args: ${args.join(' ')}`)
      if (debugFile) {
        deps.onDebug(`[bridge:session] Debug log: ${debugFile}`)
      }

      // Pipe all three streams: stdin for control, stdout for NDJSON parsing,
      // stderr for error capture and diagnostics.
      const child: ChildProcess = spawn(deps.execPath, args, {
        cwd: dir,
        stdio: ['pipe', 'pipe', 'pipe'],
        env,
        windowsHide: true,
      })

      deps.onDebug(
        `[bridge:session] sessionId=${opts.sessionId} pid=${child.pid}`,
      )

      // A write that races with the child exiting surfaces as an 'error'
      // event (e.g. EPIPE) on the stdin stream. Without a listener that event
      // is thrown as an uncaught exception in the bridge process.
      child.stdin?.on('error', err => {
        deps.onDebug(
          `[bridge:session] sessionId=${opts.sessionId} stdin error: ${err.message}`,
        )
      })

      const activities: SessionActivity[] = []
      let currentActivity: SessionActivity | null = null
      const lastStderr: string[] = []
      let sigkillSent = false
      let firstUserMessageSeen = false

      // Buffer stderr for error diagnostics
      if (child.stderr) {
        const stderrRl = createInterface({ input: child.stderr })
        stderrRl.on('line', line => {
          // Forward stderr to bridge's stderr in verbose mode
          if (deps.verbose) {
            process.stderr.write(line + '\n')
          }
          // Ring buffer of last N lines
          if (lastStderr.length >= MAX_STDERR_LINES) {
            lastStderr.shift()
          }
          lastStderr.push(line)
        })
      }

      // Parse NDJSON from child stdout
      if (child.stdout) {
        const rl = createInterface({ input: child.stdout })
        rl.on('line', line => {
          // Write raw NDJSON to transcript file
          if (transcriptStream) {
            transcriptStream.write(line + '\n')
          }

          // Log all messages flowing from the child CLI to the bridge
          deps.onDebug(
            `[bridge:ws] sessionId=${opts.sessionId} <<< ${debugTruncate(line)}`,
          )

          // In verbose mode, forward raw output to stderr
          if (deps.verbose) {
            process.stderr.write(line + '\n')
          }

          const extracted = extractActivities(
            line,
            opts.sessionId,
            deps.onDebug,
          )
          for (const activity of extracted) {
            // Maintain ring buffer
            if (activities.length >= MAX_ACTIVITIES) {
              activities.shift()
            }
            activities.push(activity)
            currentActivity = activity

            deps.onActivity?.(opts.sessionId, activity)
          }

          // Detect control_request and replayed user messages.
          // extractActivities parses the same line but swallows parse errors
          // and skips 'user' type — re-parse here is cheap (NDJSON lines are
          // small) and keeps each path self-contained.
          {
            let parsed: unknown
            try {
              parsed = jsonParse(line)
            } catch {
              // Non-JSON line, skip detection
            }
            if (parsed && typeof parsed === 'object') {
              const msg = parsed as Record<string, unknown>

              if (msg.type === 'control_request') {
                const request = msg.request as
                  | Record<string, unknown>
                  | undefined
                if (
                  request?.subtype === 'can_use_tool' &&
                  deps.onPermissionRequest
                ) {
                  deps.onPermissionRequest(
                    opts.sessionId,
                    parsed as PermissionRequest,
                    opts.accessToken,
                  )
                }
                // interrupt is turn-level; the child handles it internally (print.ts)
              } else if (
                msg.type === 'user' &&
                !firstUserMessageSeen &&
                opts.onFirstUserMessage
              ) {
                const text = extractUserMessageText(msg)
                if (text) {
                  // Latch only once a real message is found so synthetic or
                  // empty user messages do not consume the callback.
                  firstUserMessageSeen = true
                  opts.onFirstUserMessage(text)
                }
              }
            }
          }
        })
      }

      const done = new Promise<SessionDoneStatus>(resolve => {
        child.on('close', (code, signal) => {
          // Close transcript stream on exit
          closeTranscript()

          if (signal === 'SIGTERM' || signal === 'SIGINT') {
            deps.onDebug(
              `[bridge:session] sessionId=${opts.sessionId} interrupted signal=${signal} pid=${child.pid}`,
            )
            resolve('interrupted')
          } else if (code === 0) {
            deps.onDebug(
              `[bridge:session] sessionId=${opts.sessionId} completed exit_code=0 pid=${child.pid}`,
            )
            resolve('completed')
          } else {
            deps.onDebug(
              `[bridge:session] sessionId=${opts.sessionId} failed exit_code=${code} pid=${child.pid}`,
            )
            resolve('failed')
          }
        })

        child.on('error', err => {
          // Release the transcript here as well: do not rely on a 'close'
          // event following a failed spawn.
          closeTranscript()
          deps.onDebug(
            `[bridge:session] sessionId=${opts.sessionId} spawn error: ${err.message}`,
          )
          // Resolving twice is harmless; only the first resolution counts.
          resolve('failed')
        })
      })

      const handle: SessionHandle = {
        sessionId: opts.sessionId,
        done,
        activities,
        accessToken: opts.accessToken,
        lastStderr,
        get currentActivity(): SessionActivity | null {
          return currentActivity
        },
        kill(): void {
          if (!child.killed) {
            deps.onDebug(
              `[bridge:session] Sending SIGTERM to sessionId=${opts.sessionId} pid=${child.pid}`,
            )
            // On Windows, child.kill('SIGTERM') throws; use default signal.
            if (process.platform === 'win32') {
              child.kill()
            } else {
              child.kill('SIGTERM')
            }
          }
        },
        forceKill(): void {
          // Use separate flag because child.killed is set when kill() is called,
          // not when the process exits. We need to send SIGKILL even after SIGTERM.
          if (!sigkillSent && child.pid) {
            sigkillSent = true
            deps.onDebug(
              `[bridge:session] Sending SIGKILL to sessionId=${opts.sessionId} pid=${child.pid}`,
            )
            if (process.platform === 'win32') {
              child.kill()
            } else {
              child.kill('SIGKILL')
            }
          }
        },
        writeStdin(data: string): void {
          if (child.stdin && !child.stdin.destroyed) {
            deps.onDebug(
              `[bridge:ws] sessionId=${opts.sessionId} >>> ${debugTruncate(data)}`,
            )
            child.stdin.write(data)
          }
        },
        updateAccessToken(token: string): void {
          handle.accessToken = token
          // Send the fresh token to the child process via stdin. The child's
          // StructuredIO handles update_environment_variables messages by
          // setting process.env directly, so getSessionIngressAuthToken()
          // picks up the new token on the next refreshHeaders call.
          handle.writeStdin(
            jsonStringify({
              type: 'update_environment_variables',
              variables: { CLAUDE_CODE_SESSION_ACCESS_TOKEN: token },
            }) + '\n',
          )
          deps.onDebug(
            `[bridge:session] Sent token refresh via stdin for sessionId=${opts.sessionId}`,
          )
        },
      }

      return handle
    },
  }
}
549  
550  export { extractActivities as _extractActivitiesForTesting }