/ utils / task / TaskOutput.ts
TaskOutput.ts
  1  import { unlink } from 'fs/promises'
  2  import { CircularBuffer } from '../CircularBuffer.js'
  3  import { logForDebugging } from '../debug.js'
  4  import { readFileRange, tailFile } from '../fsOperations.js'
  5  import { getMaxOutputLength } from '../shell/outputLimits.js'
  6  import { safeJoinLines } from '../stringUtils.js'
  7  import { DiskTaskOutput, getTaskOutputPath } from './diskOutput.js'
  8  
  9  const DEFAULT_MAX_MEMORY = 8 * 1024 * 1024 // 8MB
 10  const POLL_INTERVAL_MS = 1000
 11  const PROGRESS_TAIL_BYTES = 4096
 12  
 13  type ProgressCallback = (
 14    lastLines: string,
 15    allLines: string,
 16    totalLines: number,
 17    totalBytes: number,
 18    isIncomplete: boolean,
 19  ) => void
 20  
 21  /**
 22   * Single source of truth for a shell command's output.
 23   *
 24   * For bash commands (file mode): both stdout and stderr go directly to
 25   * a file via stdio fds — neither enters JS. Progress is extracted by
 26   * polling the file tail. getStderr() returns '' since stderr is
 27   * interleaved in the output file.
 28   *
 29   * For hooks (pipe mode): data flows through writeStdout()/writeStderr()
 30   * and is buffered in memory, spilling to disk if it exceeds the limit.
 31   */
 32  export class TaskOutput {
 33    readonly taskId: string
 34    readonly path: string
 35    /** True when stdout goes to a file fd (bypassing JS). False for pipe mode (hooks). */
 36    readonly stdoutToFile: boolean
 37    #stdoutBuffer = ''
 38    #stderrBuffer = ''
 39    #disk: DiskTaskOutput | null = null
 40    #recentLines = new CircularBuffer<string>(1000)
 41    #totalLines = 0
 42    #totalBytes = 0
 43    #maxMemory: number
 44    #onProgress: ProgressCallback | null
 45    /** Set by getStdout() — true when the file was fully read (≤ maxOutputLength). */
 46    #outputFileRedundant = false
 47    /** Set by getStdout() — total file size in bytes. */
 48    #outputFileSize = 0
 49  
 50    // --- Shared poller state ---
 51  
 52    /** Registry of all file-mode TaskOutput instances with onProgress callbacks. */
 53    static #registry = new Map<string, TaskOutput>()
 54    /** Subset of #registry currently being polled (visibility-driven by React). */
 55    static #activePolling = new Map<string, TaskOutput>()
 56    static #pollInterval: ReturnType<typeof setInterval> | null = null
 57  
 58    constructor(
 59      taskId: string,
 60      onProgress: ProgressCallback | null,
 61      stdoutToFile = false,
 62      maxMemory: number = DEFAULT_MAX_MEMORY,
 63    ) {
 64      this.taskId = taskId
 65      this.path = getTaskOutputPath(taskId)
 66      this.stdoutToFile = stdoutToFile
 67      this.#maxMemory = maxMemory
 68      this.#onProgress = onProgress
 69  
 70      // Register for polling when stdout goes to a file and progress is needed.
 71      // Actual polling is started/stopped by React via startPolling/stopPolling.
 72      if (stdoutToFile && onProgress) {
 73        TaskOutput.#registry.set(taskId, this)
 74      }
 75    }
 76  
 77    /**
 78     * Begin polling the output file for progress. Called from React
 79     * useEffect when the progress component mounts.
 80     */
 81    static startPolling(taskId: string): void {
 82      const instance = TaskOutput.#registry.get(taskId)
 83      if (!instance || !instance.#onProgress) {
 84        return
 85      }
 86      TaskOutput.#activePolling.set(taskId, instance)
 87      if (!TaskOutput.#pollInterval) {
 88        TaskOutput.#pollInterval = setInterval(TaskOutput.#tick, POLL_INTERVAL_MS)
 89        TaskOutput.#pollInterval.unref()
 90      }
 91    }
 92  
 93    /**
 94     * Stop polling the output file. Called from React useEffect cleanup
 95     * when the progress component unmounts.
 96     */
 97    static stopPolling(taskId: string): void {
 98      TaskOutput.#activePolling.delete(taskId)
 99      if (TaskOutput.#activePolling.size === 0 && TaskOutput.#pollInterval) {
100        clearInterval(TaskOutput.#pollInterval)
101        TaskOutput.#pollInterval = null
102      }
103    }
104  
105    /**
106     * Shared tick: reads the file tail for every actively-polled task.
107     * Non-async body (.then) to avoid stacking if I/O is slow.
108     */
109    static #tick(): void {
110      for (const [, entry] of TaskOutput.#activePolling) {
111        if (!entry.#onProgress) {
112          continue
113        }
114        void tailFile(entry.path, PROGRESS_TAIL_BYTES).then(
115          ({ content, bytesRead, bytesTotal }) => {
116            if (!entry.#onProgress) {
117              return
118            }
119            // Always call onProgress even when content is empty, so the
120            // progress loop wakes up and can check for backgrounding.
121            // Commands like `git log -S` produce no output for long periods.
122            if (!content) {
123              entry.#onProgress('', '', entry.#totalLines, bytesTotal, false)
124              return
125            }
126            // Count all newlines in the tail and capture slice points for the
127            // last 5 and last 100 lines. Uncapped so extrapolation stays accurate
128            // for dense output (short lines → >100 newlines in 4KB).
129            let pos = content.length
130            let n5 = 0
131            let n100 = 0
132            let lineCount = 0
133            while (pos > 0) {
134              pos = content.lastIndexOf('\n', pos - 1)
135              lineCount++
136              if (lineCount === 5) n5 = pos <= 0 ? 0 : pos + 1
137              if (lineCount === 100) n100 = pos <= 0 ? 0 : pos + 1
138            }
139            // lineCount is exact when the whole file fits in PROGRESS_TAIL_BYTES.
140            // Otherwise extrapolate from the tail sample; monotone max keeps the
141            // counter from going backwards when the tail has longer lines on one tick.
142            const totalLines =
143              bytesRead >= bytesTotal
144                ? lineCount
145                : Math.max(
146                    entry.#totalLines,
147                    Math.round((bytesTotal / bytesRead) * lineCount),
148                  )
149            entry.#totalLines = totalLines
150            entry.#totalBytes = bytesTotal
151            entry.#onProgress(
152              content.slice(n5),
153              content.slice(n100),
154              totalLines,
155              bytesTotal,
156              bytesRead < bytesTotal,
157            )
158          },
159          () => {
160            // File may not exist yet
161          },
162        )
163      }
164    }
165  
166    /** Write stdout data (pipe mode only — used by hooks). */
167    writeStdout(data: string): void {
168      this.#writeBuffered(data, false)
169    }
170  
171    /** Write stderr data (always piped). */
172    writeStderr(data: string): void {
173      this.#writeBuffered(data, true)
174    }
175  
176    #writeBuffered(data: string, isStderr: boolean): void {
177      this.#totalBytes += data.length
178  
179      this.#updateProgress(data)
180  
181      // Write to disk if already overflowed
182      if (this.#disk) {
183        this.#disk.append(isStderr ? `[stderr] ${data}` : data)
184        return
185      }
186  
187      // Check if this chunk would exceed the in-memory limit
188      const totalMem =
189        this.#stdoutBuffer.length + this.#stderrBuffer.length + data.length
190      if (totalMem > this.#maxMemory) {
191        this.#spillToDisk(isStderr ? data : null, isStderr ? null : data)
192        return
193      }
194  
195      if (isStderr) {
196        this.#stderrBuffer += data
197      } else {
198        this.#stdoutBuffer += data
199      }
200    }
201  
202    /**
203     * Single backward pass: count all newlines (for totalLines) and extract
204     * the last few lines as flat copies (for the CircularBuffer / progress).
205     * Only used in pipe mode (hooks). File mode uses the shared poller.
206     */
207    #updateProgress(data: string): void {
208      const MAX_PROGRESS_BYTES = 4096
209      const MAX_PROGRESS_LINES = 100
210  
211      let lineCount = 0
212      const lines: string[] = []
213      let extractedBytes = 0
214      let pos = data.length
215  
216      while (pos > 0) {
217        const prev = data.lastIndexOf('\n', pos - 1)
218        if (prev === -1) {
219          break
220        }
221        lineCount++
222        if (
223          lines.length < MAX_PROGRESS_LINES &&
224          extractedBytes < MAX_PROGRESS_BYTES
225        ) {
226          const lineLen = pos - prev - 1
227          if (lineLen > 0 && lineLen <= MAX_PROGRESS_BYTES - extractedBytes) {
228            const line = data.slice(prev + 1, pos)
229            if (line.trim()) {
230              lines.push(Buffer.from(line).toString())
231              extractedBytes += lineLen
232            }
233          }
234        }
235        pos = prev
236      }
237  
238      this.#totalLines += lineCount
239  
240      for (let i = lines.length - 1; i >= 0; i--) {
241        this.#recentLines.add(lines[i]!)
242      }
243  
244      if (this.#onProgress && lines.length > 0) {
245        const recent = this.#recentLines.getRecent(5)
246        this.#onProgress(
247          safeJoinLines(recent, '\n'),
248          safeJoinLines(this.#recentLines.getRecent(100), '\n'),
249          this.#totalLines,
250          this.#totalBytes,
251          this.#disk !== null,
252        )
253      }
254    }
255  
256    #spillToDisk(stderrChunk: string | null, stdoutChunk: string | null): void {
257      this.#disk = new DiskTaskOutput(this.taskId)
258  
259      // Flush existing buffers
260      if (this.#stdoutBuffer) {
261        this.#disk.append(this.#stdoutBuffer)
262        this.#stdoutBuffer = ''
263      }
264      if (this.#stderrBuffer) {
265        this.#disk.append(`[stderr] ${this.#stderrBuffer}`)
266        this.#stderrBuffer = ''
267      }
268  
269      // Write the chunk that triggered overflow
270      if (stdoutChunk) {
271        this.#disk.append(stdoutChunk)
272      }
273      if (stderrChunk) {
274        this.#disk.append(`[stderr] ${stderrChunk}`)
275      }
276    }
277  
278    /**
279     * Get stdout. In file mode, reads from the output file.
280     * In pipe mode, returns the in-memory buffer or tail from CircularBuffer.
281     */
282    async getStdout(): Promise<string> {
283      if (this.stdoutToFile) {
284        return this.#readStdoutFromFile()
285      }
286      // Pipe mode (hooks) — use in-memory data
287      if (this.#disk) {
288        const recent = this.#recentLines.getRecent(5)
289        const tail = safeJoinLines(recent, '\n')
290        const sizeKB = Math.round(this.#totalBytes / 1024)
291        const notice = `\nOutput truncated (${sizeKB}KB total). Full output saved to: ${this.path}`
292        return tail ? tail + notice : notice.trimStart()
293      }
294      return this.#stdoutBuffer
295    }
296  
297    async #readStdoutFromFile(): Promise<string> {
298      const maxBytes = getMaxOutputLength()
299      try {
300        const result = await readFileRange(this.path, 0, maxBytes)
301        if (!result) {
302          this.#outputFileRedundant = true
303          return ''
304        }
305        const { content, bytesRead, bytesTotal } = result
306        // If the file fits, it's fully captured inline and can be deleted.
307        // If not, return what we read — processToolResultBlock handles
308        // the <persisted-output> formatting and persistence downstream.
309        this.#outputFileSize = bytesTotal
310        this.#outputFileRedundant = bytesTotal <= bytesRead
311        return content
312      } catch (err) {
313        // Surface the error instead of silently returning empty. An ENOENT here
314        // means the output file was deleted while the command was running
315        // (historically: cross-session startup cleanup in the same project dir).
316        // Returning a diagnostic string keeps the tool_result non-empty, which
317        // avoids reminder-only-at-tail confusion downstream and tells the model
318        // (and us, via the transcript) what actually happened.
319        const code =
320          err instanceof Error && 'code' in err ? String(err.code) : 'unknown'
321        logForDebugging(
322          `TaskOutput.#readStdoutFromFile: failed to read ${this.path} (${code}): ${err}`,
323        )
324        return `<bash output unavailable: output file ${this.path} could not be read (${code}). This usually means another Claude Code process in the same project deleted it during startup cleanup.>`
325      }
326    }
327  
328    /** Sync getter for ExecResult.stderr */
329    getStderr(): string {
330      if (this.#disk) {
331        return ''
332      }
333      return this.#stderrBuffer
334    }
335  
336    get isOverflowed(): boolean {
337      return this.#disk !== null
338    }
339  
340    get totalLines(): number {
341      return this.#totalLines
342    }
343  
344    get totalBytes(): number {
345      return this.#totalBytes
346    }
347  
348    /**
349     * True after getStdout() when the output file was fully read.
350     * The file content is redundant (fully in ExecResult.stdout) and can be deleted.
351     */
352    get outputFileRedundant(): boolean {
353      return this.#outputFileRedundant
354    }
355  
356    /** Total file size in bytes, set after getStdout() reads the file. */
357    get outputFileSize(): number {
358      return this.#outputFileSize
359    }
360  
361    /** Force all buffered content to disk. Call when backgrounding. */
362    spillToDisk(): void {
363      if (!this.#disk) {
364        this.#spillToDisk(null, null)
365      }
366    }
367  
368    async flush(): Promise<void> {
369      await this.#disk?.flush()
370    }
371  
372    /** Delete the output file (fire-and-forget safe). */
373    async deleteOutputFile(): Promise<void> {
374      try {
375        await unlink(this.path)
376      } catch {
377        // File may already be deleted or not exist
378      }
379    }
380  
381    clear(): void {
382      this.#stdoutBuffer = ''
383      this.#stderrBuffer = ''
384      this.#recentLines.clear()
385      this.#onProgress = null
386      this.#disk?.cancel()
387      TaskOutput.stopPolling(this.taskId)
388      TaskOutput.#registry.delete(this.taskId)
389    }
390  }