/ utils / task / diskOutput.ts
diskOutput.ts
  1  import { constants as fsConstants } from 'fs'
  2  import {
  3    type FileHandle,
  4    mkdir,
  5    open,
  6    stat,
  7    symlink,
  8    unlink,
  9  } from 'fs/promises'
 10  import { join } from 'path'
 11  import { getSessionId } from '../../bootstrap/state.js'
 12  import { getErrnoCode } from '../errors.js'
 13  import { readFileRange, tailFile } from '../fsOperations.js'
 14  import { logError } from '../log.js'
 15  import { getProjectTempDir } from '../permissions/filesystem.js'
 16  
 17  // SECURITY: O_NOFOLLOW prevents following symlinks when opening task output files.
 18  // Without this, an attacker in the sandbox could create symlinks in the tasks directory
 19  // pointing to arbitrary files, causing Claude Code on the host to write to those files.
 20  // O_NOFOLLOW is not available on Windows, but the sandbox attack vector is Unix-only.
 21  const O_NOFOLLOW = fsConstants.O_NOFOLLOW ?? 0
 22  
 23  const DEFAULT_MAX_READ_BYTES = 8 * 1024 * 1024 // 8MB
 24  
 25  /**
 26   * Disk cap for task output files. In file mode (bash), a watchdog polls
 27   * file size and kills the process. In pipe mode (hooks), DiskTaskOutput
 28   * drops chunks past this limit. Shared so both caps stay in sync.
 29   */
 30  export const MAX_TASK_OUTPUT_BYTES = 5 * 1024 * 1024 * 1024
 31  export const MAX_TASK_OUTPUT_BYTES_DISPLAY = '5GB'
 32  
 33  /**
 34   * Get the task output directory for this session.
 35   * Uses project temp directory so reads are auto-allowed by checkReadableInternalPath.
 36   *
 37   * The session ID is included so concurrent sessions in the same project don't
 38   * clobber each other's output files. Startup cleanup in one session previously
 39   * unlinked in-flight output files from other sessions — the writing process's fd
 40   * keeps the inode alive but reads via path fail ENOENT, and getStdout() returned
 41   * empty string (inc-4586 / boris-20260309-060423).
 42   *
 43   * The session ID is captured at FIRST CALL, not re-read on every invocation.
 44   * /clear calls regenerateSessionId(), which would otherwise cause
 45   * ensureOutputDir() to create a new-session path while existing TaskOutput
 46   * instances still hold old-session paths — open() would ENOENT. Background
 47   * bash tasks surviving /clear need their output files to stay reachable.
 48   */
 49  let _taskOutputDir: string | undefined
 50  export function getTaskOutputDir(): string {
 51    if (_taskOutputDir === undefined) {
 52      _taskOutputDir = join(getProjectTempDir(), getSessionId(), 'tasks')
 53    }
 54    return _taskOutputDir
 55  }
 56  
 57  /** Test helper — clears the memoized dir. */
 58  export function _resetTaskOutputDirForTest(): void {
 59    _taskOutputDir = undefined
 60  }
 61  
 62  /**
 63   * Ensure the task output directory exists
 64   */
 65  async function ensureOutputDir(): Promise<void> {
 66    await mkdir(getTaskOutputDir(), { recursive: true })
 67  }
 68  
 69  /**
 70   * Get the output file path for a task
 71   */
 72  export function getTaskOutputPath(taskId: string): string {
 73    return join(getTaskOutputDir(), `${taskId}.output`)
 74  }
 75  
 76  // Tracks fire-and-forget promises (initTaskOutput, initTaskOutputAsSymlink,
 77  // evictTaskOutput, #drain) so tests can drain before teardown. Prevents the
 78  // async-ENOENT-after-teardown flake class (#24957, #25065): a voided async
 79  // resumes after preload's afterEach nuked the temp dir → ENOENT → unhandled
 80  // rejection → flaky test failure. allSettled so a rejection doesn't short-
 81  // circuit the drain and leave other ops racing the rmSync.
 82  const _pendingOps = new Set<Promise<unknown>>()
 83  function track<T>(p: Promise<T>): Promise<T> {
 84    _pendingOps.add(p)
 85    void p.finally(() => _pendingOps.delete(p)).catch(() => {})
 86    return p
 87  }
 88  
 89  /**
 90   * Encapsulates async disk writes for a single task's output.
 91   *
 92   * Uses a flat array as a write queue processed by a single drain loop,
 93   * so each chunk can be GC'd immediately after its write completes.
 94   * This avoids the memory retention problem of chained .then() closures
 95   * where every reaction captures its data until the whole chain resolves.
 96   */
 97  export class DiskTaskOutput {
 98    #path: string
 99    #fileHandle: FileHandle | null = null
100    #queue: string[] = []
101    #bytesWritten = 0
102    #capped = false
103    #flushPromise: Promise<void> | null = null
104    #flushResolve: (() => void) | null = null
105  
106    constructor(taskId: string) {
107      this.#path = getTaskOutputPath(taskId)
108    }
109  
110    append(content: string): void {
111      if (this.#capped) {
112        return
113      }
114      // content.length (UTF-16 code units) undercounts UTF-8 bytes by at most ~3×.
115      // Acceptable for a coarse disk-fill guard — avoids re-scanning every chunk.
116      this.#bytesWritten += content.length
117      if (this.#bytesWritten > MAX_TASK_OUTPUT_BYTES) {
118        this.#capped = true
119        this.#queue.push(
120          `\n[output truncated: exceeded ${MAX_TASK_OUTPUT_BYTES_DISPLAY} disk cap]\n`,
121        )
122      } else {
123        this.#queue.push(content)
124      }
125      if (!this.#flushPromise) {
126        this.#flushPromise = new Promise<void>(resolve => {
127          this.#flushResolve = resolve
128        })
129        void track(this.#drain())
130      }
131    }
132  
133    flush(): Promise<void> {
134      return this.#flushPromise ?? Promise.resolve()
135    }
136  
137    cancel(): void {
138      this.#queue.length = 0
139    }
140  
141    async #drainAllChunks(): Promise<void> {
142      while (true) {
143        try {
144          if (!this.#fileHandle) {
145            await ensureOutputDir()
146            this.#fileHandle = await open(
147              this.#path,
148              process.platform === 'win32'
149                ? 'a'
150                : fsConstants.O_WRONLY |
151                    fsConstants.O_APPEND |
152                    fsConstants.O_CREAT |
153                    O_NOFOLLOW,
154            )
155          }
156          while (true) {
157            await this.#writeAllChunks()
158            if (this.#queue.length === 0) {
159              break
160            }
161          }
162        } finally {
163          if (this.#fileHandle) {
164            const fileHandle = this.#fileHandle
165            this.#fileHandle = null
166            await fileHandle.close()
167          }
168        }
169        // you could have another .append() while we're waiting for the file to close, so we check the queue again before fully exiting
170        if (this.#queue.length) {
171          continue
172        }
173  
174        break
175      }
176    }
177  
178    #writeAllChunks(): Promise<void> {
179      // This code is extremely precise.
180      // You **must not** add an await here!! That will cause memory to balloon as the queue grows.
181      // It's okay to add an `await` to the caller of this method (e.g. #drainAllChunks) because that won't cause Buffer[] to be kept alive in memory.
182      return this.#fileHandle!.appendFile(
183        // This variable needs to get GC'd ASAP.
184        this.#queueToBuffers(),
185      )
186    }
187  
188    /** Keep this in a separate method so that GC doesn't keep it alive for any longer than it should. */
189    #queueToBuffers(): Buffer {
190      // Use .splice to in-place mutate the array, informing the GC it can free it.
191      const queue = this.#queue.splice(0, this.#queue.length)
192  
193      let totalLength = 0
194      for (const str of queue) {
195        totalLength += Buffer.byteLength(str, 'utf8')
196      }
197  
198      const buffer = Buffer.allocUnsafe(totalLength)
199      let offset = 0
200      for (const str of queue) {
201        offset += buffer.write(str, offset, 'utf8')
202      }
203  
204      return buffer
205    }
206  
207    async #drain(): Promise<void> {
208      try {
209        await this.#drainAllChunks()
210      } catch (e) {
211        // Transient fs errors (EMFILE on busy CI, EPERM on Windows pending-
212        // delete) previously rode up through `void this.#drain()` as an
213        // unhandled rejection while the flush promise resolved anyway — callers
214        // saw an empty file with no error. Retry once for the transient case
215        // (queue is intact if open() failed), then log and give up.
216        logError(e)
217        if (this.#queue.length > 0) {
218          try {
219            await this.#drainAllChunks()
220          } catch (e2) {
221            logError(e2)
222          }
223        }
224      } finally {
225        const resolve = this.#flushResolve!
226        this.#flushPromise = null
227        this.#flushResolve = null
228        resolve()
229      }
230    }
231  }
232  
// Live DiskTaskOutput writers, keyed by task ID.
const outputs = new Map<string, DiskTaskOutput>()

