/ utils / task / framework.ts
framework.ts
  1  import {
  2    OUTPUT_FILE_TAG,
  3    STATUS_TAG,
  4    SUMMARY_TAG,
  5    TASK_ID_TAG,
  6    TASK_NOTIFICATION_TAG,
  7    TASK_TYPE_TAG,
  8    TOOL_USE_ID_TAG,
  9  } from '../../constants/xml.js'
 10  import type { AppState } from '../../state/AppState.js'
 11  import {
 12    isTerminalTaskStatus,
 13    type TaskStatus,
 14    type TaskType,
 15  } from '../../Task.js'
 16  import type { TaskState } from '../../tasks/types.js'
 17  import { enqueuePendingNotification } from '../messageQueueManager.js'
 18  import { enqueueSdkEvent } from '../sdkEventQueue.js'
 19  import { getTaskOutputDelta, getTaskOutputPath } from './diskOutput.js'
 20  
 21  // Standard polling interval for all tasks
 22  export const POLL_INTERVAL_MS = 1000
 23  
 24  // Duration to display killed tasks before eviction
 25  export const STOPPED_DISPLAY_MS = 3_000
 26  
 27  // Grace period for terminal local_agent tasks in the coordinator panel
 28  export const PANEL_GRACE_MS = 30_000
 29  
/**
 * Attachment type for task status updates.
 *
 * Carries the identity, kind, and status of one task, plus an optional
 * summary of output produced since the previous attachment.
 */
export type TaskAttachment = {
  // Fixed tag for this attachment shape.
  type: 'task_status'
  // Id of the task this attachment describes.
  taskId: string
  // Optional; when present it is emitted as its own tag in the notification.
  toolUseId?: string
  taskType: TaskType
  status: TaskStatus
  // Human-readable task description, quoted in the notification summary.
  description: string
  deltaSummary: string | null // New output since last attachment
}
 40  
/**
 * Functional AppState setter used by the helpers in this file: it is handed
 * an updater that maps the previous AppState to the next one.
 */
type SetAppState = (updater: (prev: AppState) => AppState) => void
 42  
 43  /**
 44   * Update a task's state in AppState.
 45   * Helper function for task implementations.
 46   * Generic to allow type-safe updates for specific task types.
 47   */
 48  export function updateTaskState<T extends TaskState>(
 49    taskId: string,
 50    setAppState: SetAppState,
 51    updater: (task: T) => T,
 52  ): void {
 53    setAppState(prev => {
 54      const task = prev.tasks?.[taskId] as T | undefined
 55      if (!task) {
 56        return prev
 57      }
 58      const updated = updater(task)
 59      if (updated === task) {
 60        // Updater returned the same reference (early-return no-op). Skip the
 61        // spread so s.tasks subscribers don't re-render on unchanged state.
 62        return prev
 63      }
 64      return {
 65        ...prev,
 66        tasks: {
 67          ...prev.tasks,
 68          [taskId]: updated,
 69        },
 70      }
 71    })
 72  }
 73  
 74  /**
 75   * Register a new task in AppState.
 76   */
 77  export function registerTask(task: TaskState, setAppState: SetAppState): void {
 78    let isReplacement = false
 79    setAppState(prev => {
 80      const existing = prev.tasks[task.id]
 81      isReplacement = existing !== undefined
 82      // Carry forward UI-held state on re-register (resumeAgentBackground
 83      // replaces the task; user's retain shouldn't reset). startTime keeps
 84      // the panel sort stable; messages + diskLoaded preserve the viewed
 85      // transcript across the replace (the user's just-appended prompt lives
 86      // in messages and isn't on disk yet).
 87      const merged =
 88        existing && 'retain' in existing
 89          ? {
 90              ...task,
 91              retain: existing.retain,
 92              startTime: existing.startTime,
 93              messages: existing.messages,
 94              diskLoaded: existing.diskLoaded,
 95              pendingMessages: existing.pendingMessages,
 96            }
 97          : task
 98      return { ...prev, tasks: { ...prev.tasks, [task.id]: merged } }
 99    })
100  
101    // Replacement (resume) — not a new start. Skip to avoid double-emit.
102    if (isReplacement) return
103  
104    enqueueSdkEvent({
105      type: 'system',
106      subtype: 'task_started',
107      task_id: task.id,
108      tool_use_id: task.toolUseId,
109      description: task.description,
110      task_type: task.type,
111      workflow_name:
112        'workflowName' in task
113          ? (task.workflowName as string | undefined)
114          : undefined,
115      prompt: 'prompt' in task ? (task.prompt as string) : undefined,
116    })
117  }
118  
/**
 * Eagerly drop a finished task from AppState.
 *
 * The task is removed only if it exists, its status is terminal
 * (completed/failed/killed) and it has notified=true; otherwise state is left
 * as-is. Evicting here frees memory without waiting for the next query loop
 * iteration; the lazy GC in generateTaskAttachments() remains as a safety net.
 *
 * @param taskId - Id of the task to evict.
 * @param setAppState - Functional AppState setter.
 */
