task-attempt.ts
import { checkAgentBudgetLimits } from '@/lib/server/cost'
import { WORKSPACE_DIR } from '@/lib/server/data-dir'
import { log } from '@/lib/server/logger'
import { loadSettings } from '@/lib/server/settings/settings-repository'
import { loadSessions } from '@/lib/server/sessions/session-repository'
import { appendPersistedRunEvent, buildRetrievalSummary, persistRun } from '@/lib/server/runtime/run-ledger'
import { notify } from '@/lib/server/ws-hub'
import { captureGuardianCheckpoint } from '@/lib/server/agents/guardian'
import {
  assessAutonomyRun,
  executeSupervisorAutoActions,
} from '@/lib/server/autonomy/supervisor-reflection'
import { errorMessage, hmrSingleton } from '@/lib/shared-utils'
import type {
  Agent,
  BoardTask,
  KnowledgeCitation,
  KnowledgeRetrievalTrace,
  Session,
  SessionRunRecord,
  SessionRunStatus,
} from '@/types'
import type { ExecuteChatTurnResult } from '@/lib/server/chat-execution/chat-execution-types'
import type {
  EnqueueTaskAttemptExecutionRequest,
  ExecutionHandle,
} from '@/lib/server/execution-engine/types'
import { executeExecutionChatTurn } from '@/lib/server/execution-engine/chat-turn'

const TAG = 'execution-engine'

/** Upper bound on supervisor-driven follow-up turns after the initial task turn. */
const MAX_SUPERVISOR_FOLLOWUPS = 2

/** Matches a markdown heading such as "## Step 3" / "# Phase 2" / "### Next 1". */
const STEP_HEADING_PATTERN = /(?:^|\n)#{1,3}\s+(?:Step|Phase|Next)\s+\d/i

/** Matches phrases that announce further work (applied to lower-cased text). */
const CONTINUATION_PHRASE_PATTERN = /\b(?:next i(?:'ll| will)|now i(?:'ll| will)|let me (?:now|next)|moving on to|proceeding to)\b/

/** Registry of in-flight task attempts, keyed by task id, used to dedupe enqueues. */
interface TaskAttemptState {
  runningByTaskId: Map<string, ExecutionHandle<ExecuteChatTurnResult>>
}

// Held in an hmrSingleton (presumably so the registry survives dev hot reloads — confirm).
const taskAttemptState = hmrSingleton<TaskAttemptState>(
  '__swarmclaw_execution_engine_task_attempt__',
  () => ({
    runningByTaskId: new Map<string, ExecutionHandle<ExecuteChatTurnResult>>(),
  }),
)

/**
 * Produces a single-line preview of `text`: whitespace runs collapsed to one
 * space, trimmed, and truncated to 140 characters. Falsy input yields ''.
 */
function messagePreview(text: string): string {
  return (text || '').replace(/\s+/g, ' ').trim().slice(0, 140)
}

/**
 * Heuristically decides whether an agent reply stopped before finishing.
 *
 * Returns true when the trimmed text ends with an ellipsis, when its last 200
 * characters contain a "Step/Phase/Next <n>" markdown heading, or when its last
 * 300 characters announce more work ("next I'll", "let me now", ...).
 */
function looksIncomplete(text: string): boolean {
  if (!text) return false
  const trimmed = text.trim()
  if (trimmed.endsWith('...') || trimmed.endsWith('…')) return true
  if (STEP_HEADING_PATTERN.test(trimmed.slice(-200))) return true
  const lastChunk = trimmed.slice(-300).toLowerCase()
  return CONTINUATION_PHRASE_PATTERN.test(lastChunk)
}

/**
 * Propagates aborts from an optional caller signal into `controller`.
 *
 * @returns A detach function that removes the listener added to `callerSignal`.
 *   Callers must invoke it once the execution ends, so a long-lived caller
 *   signal does not retain one listener (and controller) per execution.
 */
function chainCallerSignal(callerSignal: AbortSignal | undefined, controller: AbortController): () => void {
  if (!callerSignal) return () => {}
  if (callerSignal.aborted) {
    controller.abort()
    return () => {}
  }
  const onAbort = () => controller.abort()
  callerSignal.addEventListener('abort', onAbort, { once: true })
  return () => callerSignal.removeEventListener('abort', onAbort)
}

/** Notifies the runs list, the sessions list, and the given session's channel. */
function notifyExecutionState(sessionId: string): void {
  notify('runs')
  notify('sessions')
  notify(`session:${sessionId}`)
}

/**
 * Appends a 'status' event for `run` to the persisted run ledger, then notifies listeners.
 *
 * `extra.citations` and `extra.retrievalTrace` are forwarded as structured event
 * fields. Every other key of `extra` is merged into the JSON-encoded `run`
 * payload carried by the event's markdown body.
 */