/**
 * Test helper: cancels queued writes, waits for in-flight operations, then
 * empties the map. backgroundShells.test.ts and other task tests spawn real
 * shells that write through this module without afterEach cleanup, and their
 * entries leak into diskOutput.test.ts on the same shard.
 *
 * Tracked promises are awaited repeatedly until none remain, because a
 * settling promise may start another one (initTaskOutputAsSymlink's catch →
 * initTaskOutput). Call this in afterEach BEFORE rmSync to avoid
 * async-ENOENT-after-teardown.
 */
export async function _clearOutputsForTest(): Promise<void> {
  outputs.forEach(writer => {
    writer.cancel()
  })
  let inFlight = Array.from(_pendingOps)
  while (inFlight.length > 0) {
    await Promise.allSettled(inFlight)
    inFlight = Array.from(_pendingOps)
  }
  outputs.clear()
}
254  
/**
 * Returns the writer registered for `taskId`, creating and registering a new
 * one when the task has none yet.
 */
function getOrCreateOutput(taskId: string): DiskTaskOutput {
  const existing = outputs.get(taskId)
  if (existing) {
    return existing
  }
  const created = new DiskTaskOutput(taskId)
  outputs.set(taskId, created)
  return created
}
263  
/**
 * Queues `content` to be appended to the task's output file. The write
 * happens asynchronously, and the file is created if it does not exist yet.
 */