export function evictTerminalTask(
  taskId: string,
  setAppState: SetAppState,
): void {
  setAppState(prev => {
    const candidate = prev.tasks?.[taskId]
    if (
      !candidate ||
      !isTerminalTaskStatus(candidate.status) ||
      !candidate.notified
    ) {
      return prev
    }

    // Panel grace period: eviction is blocked until the deadline has passed.
    // The 'retain' check narrows to LocalAgentTaskState (the only type with
    // that field). Checking 'evictAfter' instead would miss tasks where that
    // optional field has not been set yet; those default to "never".
    if ('retain' in candidate) {
      const deadline = candidate.evictAfter ?? Infinity
      if (deadline > Date.now()) {
        return prev
      }
    }

    const survivors = { ...prev.tasks }
    delete survivors[taskId]
    return { ...prev, tasks: survivors }
  })
}
145  
/**
 * Collect every task whose status is 'running'.
 *
 * @param state - AppState to read from; a missing tasks map yields an empty list.
 * @returns The running tasks, in the iteration order of `state.tasks`.
 */
export function getRunningTasks(state: AppState): TaskState[] {
  const running: TaskState[] = []
  for (const candidate of Object.values(state.tasks ?? {})) {
    if (candidate.status === 'running') {
      running.push(candidate)
    }
  }
  return running
}
153  
/**
 * Scan all tasks and work out which output offsets moved forward and which
 * tasks can be evicted.
 *
 * Per task:
 * - notified and terminal (completed/failed/killed): queued for eviction;
 * - notified and pending: left alone;
 * - running: its output delta is read, and if it has content the new offset
 *   is recorded.
 *
 * The returned `attachments` array is never populated by this function:
 * completion notifications are sent by each task type itself through
 * enqueuePendingNotification(), and producing attachments here would race
 * with those callbacks and deliver twice (one inline attachment plus one
 * separate API turn).
 *
 * @param state - AppState snapshot to inspect.
 * @returns The (empty) attachments, the offset patches and the eviction ids.
 */
export async function generateTaskAttachments(state: AppState): Promise<{
  attachments: TaskAttachment[]
  // Offset patches only, never whole tasks: a task can transition to
  // completed while getTaskOutputDelta is reading from disk, and writing back
  // the stale snapshot would clobber that transition (zombifying the task).
  updatedTaskOffsets: Record<string, number>
  evictedTaskIds: string[]
}> {
  const attachments: TaskAttachment[] = []
  const updatedTaskOffsets: Record<string, number> = {}
  const evictedTaskIds: string[] = []

  for (const current of Object.values(state.tasks ?? {})) {
    const { status } = current

    if (current.notified) {
      if (status === 'completed' || status === 'failed' || status === 'killed') {
        // Already consumed, so the terminal task can be GC'd.
        evictedTaskIds.push(current.id)
        continue
      }
      if (status === 'pending') {
        // Has not run yet, but the parent already knows about it: keep it.
        continue
      }
      // A notified running task is handled like any other running task.
    }

    if (status !== 'running') {
      continue
    }

    const { content, newOffset } = await getTaskOutputDelta(
      current.id,
      current.outputOffset,
    )
    if (content) {
      updatedTaskOffsets[current.id] = newOffset
    }
  }

  return { attachments, updatedTaskOffsets, evictedTaskIds }
}
207  
/**
 * Write the outputOffset patches and evictions computed by
 * generateTaskAttachments into AppState.
 *
 * Everything is re-validated against the FRESH prev.tasks inside the updater
 * (not the stale pre-await snapshot), so status transitions that happened in
 * the meantime are not clobbered. When there is nothing to apply,
 * setAppState is not called at all.
 *
 * @param setAppState - Functional AppState setter.
 * @param updatedTaskOffsets - New outputOffset per task id.
 * @param evictedTaskIds - Ids of tasks proposed for eviction.
 */
