/ src / lib / server / execution-engine / task-attempt.ts
task-attempt.ts
  1  import { checkAgentBudgetLimits } from '@/lib/server/cost'
  2  import { WORKSPACE_DIR } from '@/lib/server/data-dir'
  3  import { log } from '@/lib/server/logger'
  4  import { loadSettings } from '@/lib/server/settings/settings-repository'
  5  import { loadSessions } from '@/lib/server/sessions/session-repository'
  6  import { appendPersistedRunEvent, buildRetrievalSummary, persistRun } from '@/lib/server/runtime/run-ledger'
  7  import { notify } from '@/lib/server/ws-hub'
  8  import { captureGuardianCheckpoint } from '@/lib/server/agents/guardian'
  9  import {
 10    assessAutonomyRun,
 11    executeSupervisorAutoActions,
 12  } from '@/lib/server/autonomy/supervisor-reflection'
 13  import { errorMessage, hmrSingleton } from '@/lib/shared-utils'
 14  import type {
 15    Agent,
 16    BoardTask,
 17    Session,
 18    SessionRunRecord,
 19    SessionRunStatus,
 20  } from '@/types'
 21  import type { ExecuteChatTurnResult } from '@/lib/server/chat-execution/chat-execution-types'
 22  import type {
 23    EnqueueTaskAttemptExecutionRequest,
 24    ExecutionHandle,
 25  } from '@/lib/server/execution-engine/types'
 26  import { executeExecutionChatTurn } from '@/lib/server/execution-engine/chat-turn'
 27  
 28  const TAG = 'execution-engine'
 29  
 30  interface TaskAttemptState {
 31    runningByTaskId: Map<string, ExecutionHandle<ExecuteChatTurnResult>>
 32  }
 33  
 34  const taskAttemptState = hmrSingleton<TaskAttemptState>(
 35    '__swarmclaw_execution_engine_task_attempt__',
 36    () => ({
 37      runningByTaskId: new Map<string, ExecutionHandle<ExecuteChatTurnResult>>(),
 38    }),
 39  )
 40  
 41  function messagePreview(text: string): string {
 42    return (text || '').replace(/\s+/g, ' ').trim().slice(0, 140)
 43  }
 44  
 45  function looksIncomplete(text: string): boolean {
 46    if (!text) return false
 47    const trimmed = text.trim()
 48    if (trimmed.endsWith('...') || trimmed.endsWith('…')) return true
 49    if (/(?:^|\n)#{1,3}\s+(?:Step|Phase|Next)\s+\d/i.test(trimmed.slice(-200))) return true
 50    const lastChunk = trimmed.slice(-300).toLowerCase()
 51    return /\b(?:next i(?:'ll| will)|now i(?:'ll| will)|let me (?:now|next)|moving on to|proceeding to)\b/.test(lastChunk)
 52  }
 53  
 54  function chainCallerSignal(callerSignal: AbortSignal | undefined, controller: AbortController): void {
 55    if (!callerSignal) return
 56    if (callerSignal.aborted) {
 57      controller.abort()
 58      return
 59    }
 60    const onAbort = () => controller.abort()
 61    callerSignal.addEventListener('abort', onAbort, { once: true })
 62  }
 63  
 64  function notifyExecutionState(sessionId: string): void {
 65    notify('runs')
 66    notify('sessions')
 67    notify(`session:${sessionId}`)
 68  }
 69  
 70  function emitStatus(run: SessionRunRecord, status: SessionRunStatus, extra?: Record<string, unknown>): void {
 71    const { citations, retrievalTrace, ...eventExtra } = extra || {}
 72    appendPersistedRunEvent({
 73      runId: run.id,
 74      sessionId: run.sessionId,
 75      kind: run.kind,
 76      ownerType: run.ownerType,
 77      ownerId: run.ownerId,
 78      parentExecutionId: run.parentExecutionId,
 79      phase: 'status',
 80      status,
 81      summary: run.resultPreview || run.error || undefined,
 82      citations: citations as import('@/types').KnowledgeCitation[] | undefined,
 83      retrievalTrace: (retrievalTrace as import('@/types').KnowledgeRetrievalTrace | undefined) || undefined,
 84      event: {
 85        t: 'md',
 86        text: JSON.stringify({
 87          run: {
 88            id: run.id,
 89            sessionId: run.sessionId,
 90            kind: run.kind,
 91            ownerType: run.ownerType,
 92            ownerId: run.ownerId,
 93            status,
 94            source: run.source,
 95            internal: run.internal,
 96            ...eventExtra,
 97          },
 98        }),
 99      },
100    })
101    notifyExecutionState(run.sessionId)
102  }
103  
104  async function executeTaskAttemptTurn(
105    task: BoardTask,
106    agent: Agent,
107    sessionId: string,
108    signal: AbortSignal,
109  ): Promise<ExecuteChatTurnResult> {
110    if (agent.autoRecovery) {
111      const cwd = task.projectId
112        ? `${WORKSPACE_DIR}/projects/${task.projectId}`
113        : WORKSPACE_DIR
114      captureGuardianCheckpoint(cwd, `task:${task.id}`)
115    }
116  
117    const settings = loadSettings()
118    const basePrompt = task.description || task.title
119    const prompt = [
120      basePrompt,
121      '',
122      'Completion requirements:',
123      '- Execute the task before replying; do not reply with only a plan.',
124      '- Include concrete evidence in your final summary: changed file paths, commands run, and verification results.',
125      '- If blocked, state the blocker explicitly and what input or permission is missing.',
126    ].join('\n')
127  
128    let latestRun = await executeExecutionChatTurn({
129      sessionId,
130      message: prompt,
131      internal: false,
132      source: 'task',
133      runId: task.id,
134      signal,
135    })
136    let text = typeof latestRun.text === 'string' ? latestRun.text.trim() : ''
137    let previousSummary: string | null = null
138    let totalInputTokens = latestRun.inputTokens || 0
139    let totalOutputTokens = latestRun.outputTokens || 0
140    let totalEstimatedCost = Number(latestRun.estimatedCost || 0)
141  
142    if (latestRun.error) {
143      return {
144        ...latestRun,
145        text,
146      }
147    }
148  
149    const maxSupervisorFollowups = 2
150    for (let followupIndex = 0; followupIndex < maxSupervisorFollowups; followupIndex += 1) {
151      if (signal.aborted) break
152  
153      const sessions = loadSessions()
154      const session = sessions[sessionId] as Session | undefined
155      const assessment = assessAutonomyRun({
156        runId: `${task.id}:attempt-${(task.attempts || 0) + 1}:step-${followupIndex + 1}`,
157        sessionId,
158        taskId: task.id,
159        agentId: agent.id,
160        source: 'task',
161        status: latestRun.error ? 'failed' : 'completed',
162        resultText: text,
163        error: latestRun.error,
164        toolEvents: latestRun.toolEvents,
165        mainLoopState: {
166          followupChainCount: followupIndex + 1,
167          summary: previousSummary,
168          missionCostUsd: totalEstimatedCost,
169        },
170        session: session || null,
171        settings,
172      })
173      if (assessment.shouldBlock) break
174      if (assessment.autoActions?.length) {
175        const result = await executeSupervisorAutoActions({
176          actions: assessment.autoActions,
177          sessionId,
178          agentId: agent.id,
179        })
180        if (result.blocked) break
181      }
182      const followupMessage = assessment.interventionPrompt
183        || (text && looksIncomplete(text)
184          ? 'Continue and complete the remaining steps. Provide a final summary when done.'
185          : null)
186      if (!followupMessage) break
187  
188      if (agent.monthlyBudget || agent.dailyBudget || agent.hourlyBudget) {
189        try {
190          const followupBudget = checkAgentBudgetLimits(agent)
191          if (!followupBudget.ok) {
192            log.warn(TAG, `[task_attempt] Budget exceeded for "${agent.name}" during follow-up, stopping.`)
193            break
194          }
195        } catch {
196          // Best-effort safety check only.
197        }
198      }
199  
200      previousSummary = text || previousSummary
201      const followUp = await executeExecutionChatTurn({
202        sessionId,
203        message: followupMessage,
204        internal: false,
205        source: 'task',
206        signal,
207      })
208      totalInputTokens += followUp.inputTokens || 0
209      totalOutputTokens += followUp.outputTokens || 0
210      totalEstimatedCost += Number(followUp.estimatedCost || 0)
211      text = typeof followUp.text === 'string' ? followUp.text.trim() : ''
212      latestRun = {
213        ...followUp,
214        text,
215        inputTokens: totalInputTokens,
216        outputTokens: totalOutputTokens,
217        estimatedCost: totalEstimatedCost,
218      }
219      if (latestRun.error) break
220    }
221  
222    return {
223      ...latestRun,
224      text,
225      inputTokens: totalInputTokens,
226      outputTokens: totalOutputTokens,
227      estimatedCost: totalEstimatedCost,
228    }
229  }
230  
231  export function enqueueTaskAttemptExecution(
232    input: EnqueueTaskAttemptExecutionRequest,
233  ): ExecutionHandle<ExecuteChatTurnResult> {
234    const existing = taskAttemptState.runningByTaskId.get(input.task.id)
235    if (existing) return { ...existing, deduped: true }
236  
237    const executionId = input.executionId || `${input.task.id}:attempt-${(input.task.attempts || 0) + 1}`
238    const controller = new AbortController()
239    chainCallerSignal(input.callerSignal, controller)
240  
241    const run: SessionRunRecord = {
242      id: executionId,
243      sessionId: input.sessionId,
244      kind: 'task_attempt',
245      ownerType: 'task',
246      ownerId: input.task.id,
247      parentExecutionId: null,
248      recoveryPolicy: 'restart_recoverable',
249      source: 'task',
250      internal: false,
251      mode: 'task_attempt',
252      status: 'queued',
253      messagePreview: messagePreview(input.task.description || input.task.title),
254      queuedAt: Date.now(),
255    }
256  
257    persistRun(run)
258    emitStatus(run, 'queued')
259  
260    const promise = (async () => {
261      run.status = 'running'
262      run.startedAt = Date.now()
263      persistRun(run)
264      emitStatus(run, 'running')
265  
266      try {
267        const result = await executeTaskAttemptTurn(input.task, input.agent, input.sessionId, controller.signal)
268        run.status = controller.signal.aborted
269          ? 'cancelled'
270          : (result.error ? 'failed' : 'completed')
271        run.endedAt = Date.now()
272        run.error = controller.signal.aborted ? (run.error || 'Cancelled') : result.error
273        run.resultPreview = result.text?.slice(0, 280)
274        run.retrievalSummary = buildRetrievalSummary(result.citations)
275        if (typeof result.inputTokens === 'number') run.totalInputTokens = result.inputTokens
276        if (typeof result.outputTokens === 'number') run.totalOutputTokens = result.outputTokens
277        if (typeof result.estimatedCost === 'number') run.estimatedCost = result.estimatedCost
278        persistRun(run)
279        emitStatus(run, run.status, {
280          hasText: !!result.text,
281          error: run.error || null,
282          citations: result.citations,
283          retrievalTrace: result.retrievalTrace,
284        })
285        return result
286      } catch (err: unknown) {
287        run.status = controller.signal.aborted ? 'cancelled' : 'failed'
288        run.endedAt = Date.now()
289        run.error = errorMessage(err)
290        persistRun(run)
291        emitStatus(run, run.status, { error: run.error })
292        throw err
293      } finally {
294        const latest = taskAttemptState.runningByTaskId.get(input.task.id)
295        if (latest?.executionId === executionId) {
296          taskAttemptState.runningByTaskId.delete(input.task.id)
297        }
298      }
299    })()
300  
301    const handle: ExecutionHandle<ExecuteChatTurnResult> = {
302      executionId,
303      promise,
304      abort: () => controller.abort(),
305    }
306    taskAttemptState.runningByTaskId.set(input.task.id, handle)
307    return handle
308  }