export function appendTaskOutput(taskId: string, content: string): void {
  const writer = getOrCreateOutput(taskId)
  writer.append(content)
}
271  
/**
 * Waits until the pending writes for a task have completed. Call it before
 * reading the output so that everything queued so far is on disk. Does
 * nothing when the task has no registered writer.
 */
export async function flushTaskOutput(taskId: string): Promise<void> {
  const writer = outputs.get(taskId)
  if (!writer) {
    return
  }
  await writer.flush()
}
282  
/**
 * Flushes a task's writer and then drops it from the in-memory map. The
 * output file on disk is left alone, unlike with cleanupTaskOutput. Intended
 * for tasks that have completed and whose output has been consumed. The
 * returned promise is tracked so tests can wait for it.
 */
export function evictTaskOutput(taskId: string): Promise<void> {
  const flushThenForget = async (): Promise<void> => {
    const writer = outputs.get(taskId)
    if (!writer) {
      return
    }
    await writer.flush()
    outputs.delete(taskId)
  }
  return track(flushThenForget())
}
299  
/**
 * Reads the content added to a task's output file since `fromOffset`.
 * Only the range starting at that byte offset is read, capped at `maxBytes`;
 * the whole file is never loaded.
 *
 * @returns The content read and the offset to pass next time. When nothing
 *   could be read (no result, missing file, or another error) the content is
 *   empty and the offset is unchanged. Errors other than ENOENT are logged.
 */
export async function getTaskOutputDelta(
  taskId: string,
  fromOffset: number,
  maxBytes: number = DEFAULT_MAX_READ_BYTES,
): Promise<{ content: string; newOffset: number }> {
  try {
    const outputPath = getTaskOutputPath(taskId)
    const chunk = await readFileRange(outputPath, fromOffset, maxBytes)
    return chunk
      ? { content: chunk.content, newOffset: fromOffset + chunk.bytesRead }
      : { content: '', newOffset: fromOffset }
  } catch (err) {
    // A missing file just means no output yet; anything else is worth logging.
    if (getErrnoCode(err) !== 'ENOENT') {
      logError(err)
    }
    return { content: '', newOffset: fromOffset }
  }
}
331  
/**
 * Returns the tail of a task's output file, at most `maxBytes` of it, so that
 * multi-GB files are never loaded into memory. When earlier output had to be
 * left out, the result starts with a line saying how many KB were omitted.
 *
 * @returns The (possibly prefixed) tail, or an empty string when the file is
 *   missing or the read fails. Errors other than ENOENT are logged.
 */