export function applyTaskOffsetsAndEvictions(
  setAppState: SetAppState,
  updatedTaskOffsets: Record<string, number>,
  evictedTaskIds: string[],
): void {
  const patchedIds = Object.keys(updatedTaskOffsets)
  const nothingToDo = patchedIds.length === 0 && evictedTaskIds.length === 0
  if (nothingToDo) {
    return
  }

  setAppState(prev => {
    const nextTasks = { ...prev.tasks }
    let dirty = false

    for (const id of patchedIds) {
      const live = nextTasks[id]
      // The task may have completed during the await; an offset update for a
      // task that is no longer running is moot.
      if (live?.status !== 'running') {
        continue
      }
      nextTasks[id] = { ...live, outputOffset: updatedTaskOffsets[id]! }
      dirty = true
    }

    for (const id of evictedTaskIds) {
      const live = nextTasks[id]
      // TOCTOU re-check: a resume may have replaced the task during the
      // generateTaskAttachments await, so it must still be terminal+notified.
      if (live && isTerminalTaskStatus(live.status) && live.notified) {
        const withinGrace =
          'retain' in live && (live.evictAfter ?? Infinity) > Date.now()
        if (!withinGrace) {
          delete nextTasks[id]
          dirty = true
        }
      }
    }

    return dirty ? { ...prev, tasks: nextTasks } : prev
  })
}
250  
/**
 * One iteration of the framework's main task polling loop.
 *
 * Takes a snapshot of AppState, computes offset patches and evictions from
 * it, applies them through `setAppState`, and enqueues a notification for
 * every attachment that was returned.
 *
 * @param getAppState - Returns the current AppState snapshot.
 * @param setAppState - Functional AppState setter.
 */
export async function pollTasks(
  getAppState: () => AppState,
  setAppState: SetAppState,
): Promise<void> {
  const snapshot = getAppState()
  const result = await generateTaskAttachments(snapshot)

  applyTaskOffsetsAndEvictions(
    setAppState,
    result.updatedTaskOffsets,
    result.evictedTaskIds,
  )

  // Send a notification for each attachment that came back.
  result.attachments.forEach(attachment => {
    enqueueTaskNotification(attachment)
  })
}
270  
/**
 * Render a task attachment as a task-notification XML message and put it on
 * the pending-notification queue.
 *
 * The message has one element per line: task id, the tool-use id (only when
 * the attachment has one), task type, output file path, status and a summary
 * sentence.
 *
 * NOTE(review): interpolated values (e.g. the description) are inserted
 * without XML escaping — confirm they can never contain markup.
 *
 * @param attachment - The task status attachment to announce.
 */
function enqueueTaskNotification(attachment: TaskAttachment): void {
  const { taskId, toolUseId, taskType, status, description } = attachment
  const statusText = getStatusText(status)
  const outputPath = getTaskOutputPath(taskId)

  // Wraps a value in an opening/closing pair of the given tag.
  const element = (tag: string, body: string): string =>
    `<${tag}>${body}</${tag}>`

  // The optional tool-use element carries its own leading newline so that it
  // lands on a separate line right after the task id.
  const toolUseSuffix = toolUseId
    ? '\n' + element(TOOL_USE_ID_TAG, toolUseId)
    : ''

  const message = [
    `<${TASK_NOTIFICATION_TAG}>`,
    element(TASK_ID_TAG, taskId) + toolUseSuffix,
    element(TASK_TYPE_TAG, taskType),
    element(OUTPUT_FILE_TAG, outputPath),
    element(STATUS_TAG, status),
    element(SUMMARY_TAG, `Task "${description}" ${statusText}`),
    `</${TASK_NOTIFICATION_TAG}>`,
  ].join('\n')

  enqueuePendingNotification({ value: message, mode: 'task-notification' })
}
291  
/**
 * Map a task status to the phrase used in notification summaries.
 *
 * @param status - The task status to describe.
 * @returns A short human-readable phrase such as 'completed successfully'.
 */
function getStatusText(status: TaskStatus): string {
  // Typing the table as Record<TaskStatus, string> makes the compiler insist
  // on an entry for every status.
  const phrases: Record<TaskStatus, string> = {
    completed: 'completed successfully',
    failed: 'failed',
    killed: 'was stopped',
    running: 'is running',
    pending: 'is pending',
  }
  return phrases[status]
}