function emitStatus(run: SessionRunRecord, status: SessionRunStatus, extra?: Record<string, unknown>): void {
  const { citations, retrievalTrace, ...eventExtra } = extra || {}
  appendPersistedRunEvent({
    runId: run.id,
    sessionId: run.sessionId,
    kind: run.kind,
    ownerType: run.ownerType,
    ownerId: run.ownerId,
    parentExecutionId: run.parentExecutionId,
    phase: 'status',
    status,
    summary: run.resultPreview || run.error || undefined,
    citations: citations as KnowledgeCitation[] | undefined,
    retrievalTrace: (retrievalTrace as KnowledgeRetrievalTrace | undefined) || undefined,
    event: {
      t: 'md',
      text: JSON.stringify({
        run: {
          id: run.id,
          sessionId: run.sessionId,
          kind: run.kind,
          ownerType: run.ownerType,
          ownerId: run.ownerId,
          status,
          source: run.source,
          internal: run.internal,
          ...eventExtra,
        },
      }),
    },
  })
  notifyExecutionState(run.sessionId)
}

/**
 * Builds the initial prompt for a task attempt: the task description (or the
 * title when there is no description) followed by fixed completion requirements.
 */
function buildTaskPrompt(task: BoardTask): string {
  return [
    task.description || task.title,
    '',
    'Completion requirements:',
    '- Execute the task before replying; do not reply with only a plan.',
    '- Include concrete evidence in your final summary: changed file paths, commands run, and verification results.',
    '- If blocked, state the blocker explicitly and what input or permission is missing.',
  ].join('\n')
}

/**
 * Runs one attempt of `task` in `sessionId`.
 *
 * Steps:
 * - When `agent.autoRecovery` is set, captures a guardian checkpoint of the
 *   task's workspace directory first.
 * - Sends the initial task prompt.
 * - Sends up to MAX_SUPERVISOR_FOLLOWUPS follow-up turns. Each follow-up is
 *   driven by the supervisor's intervention prompt, or by a generic "continue"
 *   message when the reply looks incomplete.
 *
 * Follow-ups stop early when any of these occur:
 * - the signal aborts;
 * - the supervisor blocks;
 * - an agent budget limit is exceeded;
 * - a turn returns an error.
 *
 * @returns The last turn's result, with `text` trimmed and token/cost fields
 *   summed across all turns. If the initial turn errors, it is returned
 *   immediately.
 */
async function executeTaskAttemptTurn(
  task: BoardTask,
  agent: Agent,
  sessionId: string,
  signal: AbortSignal,
): Promise<ExecuteChatTurnResult> {
  if (agent.autoRecovery) {
    const cwd = task.projectId
      ? `${WORKSPACE_DIR}/projects/${task.projectId}`
      : WORKSPACE_DIR
    captureGuardianCheckpoint(cwd, `task:${task.id}`)
  }

  const settings = loadSettings()
  let latestRun = await executeExecutionChatTurn({
    sessionId,
    message: buildTaskPrompt(task),
    internal: false,
    source: 'task',
    runId: task.id,
    signal,
  })
  let text = typeof latestRun.text === 'string' ? latestRun.text.trim() : ''
  let previousSummary: string | null = null
  let totalInputTokens = latestRun.inputTokens || 0
  let totalOutputTokens = latestRun.outputTokens || 0
  let totalEstimatedCost = Number(latestRun.estimatedCost || 0)

  if (latestRun.error) {
    return {
      ...latestRun,
      text,
    }
  }

  for (let followupIndex = 0; followupIndex < MAX_SUPERVISOR_FOLLOWUPS; followupIndex += 1) {
    if (signal.aborted) break

    const sessions = loadSessions()
    const session = sessions[sessionId] as Session | undefined
    const assessment = assessAutonomyRun({
      runId: `${task.id}:attempt-${(task.attempts || 0) + 1}:step-${followupIndex + 1}`,
      sessionId,
      taskId: task.id,
      agentId: agent.id,
      source: 'task',
      status: latestRun.error ? 'failed' : 'completed',
      resultText: text,
      error: latestRun.error,
      toolEvents: latestRun.toolEvents,
      mainLoopState: {
        followupChainCount: followupIndex + 1,
        summary: previousSummary,
        missionCostUsd: totalEstimatedCost,
      },
      session: session || null,
      settings,
    })
    if (assessment.shouldBlock) break
    if (assessment.autoActions?.length) {
      const result = await executeSupervisorAutoActions({
        actions: assessment.autoActions,
        sessionId,
        agentId: agent.id,
      })
      if (result.blocked) break
    }
    const followupMessage = assessment.interventionPrompt
      || (text && looksIncomplete(text)
        ? 'Continue and complete the remaining steps. Provide a final summary when done.'
        : null)
    if (!followupMessage) break

    if (agent.monthlyBudget || agent.dailyBudget || agent.hourlyBudget) {
      try {
        const followupBudget = checkAgentBudgetLimits(agent)
        if (!followupBudget.ok) {
          log.warn(TAG, `[task_attempt] Budget exceeded for "${agent.name}" during follow-up, stopping.`)
          break
        }
      } catch {
        // Best-effort safety check only.
      }
    }

    // The signal may have aborted while supervisor auto-actions were awaited;
    // do not start another turn in that case.
    if (signal.aborted) break

    previousSummary = text || previousSummary
    const followUp = await executeExecutionChatTurn({
      sessionId,
      message: followupMessage,
      internal: false,
      source: 'task',
      signal,
    })
    totalInputTokens += followUp.inputTokens || 0
    totalOutputTokens += followUp.outputTokens || 0
    totalEstimatedCost += Number(followUp.estimatedCost || 0)
    text = typeof followUp.text === 'string' ? followUp.text.trim() : ''
    latestRun = {
      ...followUp,
      text,
      inputTokens: totalInputTokens,
      outputTokens: totalOutputTokens,
      estimatedCost: totalEstimatedCost,
    }
    if (latestRun.error) break
  }

  return {
    ...latestRun,
    text,
    inputTokens: totalInputTokens,
    outputTokens: totalOutputTokens,
    estimatedCost: totalEstimatedCost,
  }
}