export async function getTaskOutput(
  taskId: string,
  maxBytes: number = DEFAULT_MAX_READ_BYTES,
): Promise<string> {
  try {
    const tail = await tailFile(getTaskOutputPath(taskId), maxBytes)
    if (!(tail.bytesTotal > tail.bytesRead)) {
      return tail.content
    }
    const omittedKb = Math.round((tail.bytesTotal - tail.bytesRead) / 1024)
    return `[${omittedKb}KB of earlier output omitted]\n${tail.content}`
  } catch (err) {
    if (getErrnoCode(err) !== 'ENOENT') {
      logError(err)
    }
    return ''
  }
}
358  
/**
 * Returns the current size in bytes of a task's output file, which is also
 * the offset of its end. Yields 0 when the file is missing or stat fails;
 * errors other than ENOENT are logged.
 */
export async function getTaskOutputSize(taskId: string): Promise<number> {
  try {
    const info = await stat(getTaskOutputPath(taskId))
    return info.size
  } catch (err) {
    if (getErrnoCode(err) !== 'ENOENT') {
      logError(err)
    }
    return 0
  }
}
374  
/**
 * Clean up a task's output file and write queue.
 *
 * Drops any queued chunks, forgets the task's writer, waits for a write that
 * is already in flight to finish, and then deletes the output file. A missing
 * file is not an error; any other unlink failure is logged rather than thrown.
 */
export async function cleanupTaskOutput(taskId: string): Promise<void> {
  const output = outputs.get(taskId)
  if (output) {
    output.cancel()
    outputs.delete(taskId)
    // cancel() only empties the queue. A drain that has already started keeps
    // running: it may be about to open the file with O_CREAT, or may still
    // hold it open. Unlinking before it finishes could leave a freshly
    // re-created file behind, so wait for it first. The flush promise is
    // resolved from the drain's `finally`, so this await does not throw.
    await output.flush()
  }

  try {
    await unlink(getTaskOutputPath(taskId))
  } catch (e) {
    const code = getErrnoCode(e)
    if (code === 'ENOENT') {
      return
    }
    logError(e)
  }
}
395  
/**
 * Creates the (empty) output file for a new task so that its path exists.
 * The operation is tracked so tests can wait for it.
 *
 * @returns The path of the created file. The promise rejects when the file
 *   cannot be created, for example because something already exists there.
 */
export function initTaskOutput(taskId: string): Promise<string> {
  const createEmptyFile = async (): Promise<string> => {
    await ensureOutputDir()
    const outputPath = getTaskOutputPath(taskId)
    // SECURITY: O_NOFOLLOW blocks symlink-following attacks from the sandbox.
    // O_EXCL makes the open fail unless it creates a brand-new file at the path.
    // Windows gets string flags because numeric O_EXCL can produce EINVAL
    // through libuv.
    const flags =
      process.platform === 'win32'
        ? 'wx'
        : fsConstants.O_WRONLY |
          fsConstants.O_CREAT |
          fsConstants.O_EXCL |
          O_NOFOLLOW
    const handle = await open(outputPath, flags)
    await handle.close()
    return outputPath
  }
  return track(createEmptyFile())
}
422  
/**
 * Initialize output file as a symlink to another file (e.g., agent transcript).
 *
 * Tries to create the symlink first. If that fails, whatever is at the output
 * path is removed and the symlink is attempted once more. If the symlink still
 * cannot be created, the error is logged and a regular empty output file is
 * created via initTaskOutput instead.
 *
 * @param taskId Task whose output path should become the symlink.
 * @param targetPath File the symlink should point at.
 * @returns The task's output path.
 */
export function initTaskOutputAsSymlink(
  taskId: string,
  targetPath: string,
): Promise<string> {
  return track(
    (async () => {
      try {
        await ensureOutputDir()
        const outputPath = getTaskOutputPath(taskId)

        try {
          await symlink(targetPath, outputPath)
        } catch (symlinkError) {
          try {
            await unlink(outputPath)
          } catch (unlinkError) {
            // ENOENT here means nothing was in the way, so the symlink failed
            // for some other reason. Surface that original error instead of
            // the uninformative "no such file" from the cleanup attempt.
            throw getErrnoCode(unlinkError) === 'ENOENT'
              ? symlinkError
              : unlinkError
          }
          await symlink(targetPath, outputPath)
        }

        return outputPath
      } catch (error) {
        logError(error)
        return initTaskOutput(taskId)
      }
    })(),
  )
}