/**
 * Starts (or joins) an attempt for `input.task`.
 *
 * If an attempt for the same task id is already registered, that handle is
 * returned with `deduped: true`. Otherwise a `task_attempt` run record is
 * persisted as 'queued'. The attempt then runs asynchronously through
 * 'running' to 'completed', 'failed', or 'cancelled', persisting and emitting
 * each transition. Aborting the returned handle, or `input.callerSignal`,
 * cancels the attempt.
 *
 * @returns A handle whose `promise` resolves with the attempt result, or
 *   rejects with the underlying error after the run is marked failed/cancelled.
 */
export function enqueueTaskAttemptExecution(
  input: EnqueueTaskAttemptExecutionRequest,
): ExecutionHandle<ExecuteChatTurnResult> {
  const existing = taskAttemptState.runningByTaskId.get(input.task.id)
  if (existing) return { ...existing, deduped: true }

  const executionId = input.executionId || `${input.task.id}:attempt-${(input.task.attempts || 0) + 1}`
  const controller = new AbortController()

  const run: SessionRunRecord = {
    id: executionId,
    sessionId: input.sessionId,
    kind: 'task_attempt',
    ownerType: 'task',
    ownerId: input.task.id,
    parentExecutionId: null,
    recoveryPolicy: 'restart_recoverable',
    source: 'task',
    internal: false,
    mode: 'task_attempt',
    status: 'queued',
    messagePreview: messagePreview(input.task.description || input.task.title),
    queuedAt: Date.now(),
  }

  persistRun(run)
  emitStatus(run, 'queued')

  // Chained only after the queued record is written, so a persistence failure
  // above cannot leave a listener attached to the caller's signal.
  const detachCallerSignal = chainCallerSignal(input.callerSignal, controller)

  // Set by the `finally` below. The body normally yields at its first `await`,
  // so cleanup runs after the handle is registered. If the body instead
  // settles synchronously (e.g. persisting the 'running' state throws), this
  // flag prevents registering a dead handle that would dedupe every later
  // enqueue of this task.
  let settled = false

  const promise = (async () => {
    try {
      run.status = 'running'
      run.startedAt = Date.now()
      persistRun(run)
      emitStatus(run, 'running')

      const result = await executeTaskAttemptTurn(input.task, input.agent, input.sessionId, controller.signal)
      run.status = controller.signal.aborted
        ? 'cancelled'
        : (result.error ? 'failed' : 'completed')
      run.endedAt = Date.now()
      run.error = controller.signal.aborted ? (run.error || 'Cancelled') : result.error
      run.resultPreview = result.text?.slice(0, 280)
      run.retrievalSummary = buildRetrievalSummary(result.citations)
      if (typeof result.inputTokens === 'number') run.totalInputTokens = result.inputTokens
      if (typeof result.outputTokens === 'number') run.totalOutputTokens = result.outputTokens
      if (typeof result.estimatedCost === 'number') run.estimatedCost = result.estimatedCost
      persistRun(run)
      emitStatus(run, run.status, {
        hasText: !!result.text,
        error: run.error || null,
        citations: result.citations,
        retrievalTrace: result.retrievalTrace,
      })
      return result
    } catch (err: unknown) {
      run.status = controller.signal.aborted ? 'cancelled' : 'failed'
      run.endedAt = Date.now()
      run.error = errorMessage(err)
      persistRun(run)
      emitStatus(run, run.status, { error: run.error })
      throw err
    } finally {
      settled = true
      detachCallerSignal()
      const latest = taskAttemptState.runningByTaskId.get(input.task.id)
      if (latest?.executionId === executionId) {
        taskAttemptState.runningByTaskId.delete(input.task.id)
      }
    }
  })()

  const handle: ExecutionHandle<ExecuteChatTurnResult> = {
    executionId,
    promise,
    abort: () => controller.abort(),
  }
  if (!settled) taskAttemptState.runningByTaskId.set(input.task.id, handle)
  return handle
}