core.ts
1 import { log } from '@/lib/server/logger' 2 import { matchesCapabilities, filterAgentsByCapabilities, capabilityMatchScore } from '@/lib/server/agents/capability-match' 3 import { genId } from '@/lib/id' 4 import { dedup, hmrSingleton, jitteredBackoff } from '@/lib/shared-utils' 5 import fs from 'node:fs' 6 import path from 'node:path' 7 import { logActivity } from '@/lib/server/activity/activity-log' 8 import { loadAgents } from '@/lib/server/agents/agent-repository' 9 import { withTransaction } from '@/lib/server/persistence/transaction' 10 import { loadQueue, saveQueue } from '@/lib/server/runtime/queue-repository' 11 import { loadSchedules, saveSchedules } from '@/lib/server/schedules/schedule-repository' 12 import { loadSessions, saveSessions } from '@/lib/server/sessions/session-repository' 13 import { loadSettings } from '@/lib/server/settings/settings-repository' 14 import { loadTasks, saveTasks } from '@/lib/server/tasks/task-repository' 15 import { notify } from '@/lib/server/ws-hub' 16 import { getMessages, getLastMessage, appendMessage } from '@/lib/server/messages/message-repository' 17 import { perf } from '@/lib/server/runtime/perf' 18 import { WORKSPACE_DIR } from '@/lib/server/data-dir' 19 import { createAgentTaskSession } from '@/lib/server/agents/task-session' 20 import { formatValidationFailure } from '@/lib/server/tasks/task-validation' 21 import { pushMainLoopEventToMainSessions } from '@/lib/server/agents/main-agent-loop' 22 import type { ExecuteChatTurnResult } from '@/lib/server/chat-execution/chat-execution-types' 23 import { checkAgentBudgetLimits } from '@/lib/server/cost' 24 import { enqueueExecution } from '@/lib/server/execution-engine' 25 import { extractTaskResult, formatResultBody } from '@/lib/server/tasks/task-result' 26 import { checkoutTask } from '@/lib/server/tasks/task-checkout' 27 import { queueSwarmFeedTaskCompletionWake } from '@/lib/server/swarmfeed-runtime' 28 import { 29 classifyRuntimeFailure, 30 
observeAutonomyRunOutcome, 31 recordSupervisorIncident, 32 } from '@/lib/server/autonomy/supervisor-reflection' 33 import { 34 collectTaskConnectorFollowupTargets as collectTaskConnectorFollowupTargetsImpl, 35 extractLikelyOutputFiles, 36 isSendableAttachment, 37 maybeResolveUploadMediaPathFromUrl, 38 notifyConnectorTaskFollowups, 39 resolveExistingOutputFilePath, 40 resolveTaskOriginConnectorFollowupTarget as resolveTaskOriginConnectorFollowupTargetImpl, 41 type ScheduleTaskMeta, 42 type SessionLike, 43 } from '@/lib/server/tasks/task-followups' 44 import { getCheckpointSaver } from '@/lib/server/langgraph-checkpoint' 45 import { cascadeUnblock } from '@/lib/server/dag-validation' 46 import { prepareGuardianRecovery } from '@/lib/server/agents/guardian' 47 import { notifyOrchestrators } from '@/lib/server/runtime/orchestrator-events' 48 import type { Agent, BoardTask, Message, Session } from '@/types' 49 import { buildAgentDisabledMessage, isAgentDisabled } from '@/lib/server/agents/agent-availability' 50 import { 51 didTaskValidationChange, 52 markInvalidCompletedTaskFailed, 53 markValidatedTaskCompleted, 54 refreshTaskCompletionValidation, 55 } from '@/lib/server/tasks/task-lifecycle' 56 57 const TAG = 'queue' 58 59 export const collectTaskConnectorFollowupTargets = collectTaskConnectorFollowupTargetsImpl 60 export const resolveTaskOriginConnectorFollowupTarget = resolveTaskOriginConnectorFollowupTargetImpl 61 62 // HMR-safe: pin processing state to globalThis so hot reloads don't reset it 63 const _queueState = hmrSingleton('__swarmclaw_queue__', () => ({ 64 activeCount: 0, 65 maxConcurrent: 3, 66 pendingKick: false, 67 })) 68 69 function normalizeInt(value: unknown, fallback: number, min: number, max: number): number { 70 const parsed = typeof value === 'number' 71 ? value 72 : typeof value === 'string' 73 ? 
Number.parseInt(value, 10) 74 : Number.NaN 75 if (!Number.isFinite(parsed)) return fallback 76 return Math.max(min, Math.min(max, Math.trunc(parsed))) 77 } 78 79 const OPENCLAW_USE_CASE_TAGS = new Set([ 80 'local-dev', 81 'single-vps', 82 'private-tailnet', 83 'browser-heavy', 84 'team-control', 85 ]) 86 87 function deriveTaskRoutePreferences(task: BoardTask): { 88 preferredGatewayTags?: string[] 89 preferredGatewayUseCase?: string | null 90 } { 91 const tags = Array.isArray(task.tags) 92 ? dedup(task.tags.map((tag) => (typeof tag === 'string' ? tag.trim().toLowerCase() : '')).filter(Boolean)) 93 : [] 94 const customUseCase = typeof task.customFields?.openclawUseCase === 'string' 95 ? task.customFields.openclawUseCase 96 : typeof task.customFields?.gatewayUseCase === 'string' 97 ? task.customFields.gatewayUseCase 98 : null 99 const preferredGatewayUseCase = customUseCase && OPENCLAW_USE_CASE_TAGS.has(customUseCase) 100 ? customUseCase 101 : (tags.find((tag) => OPENCLAW_USE_CASE_TAGS.has(tag)) || null) 102 const preferredGatewayTags = tags.filter((tag) => tag !== preferredGatewayUseCase) 103 return { 104 preferredGatewayTags, 105 preferredGatewayUseCase, 106 } 107 } 108 109 function resolveTaskPolicy(task: BoardTask): { maxAttempts: number; backoffSec: number } { 110 const settings = loadSettings() 111 const defaultMaxAttempts = normalizeInt(settings.defaultTaskMaxAttempts, 3, 1, 20) 112 const defaultBackoffSec = normalizeInt(settings.taskRetryBackoffSec, 30, 1, 3600) 113 const maxAttempts = normalizeInt(task.maxAttempts, defaultMaxAttempts, 1, 20) 114 const backoffSec = normalizeInt(task.retryBackoffSec, defaultBackoffSec, 1, 3600) 115 return { maxAttempts, backoffSec } 116 } 117 118 function applyTaskPolicyDefaults(task: BoardTask): void { 119 const policy = resolveTaskPolicy(task) 120 if (typeof task.attempts !== 'number' || task.attempts < 0) task.attempts = 0 121 task.maxAttempts = policy.maxAttempts 122 task.retryBackoffSec = policy.backoffSec 123 if 
(task.retryScheduledAt === undefined) task.retryScheduledAt = null 124 if (task.deadLetteredAt === undefined) task.deadLetteredAt = null 125 } 126 127 export interface TaskResumeState { 128 claudeSessionId: string | null 129 codexThreadId: string | null 130 opencodeSessionId: string | null 131 cursorSessionId?: string | null 132 qwenSessionId?: string | null 133 delegateResumeIds: NonNullable<Session['delegateResumeIds']> 134 } 135 136 export interface TaskResumeContext { 137 source: 'self' | 'delegated_from_task' | 'blocked_by' 138 sourceTaskId: string 139 sourceTaskTitle: string 140 sourceSessionId: string | null 141 resume: TaskResumeState 142 } 143 144 function normalizeResumeHandle(value: unknown): string | null { 145 return typeof value === 'string' && value.trim() ? value.trim() : null 146 } 147 148 function buildEmptyDelegateResumeIds(): NonNullable<Session['delegateResumeIds']> { 149 return { 150 claudeCode: null, 151 codex: null, 152 opencode: null, 153 gemini: null, 154 copilot: null, 155 cursor: null, 156 qwen: null, 157 } 158 } 159 160 function normalizeCliProvider(value: unknown): string | null { 161 return typeof value === 'string' && value.trim() ? 
value.trim().toLowerCase() : null 162 } 163 164 function hasResumeState(state: TaskResumeState | null | undefined): state is TaskResumeState { 165 if (!state) return false 166 return Boolean( 167 state.claudeSessionId 168 || state.codexThreadId 169 || state.opencodeSessionId 170 || state.delegateResumeIds.claudeCode 171 || state.delegateResumeIds.codex 172 || state.delegateResumeIds.opencode 173 || state.delegateResumeIds.gemini 174 || state.delegateResumeIds.cursor 175 || state.delegateResumeIds.qwen 176 ) 177 } 178 179 export function extractTaskResumeState(task: Partial<BoardTask> | null | undefined): TaskResumeState | null { 180 if (!task) return null 181 182 const legacyResumeId = normalizeResumeHandle(task.cliResumeId) 183 const legacyProvider = normalizeCliProvider(task.cliProvider) 184 const claudeSessionId = normalizeResumeHandle(task.claudeResumeId) 185 || (legacyProvider === 'claude-cli' ? legacyResumeId : null) 186 const codexThreadId = normalizeResumeHandle(task.codexResumeId) 187 || (legacyProvider === 'codex-cli' ? legacyResumeId : null) 188 const opencodeSessionId = normalizeResumeHandle(task.opencodeResumeId) 189 || (legacyProvider === 'opencode-cli' ? legacyResumeId : null) 190 const geminiSessionId = normalizeResumeHandle(task.geminiResumeId) 191 || (legacyProvider === 'gemini-cli' ? legacyResumeId : null) 192 const cursorSessionId = legacyProvider === 'cursor-cli' ? legacyResumeId : null 193 const qwenSessionId = legacyProvider === 'qwen-code-cli' ? legacyResumeId : null 194 195 const resume = { 196 claudeSessionId, 197 codexThreadId, 198 opencodeSessionId, 199 cursorSessionId, 200 qwenSessionId, 201 delegateResumeIds: { 202 claudeCode: claudeSessionId, 203 codex: codexThreadId, 204 opencode: opencodeSessionId, 205 gemini: geminiSessionId, 206 cursor: cursorSessionId, 207 qwen: qwenSessionId, 208 }, 209 } satisfies TaskResumeState 210 211 return hasResumeState(resume) ? 
resume : null 212 } 213 214 export function extractSessionResumeState(session: Partial<Session> | null | undefined): TaskResumeState | null { 215 if (!session) return null 216 217 const claudeSessionId = normalizeResumeHandle(session.claudeSessionId) 218 const codexThreadId = normalizeResumeHandle(session.codexThreadId) 219 const opencodeSessionId = normalizeResumeHandle(session.opencodeSessionId) 220 const cursorSessionId = normalizeResumeHandle(session.cursorSessionId) 221 const qwenSessionId = normalizeResumeHandle(session.qwenSessionId) 222 const delegateResumeIds = session.delegateResumeIds && typeof session.delegateResumeIds === 'object' 223 ? { ...buildEmptyDelegateResumeIds(), ...session.delegateResumeIds } 224 : buildEmptyDelegateResumeIds() 225 226 const resume = { 227 claudeSessionId, 228 codexThreadId, 229 opencodeSessionId, 230 cursorSessionId, 231 qwenSessionId, 232 delegateResumeIds: { 233 claudeCode: normalizeResumeHandle(delegateResumeIds.claudeCode) || claudeSessionId, 234 codex: normalizeResumeHandle(delegateResumeIds.codex) || codexThreadId, 235 opencode: normalizeResumeHandle(delegateResumeIds.opencode) || opencodeSessionId, 236 gemini: normalizeResumeHandle(delegateResumeIds.gemini), 237 cursor: normalizeResumeHandle(delegateResumeIds.cursor) || cursorSessionId, 238 qwen: normalizeResumeHandle(delegateResumeIds.qwen) || qwenSessionId, 239 copilot: normalizeResumeHandle(delegateResumeIds.copilot), 240 }, 241 } satisfies TaskResumeState 242 243 return hasResumeState(resume) ? 
resume : null 244 } 245 246 export function resolveTaskResumeContext( 247 task: BoardTask, 248 tasksById: Record<string, BoardTask>, 249 sessionsById?: Record<string, SessionLike | Session>, 250 ): TaskResumeContext | null { 251 const candidates: Array<{ source: TaskResumeContext['source']; taskId: string | null | undefined }> = [ 252 { source: 'self', taskId: task.id }, 253 { source: 'delegated_from_task', taskId: task.delegatedFromTaskId }, 254 ...((Array.isArray(task.blockedBy) ? task.blockedBy : []).map((taskId) => ({ source: 'blocked_by' as const, taskId }))), 255 ] 256 const seen = new Set<string>() 257 258 for (const candidate of candidates) { 259 const taskId = typeof candidate.taskId === 'string' ? candidate.taskId.trim() : '' 260 if (!taskId || seen.has(taskId)) continue 261 seen.add(taskId) 262 const sourceTask = taskId === task.id ? task : tasksById[taskId] 263 if (!sourceTask) continue 264 const sourceSessionId = normalizeResumeHandle(sourceTask.checkpoint?.lastSessionId) || normalizeResumeHandle(sourceTask.sessionId) 265 const resume = extractTaskResumeState(sourceTask) 266 || (sourceSessionId && sessionsById?.[sourceSessionId] 267 ? 
extractSessionResumeState(sessionsById[sourceSessionId] as Session) 268 : null) 269 if (!resume) continue 270 return { 271 source: candidate.source, 272 sourceTaskId: sourceTask.id, 273 sourceTaskTitle: sourceTask.title, 274 sourceSessionId, 275 resume, 276 } 277 } 278 279 return null 280 } 281 282 export function applyTaskResumeStateToSession(session: Session, resume: TaskResumeState | null | undefined): boolean { 283 if (!hasResumeState(resume)) return false 284 285 let changed = false 286 const directFields: Array<['claudeSessionId' | 'codexThreadId' | 'opencodeSessionId' | 'cursorSessionId' | 'qwenSessionId', string | null]> = [ 287 ['claudeSessionId', resume.claudeSessionId], 288 ['codexThreadId', resume.codexThreadId], 289 ['opencodeSessionId', resume.opencodeSessionId], 290 ['cursorSessionId', resume.cursorSessionId ?? null], 291 ['qwenSessionId', resume.qwenSessionId ?? null], 292 ] 293 for (const [key, value] of directFields) { 294 if (!value || session[key] === value) continue 295 session[key] = value 296 changed = true 297 } 298 299 const currentDelegateResume = session.delegateResumeIds && typeof session.delegateResumeIds === 'object' 300 ? { ...buildEmptyDelegateResumeIds(), ...session.delegateResumeIds } 301 : buildEmptyDelegateResumeIds() 302 for (const [key, value] of Object.entries(resume.delegateResumeIds) as Array<[keyof NonNullable<Session['delegateResumeIds']>, string | null]>) { 303 if (!value || currentDelegateResume[key] === value) continue 304 currentDelegateResume[key] = value 305 changed = true 306 } 307 if (changed) session.delegateResumeIds = currentDelegateResume 308 return changed 309 } 310 311 export function resolveReusableTaskSessionId( 312 task: BoardTask, 313 tasks: Record<string, BoardTask>, 314 sessions: Record<string, SessionLike>, 315 ): string { 316 const candidateTaskIds = [ 317 task.id, 318 typeof task.delegatedFromTaskId === 'string' ? task.delegatedFromTaskId : '', 319 ...(Array.isArray(task.blockedBy) ? 
task.blockedBy : []), 320 ] 321 const seen = new Set<string>() 322 for (const candidateTaskId of candidateTaskIds) { 323 const taskId = typeof candidateTaskId === 'string' ? candidateTaskId.trim() : '' 324 if (!taskId || seen.has(taskId)) continue 325 seen.add(taskId) 326 const sourceTask = taskId === task.id ? task : tasks[taskId] 327 if (!sourceTask) continue 328 const candidates = [ 329 normalizeResumeHandle(sourceTask.checkpoint?.lastSessionId), 330 normalizeResumeHandle(sourceTask.sessionId), 331 ] 332 for (const candidate of candidates) { 333 if (candidate && sessions[candidate]) return candidate 334 } 335 } 336 return '' 337 } 338 339 function buildTaskContinuationNote( 340 reusedExistingSession: boolean, 341 resumeContext: TaskResumeContext | null, 342 ): string { 343 const notes: string[] = [] 344 if (reusedExistingSession) { 345 notes.push('Reusing the previous execution session for this task.') 346 } 347 if (resumeContext?.source === 'delegated_from_task' || resumeContext?.source === 'blocked_by') { 348 notes.push(`Stored CLI context is available from related task "${resumeContext.sourceTaskTitle}".`) 349 } else if (resumeContext?.source === 'self' && !reusedExistingSession) { 350 notes.push('Stored CLI resume handles are available for continuation.') 351 } 352 return notes.length ? 
`\n\n${notes.join(' ')}` : '' 353 } 354 355 const DEV_TASK_HINT = /\b(dev(?:\s+server)?|start(?:ing)?\s+(?:the\s+)?server|run(?:ning)?\s+(?:the\s+)?(?:app|project|site)|serve|localhost|http\s+server|web\s+server|npm\b|pnpm\b|yarn\b|bun\b|vite|next(?:\.js)?|react|build|compile)\b/i 356 const TASK_CWD_NOISE_DIRS = new Set([ 357 'uploads', 358 'data', 359 'projects', 360 'tasks', 361 '.swarm-data-test', 362 '.git', 363 '.next', 364 'node_modules', 365 ]) 366 const PROJECT_MARKER_FILES = ['package.json', 'pyproject.toml', 'Cargo.toml', 'go.mod', '.git'] 367 const SOURCE_MARKER_DIRS = ['src', 'app', 'public', 'pages'] 368 const WORKSPACE_PROJECTS_DIR = path.join(WORKSPACE_DIR, 'projects') 369 370 interface WorkspaceDirCandidate { 371 dir: string 372 name: string 373 hasProjectMarker: boolean 374 hasSourceMarker: boolean 375 } 376 377 let workspaceDirCache: { expiresAt: number; candidates: WorkspaceDirCandidate[] } | null = null 378 379 function isExistingDirectory(dirPath: string): boolean { 380 try { 381 return fs.statSync(dirPath).isDirectory() 382 } catch { 383 return false 384 } 385 } 386 387 function isWithinDirectory(parent: string, child: string): boolean { 388 const parentResolved = path.resolve(parent) 389 const childResolved = path.resolve(child) 390 const rel = path.relative(parentResolved, childResolved) 391 return rel === '' || (!rel.startsWith('..') && !path.isAbsolute(rel)) 392 } 393 394 function normalizeForMatch(value: string): string { 395 return value.toLowerCase().replace(/[^a-z0-9]+/g, ' ').trim() 396 } 397 398 function hasAnyMarker(dirPath: string, markers: string[]): boolean { 399 return markers.some((marker) => fs.existsSync(path.join(dirPath, marker))) 400 } 401 402 function normalizeDirCandidate(raw: unknown, baseDir: string): string | null { 403 if (typeof raw !== 'string') return null 404 const trimmed = raw.trim() 405 if (!trimmed) return null 406 const homeDir = process.env.HOME || '' 407 const expanded = trimmed === '~' 408 ? 
homeDir 409 : trimmed.startsWith('~/') 410 ? path.join(homeDir, trimmed.slice(2)) 411 : trimmed 412 const resolved = path.isAbsolute(expanded) ? path.resolve(expanded) : path.resolve(baseDir, expanded) 413 return isExistingDirectory(resolved) ? resolved : null 414 } 415 416 function looksLikeDevTask(task: Pick<BoardTask, 'title' | 'description'>): boolean { 417 const text = `${task.title || ''} ${task.description || ''}`.trim() 418 return DEV_TASK_HINT.test(text) 419 } 420 421 function listWorkspaceDirCandidates(): WorkspaceDirCandidate[] { 422 const now = Date.now() 423 if (workspaceDirCache && workspaceDirCache.expiresAt > now) return workspaceDirCache.candidates 424 425 const candidates: WorkspaceDirCandidate[] = [] 426 const seen = new Set<string>() 427 const roots = [WORKSPACE_DIR, WORKSPACE_PROJECTS_DIR] 428 429 for (const root of roots) { 430 if (!isExistingDirectory(root)) continue 431 let entries: fs.Dirent[] = [] 432 try { 433 entries = fs.readdirSync(root, { withFileTypes: true }) 434 } catch { 435 continue 436 } 437 for (const entry of entries) { 438 if (!entry.isDirectory()) continue 439 const name = entry.name 440 if (!name || name.startsWith('.')) continue 441 if (TASK_CWD_NOISE_DIRS.has(name)) continue 442 const dir = path.join(root, name) 443 const key = path.resolve(dir) 444 if (seen.has(key)) continue 445 seen.add(key) 446 candidates.push({ 447 dir: key, 448 name, 449 hasProjectMarker: hasAnyMarker(key, PROJECT_MARKER_FILES), 450 hasSourceMarker: hasAnyMarker(key, SOURCE_MARKER_DIRS), 451 }) 452 } 453 } 454 455 candidates.sort((a, b) => a.name.localeCompare(b.name)) 456 workspaceDirCache = { 457 expiresAt: now + 15_000, 458 candidates, 459 } 460 return candidates 461 } 462 463 function inferWorkspaceProjectCwd(task: Pick<BoardTask, 'title' | 'description' | 'file'>): string | null { 464 const candidates = listWorkspaceDirCandidates() 465 if (!candidates.length) return null 466 467 const taskText = normalizeForMatch(`${task.title || ''} 
${task.description || ''} ${task.file || ''}`) 468 const devTask = looksLikeDevTask(task) 469 const markerCandidates = candidates.filter((candidate) => candidate.hasProjectMarker) 470 471 let best: { dir: string; score: number } | null = null 472 for (const candidate of candidates) { 473 const nameNorm = normalizeForMatch(candidate.name) 474 if (!nameNorm) continue 475 let score = 0 476 if (taskText.includes(nameNorm)) score += 8 477 for (const token of nameNorm.split(' ')) { 478 if (token.length < 3) continue 479 if (taskText.includes(token)) score += 1 480 } 481 if (candidate.hasProjectMarker) score += devTask ? 3 : 1 482 if (candidate.hasSourceMarker) score += 1 483 if (!best || score > best.score) best = { dir: candidate.dir, score } 484 } 485 486 if (best && best.score >= 4) return best.dir 487 if (devTask && markerCandidates.length === 1) return markerCandidates[0].dir 488 return null 489 } 490 491 function resolveTaskExecutionCwd(task: ScheduleTaskMeta, sessions: Record<string, SessionLike>): string { 492 const workspaceRoot = path.resolve(WORKSPACE_DIR) 493 494 const explicitCwd = normalizeDirCandidate(task.cwd, workspaceRoot) 495 if (explicitCwd) return explicitCwd 496 497 const projectId = typeof task.projectId === 'string' ? task.projectId.trim() : '' 498 if (projectId) { 499 const projectDir = path.join(WORKSPACE_PROJECTS_DIR, projectId) 500 if (isExistingDirectory(projectDir)) return projectDir 501 } 502 503 const fileRef = typeof task.file === 'string' ? task.file.trim() : '' 504 if (fileRef) { 505 const filePath = path.isAbsolute(fileRef) ? fileRef : path.resolve(workspaceRoot, fileRef) 506 const fileDir = isExistingDirectory(filePath) ? filePath : path.dirname(filePath) 507 if (isExistingDirectory(fileDir) && isWithinDirectory(workspaceRoot, fileDir)) return fileDir 508 } 509 510 const inferredCwd = inferWorkspaceProjectCwd(task) 511 if (inferredCwd) return inferredCwd 512 513 const sourceSessionId = typeof task.createdInSessionId === 'string' ? 
task.createdInSessionId.trim() : '' 514 const sourceSessionCwd = sourceSessionId 515 ? normalizeDirCandidate(sessions[sourceSessionId]?.cwd, workspaceRoot) 516 : null 517 if (sourceSessionCwd && path.resolve(sourceSessionCwd) !== workspaceRoot) return sourceSessionCwd 518 519 const runSessionId = typeof task.sessionId === 'string' ? task.sessionId.trim() : '' 520 const runSessionCwd = runSessionId 521 ? normalizeDirCandidate(sessions[runSessionId]?.cwd, workspaceRoot) 522 : null 523 if (runSessionCwd && path.resolve(runSessionCwd) !== workspaceRoot) return runSessionCwd 524 525 const sandboxDir = path.join(workspaceRoot, 'tasks', task.id) 526 fs.mkdirSync(sandboxDir, { recursive: true }) 527 return sandboxDir 528 } 529 530 function queueContains(queue: string[], id: string): boolean { 531 return queue.includes(id) 532 } 533 534 function isCancelledTask(task: Partial<BoardTask> | null | undefined): boolean { 535 return task?.status === 'cancelled' 536 } 537 538 function pushQueueUnique(queue: string[], id: string): void { 539 if (!queueContains(queue, id)) queue.push(id) 540 } 541 542 function isAgentCreatedTask(task: Partial<BoardTask> | null | undefined): boolean { 543 return Boolean(typeof task?.createdByAgentId === 'string' && task.createdByAgentId.trim()) 544 } 545 546 function resolveTaskTerminalChatSessionId( 547 task: BoardTask, 548 sessions: Record<string, SessionLike>, 549 ): string | null { 550 if (task.status !== 'completed' && task.status !== 'failed') return null 551 if (task.sourceType === 'schedule') return null 552 if (isAgentCreatedTask(task)) return null 553 const createdInSessionId = typeof task.createdInSessionId === 'string' 554 ? task.createdInSessionId.trim() 555 : '' 556 return createdInSessionId && sessions[createdInSessionId] ? 
createdInSessionId : null 557 } 558 559 interface TaskResultDeliveryData { 560 statusLabel: 'completed' | 'failed' 561 resultBody: string 562 outputFileRefs: string[] 563 firstImage?: NonNullable<BoardTask['artifacts']>[number] 564 followupMediaPath?: string 565 mediaFileName?: string 566 execCwd: string 567 resumeLines: string[] 568 } 569 570 function collectTaskResultDeliveryData( 571 task: BoardTask, 572 sessions: Record<string, SessionLike>, 573 ): TaskResultDeliveryData { 574 const runSessionId = typeof task.sessionId === 'string' ? task.sessionId : '' 575 const runSession = runSessionId ? sessions[runSessionId] : null 576 const fallbackText = runSessionId ? latestAssistantText(runSessionId) : '' 577 const taskResult = extractTaskResult( 578 runSessionId ? getMessages(runSessionId) : [], 579 task.result || fallbackText || null, 580 { sinceTime: typeof task.startedAt === 'number' ? task.startedAt : null }, 581 ) 582 const resultBody = formatResultBody(taskResult) 583 const outputFileRefs = Array.isArray(task.outputFiles) && task.outputFiles.length > 0 584 ? 
task.outputFiles 585 : extractLikelyOutputFiles(resultBody) 586 const firstImage = taskResult.artifacts.find((artifact) => artifact.type === 'image') 587 const firstArtifactMediaPath = taskResult.artifacts 588 .map((artifact) => maybeResolveUploadMediaPathFromUrl(artifact.url)) 589 .find((candidate): candidate is string => Boolean(candidate)) 590 const resumeLines: string[] = [] 591 if (task.claudeResumeId) resumeLines.push(`Claude session: \`${task.claudeResumeId}\``) 592 if (task.codexResumeId) resumeLines.push(`Codex thread: \`${task.codexResumeId}\``) 593 if (task.opencodeResumeId) resumeLines.push(`OpenCode session: \`${task.opencodeResumeId}\``) 594 if (task.geminiResumeId) resumeLines.push(`Gemini session: \`${task.geminiResumeId}\``) 595 if (resumeLines.length === 0 && task.cliResumeId) { 596 resumeLines.push(`${task.cliProvider || 'CLI'} session: \`${task.cliResumeId}\``) 597 } 598 const execCwd = runSession?.cwd || '' 599 const existingOutputPaths = outputFileRefs 600 .map((fileRef: string) => resolveExistingOutputFilePath(fileRef, execCwd)) 601 .filter((candidate: string | null): candidate is string => Boolean(candidate)) 602 const firstLocalOutputPath = existingOutputPaths.find((candidate: string) => isSendableAttachment(candidate)) 603 const followupMediaPath = firstArtifactMediaPath || firstLocalOutputPath || undefined 604 605 return { 606 statusLabel: task.status === 'completed' ? 'completed' : 'failed', 607 resultBody, 608 outputFileRefs, 609 firstImage, 610 followupMediaPath, 611 mediaFileName: followupMediaPath ? 
path.basename(followupMediaPath) : undefined, 612 execCwd, 613 resumeLines, 614 } 615 } 616 617 function buildTaskTerminalMessage( 618 prefix: string, 619 task: BoardTask, 620 delivery: TaskResultDeliveryData, 621 ): string { 622 const parts = [prefix] 623 if (delivery.execCwd) parts.push(`Working directory: \`${delivery.execCwd}\``) 624 if (delivery.outputFileRefs.length > 0) { 625 parts.push(`Output files:\n${delivery.outputFileRefs.slice(0, 8).map((fileRef: string) => `- \`${fileRef}\``).join('\n')}`) 626 } 627 if (task.completionReportPath) parts.push(`Task report: \`${task.completionReportPath}\``) 628 if (delivery.resumeLines.length > 0) parts.push(delivery.resumeLines.join(' | ')) 629 parts.push(delivery.resultBody || 'No summary.') 630 return parts.join('\n\n') 631 } 632 633 function latestAssistantText(sessionId: string | null | undefined): string { 634 if (!sessionId) return '' 635 const messages = getMessages(sessionId) 636 for (let i = messages.length - 1; i >= 0; i--) { 637 const msg = messages[i] 638 if (msg?.role !== 'assistant') continue 639 const text = typeof msg?.text === 'string' ? 
msg.text.trim() : '' 640 if (!text) continue 641 if (/^HEARTBEAT_OK$/i.test(text)) continue 642 return text 643 } 644 return '' 645 } 646 647 function queueTaskAutonomyObservation(input: { 648 runId: string 649 sessionId: string 650 taskId: string 651 agentId: string 652 status: 'completed' | 'failed' | 'cancelled' 653 resultText?: string | null 654 error?: string | null 655 toolEvents?: ExecuteChatTurnResult['toolEvents'] 656 sourceMessage?: string | null 657 }) { 658 void observeAutonomyRunOutcome({ 659 runId: input.runId, 660 sessionId: input.sessionId, 661 taskId: input.taskId, 662 agentId: input.agentId, 663 source: 'task', 664 status: input.status, 665 resultText: input.resultText, 666 error: input.error || undefined, 667 toolEvents: input.toolEvents, 668 sourceMessage: input.sourceMessage, 669 }).catch((err: unknown) => { 670 log.warn(TAG, `[queue] Autonomy observation failed for ${input.runId}:`, err) 671 }) 672 } 673 674 function hasFinishedExecutionSession(session: SessionLike | Session | null | undefined): boolean { 675 if (!session) return false 676 return session.active === false && !session.currentRunId 677 } 678 679 export function reconcileFinishedRunningTasks(): { reconciled: number; deadLettered: number } { 680 const tasks = loadTasks() 681 const sessions = loadSessions() as Record<string, SessionLike> 682 const settings = loadSettings() 683 const queue = loadQueue() 684 const now = Date.now() 685 let reconciled = 0 686 let deadLettered = 0 687 let tasksDirty = false 688 let sessionsDirty = false 689 let queueDirty = false 690 const terminalTasks: BoardTask[] = [] 691 692 for (const task of Object.values(tasks) as BoardTask[]) { 693 if (task.status !== 'running') continue 694 const sessionId = typeof task.sessionId === 'string' ? 
task.sessionId : '' 695 if (!sessionId) continue 696 const session = sessions[sessionId] 697 if (!hasFinishedExecutionSession(session)) continue 698 699 const fallbackText = latestAssistantText(sessionId) 700 if (!fallbackText && !task.result) { 701 task.status = 'failed' 702 task.result = 'Agent session finished without producing output.' 703 task.checkoutRunId = null 704 task.updatedAt = now 705 tasksDirty = true 706 continue 707 } 708 709 applyTaskPolicyDefaults(task) 710 const taskResult = extractTaskResult( 711 getMessages(sessionId), 712 task.result || fallbackText || null, 713 { sinceTime: typeof task.startedAt === 'number' ? task.startedAt : null }, 714 ) 715 const enrichedResult = formatResultBody(taskResult) 716 task.result = enrichedResult.slice(0, 4000) || null 717 task.artifacts = taskResult.artifacts.slice(0, 24) 718 task.outputFiles = extractLikelyOutputFiles(enrichedResult).slice(0, 24) 719 task.updatedAt = now 720 const { validation } = refreshTaskCompletionValidation(task, settings) 721 if (!task.comments) task.comments = [] 722 723 if (validation.ok) { 724 markValidatedTaskCompleted(task, { now }) 725 task.retryScheduledAt = null 726 task.deadLetteredAt = null 727 task.checkpoint = { 728 ...(task.checkpoint || {}), 729 lastRunId: sessionId, 730 lastSessionId: sessionId, 731 note: 'Recovered completed task state from finished session.', 732 updatedAt: now, 733 } 734 task.comments.push({ 735 id: genId(), 736 author: 'System', 737 text: 'Recovered completed task state from a finished execution session.', 738 createdAt: now, 739 }) 740 reconciled++ 741 terminalTasks.push(task) 742 } else { 743 const failureReason = formatValidationFailure(validation.reasons).slice(0, 500) 744 const retryState = scheduleRetryOrDeadLetter(task, failureReason) 745 task.completedAt = retryState === 'dead_lettered' ? 
null : task.completedAt 746 task.comments.push({ 747 id: genId(), 748 author: 'System', 749 text: `Recovered finished session but the task result failed validation.\n\n${validation.reasons.map((reason) => `- ${reason}`).join('\n')}`, 750 createdAt: now, 751 }) 752 if (retryState === 'retry') { 753 pushQueueUnique(queue, task.id) 754 queueDirty = true 755 reconciled++ 756 pushMainLoopEventToMainSessions({ 757 type: 'task_retry_scheduled', 758 text: `Task retry scheduled: "${task.title}" (${task.id}) attempt ${task.attempts}/${task.maxAttempts} in ${task.retryBackoffSec}s.`, 759 }) 760 } else { 761 deadLettered++ 762 terminalTasks.push(task) 763 } 764 } 765 766 if (session.heartbeatEnabled !== false) { 767 session.heartbeatEnabled = false 768 session.lastActiveAt = now 769 sessionsDirty = true 770 } 771 tasksDirty = true 772 } 773 774 if (tasksDirty) { 775 saveTasks(tasks) 776 notify('tasks') 777 notify('runs') 778 } 779 if (sessionsDirty) saveSessions(sessions as Record<string, Session>) 780 if (queueDirty) saveQueue(queue) 781 782 for (const task of terminalTasks) { 783 if (task.status === 'completed') { 784 logActivity({ entityType: 'task', entityId: task.id, action: 'completed', actor: 'system', actorId: task.agentId, summary: `Task completed: "${task.title}"` }) 785 pushMainLoopEventToMainSessions({ 786 type: 'task_completed', 787 text: `Task completed: "${task.title}" (${task.id})`, 788 }) 789 notifyOrchestrators(`Task completed: "${task.title}"`, `task-complete:${task.id}`) 790 } else if (task.status === 'failed') { 791 logActivity({ entityType: 'task', entityId: task.id, action: 'failed', actor: 'system', actorId: task.agentId, summary: `Task failed: "${task.title}"` }) 792 pushMainLoopEventToMainSessions({ 793 type: 'task_failed', 794 text: `Task failed validation: "${task.title}" (${task.id})`, 795 }) 796 notifyOrchestrators(`Task failed: "${task.title}" — validation failure`, `task-fail:${task.id}`) 797 } 798 handleTerminalTaskResultDeliveries(task) 799 
cleanupTerminalOneOffSchedule(task)
  }

  return { reconciled, deadLettered }
}

/**
 * Hook invoked by callers after a task reaches a terminal state.
 *
 * Currently a no-op: the body only references `task` via `void` so the
 * parameter is not reported as unused.
 * NOTE(review): presumably a placeholder for one-off schedule cleanup —
 * confirm whether it is still needed.
 */
function cleanupTerminalOneOffSchedule(task: BoardTask): void {
  void task
}

/**
 * Post a terminal-status summary of `task` into the chat session returned by
 * `resolveTaskTerminalChatSessionId`.
 *
 * Only acts on `completed` / `failed` tasks, and only when the target session
 * exists. The summary links back to the task. The first image from the
 * delivery data (if any) is attached, and `messages:<sessionId>` subscribers
 * are notified.
 */
function pushUserFacingTaskResult(task: BoardTask, sessions: Record<string, SessionLike>): void {
  if (task.status !== 'completed' && task.status !== 'failed') return
  const targetSessionId = resolveTaskTerminalChatSessionId(task, sessions)
  if (!targetSessionId) return
  const targetSession = sessions[targetSessionId]
  if (!targetSession) return

  const delivery = collectTaskResultDeliveryData(task, sessions)
  const taskLink = `[${task.title}](#task:${task.id})`
  const body = buildTaskTerminalMessage(`Task ${delivery.statusLabel}: **${taskLink}**`, task, delivery)
  const now = Date.now()
  const lastMsg = getLastMessage(targetSessionId)
  // Terminal handling runs from more than one path (finished-session
  // reconciliation and the live queue worker), so skip an identical assistant
  // summary that was already posted within the last 30 seconds.
  if (lastMsg?.role === 'assistant' && lastMsg?.text === body && typeof lastMsg?.time === 'number' && now - lastMsg.time < 30_000) {
    return
  }

  const message: Message = {
    role: 'assistant',
    text: body,
    time: now,
    kind: 'system',
  }
  if (delivery.firstImage) message.imageUrl = delivery.firstImage.url
  appendMessage(targetSessionId, message)
  notify(`messages:${targetSessionId}`)
}

/**
 * Fire-and-forget delivery of a terminal task summary to connector
 * follow-up targets via `notifyConnectorTaskFollowups`.
 *
 * Only acts on `completed` / `failed` tasks. The caller does not wait for
 * delivery. A rejection is logged instead of escaping as an unhandled
 * promise rejection (the bare `void` operator does not prevent that).
 */
function deliverTaskConnectorFollowups(task: BoardTask, sessions: Record<string, SessionLike>): void {
  if (task.status !== 'completed' && task.status !== 'failed') return
  const delivery = collectTaskResultDeliveryData(task, sessions)
  // Promise.resolve() accepts either a promise or a plain value, so attaching
  // .catch() is safe regardless of the helper's exact return type.
  void Promise.resolve(
    notifyConnectorTaskFollowups({
      task,
      statusLabel: delivery.statusLabel,
      summaryText: delivery.resultBody || '',
      imageUrl: delivery.firstImage?.url,
      mediaPath: delivery.followupMediaPath,
      mediaFileName: delivery.mediaFileName,
    }),
  ).catch((e) =>
    log.warn(TAG, `[queue] Failed to deliver connector follow-ups for task ${task.id}:`, e)
  )
}

/**
 * Run both terminal-result deliveries for `task`: the in-chat summary and
 * the connector follow-ups. Both read from one snapshot of the sessions store.
 */
function handleTerminalTaskResultDeliveries(task: BoardTask): void {
  const sessions = loadSessions() as Record<string, SessionLike>
  pushUserFacingTaskResult(task, sessions)
  deliverTaskConnectorFollowups(task, sessions)
}

/**
 * Disable heartbeat on a task's session when the task finishes.
 *
 * No-op when `sessionId` is empty, the session does not exist, or its
 * heartbeat is already disabled. Otherwise it also bumps `lastActiveAt` and
 * persists the sessions store.
 */
export function disableSessionHeartbeat(sessionId: string | null | undefined) {
  if (!sessionId) return
  const sessions = loadSessions()
  const session = sessions[sessionId]
  if (!session || session.heartbeatEnabled === false) return
  session.heartbeatEnabled = false
  session.lastActiveAt = Date.now()
  saveSessions(sessions)
  log.info(TAG, `[queue] Disabled heartbeat on session ${sessionId} (task finished)`)
}

/**
 * Mark `taskId` as queued, add it to the persisted queue (at most once), and
 * schedule a worker kick.
 *
 * Unknown task IDs are ignored. Any pending retry time is cleared, so the task
 * is eligible as soon as a worker picks it up. The worker is started ~2s
 * later so the UI can show the queued state. Errors from that deferred run
 * are logged rather than left as unhandled promise rejections.
 *
 * @param taskId ID of an existing board task.
 */
export function enqueueTask(taskId: string) {
  const tasks = loadTasks()
  const task = tasks[taskId] as BoardTask | undefined
  if (!task) return

  // One timestamp so queuedAt and updatedAt agree.
  const now = Date.now()
  applyTaskPolicyDefaults(task)
  task.status = 'queued'
  task.queuedAt = now
  task.retryScheduledAt = null
  task.updatedAt = now
  saveTasks(tasks)

  const queue = loadQueue()
  pushQueueUnique(queue, taskId)
  saveQueue(queue)

  logActivity({ entityType: 'task', entityId: taskId, action: 'queued', actor: 'system', summary: `Task queued: "${task.title}"` })

  pushMainLoopEventToMainSessions({
    type: 'task_queued',
    text: `Task queued: "${task.title}" (${task.id})`,
  })

  // If processNext is at capacity, mark a pending kick so it picks up work when a slot frees
  if (_queueState.activeCount >= _queueState.maxConcurrent) {
    _queueState.pendingKick = true
  }
  // Delay before kicking worker so UI shows the queued state. processNext is
  // async and does work (loadSettings) before entering its own try block, so
  // catch here to keep a failure from surfacing as an unhandled rejection.
  setTimeout(() => {
    processNext().catch((e) =>
      log.error(TAG, `[queue] processNext failed after enqueueing task ${taskId}:`, e)
    )
  }, 2000)
}

/**
 * Re-validate all completed tasks so the completed queue only contains
 * tasks with concrete completion evidence.
 *
 * Each `completed` task's completion validation is recomputed:
 * - Valid tasks that lack `completedAt` get it back-filled, and existing
 *   timestamps are preserved.
 * - Invalid tasks are demoted via `markInvalidCompletedTaskFailed`, with a
 *   System comment listing the validation reasons, and their session
 *   heartbeat is disabled.
 *
 * Tasks and sessions are persisted only when something actually changed.
 *
 * @returns `checked`: number of completed tasks inspected; `demoted`: number
 *   of tasks that failed re-validation.
 */
export function validateCompletedTasksQueue() {
  const tasks = loadTasks()
  const sessions = loadSessions()
  const settings = loadSettings()
  const now = Date.now()
  let checked = 0
  let demoted = 0
  let tasksDirty = false
  let sessionsDirty = false

  for (const task of Object.values(tasks) as BoardTask[]) {
    if (task.status !== 'completed') continue
    checked++

    const previousValidation = task.validation || null
    const previousReportPath = task.completionReportPath || null
    const { validation } = refreshTaskCompletionValidation(task, settings)
    // Normalize both sides. Comparing a raw `undefined` against the
    // `|| null`-normalized previous value would mark the store dirty on every
    // audit for tasks that simply have no report path.
    if ((task.completionReportPath || null) !== previousReportPath) {
      tasksDirty = true
    }
    const validationChanged = didTaskValidationChange(previousValidation, validation)

    if (validationChanged) {
      tasksDirty = true
    }

    if (validation.ok) {
      if (!task.completedAt) {
        markValidatedTaskCompleted(task, { now, preserveCompletedAt: true })
        tasksDirty = true
      }
      continue
    }

    markInvalidCompletedTaskFailed(task, validation, {
      now,
      comment: {
        author: 'System',
        text: `Task auto-failed completed-queue validation.\n\n${validation.reasons.map((r) => `- ${r}`).join('\n')}`,
      },
    })
    tasksDirty = true
    demoted++

    // A demoted task is finished; stop its session from heart-beating.
    if (task.sessionId) {
      const session = sessions[task.sessionId]
      if (session && session.heartbeatEnabled !== false) {
        session.heartbeatEnabled = false
        session.lastActiveAt = now
        sessionsDirty = true
      }
    }
  }

  if (tasksDirty) { saveTasks(tasks); notify('tasks') }
  if (sessionsDirty) saveSessions(sessions)
  if (demoted > 0) {
    log.warn(TAG, `[queue] Demoted ${demoted} invalid completed task(s) to failed after validation audit`)
  }
  return { checked, demoted }
}

/**
 * Record a failed attempt on `task`, then either schedule a retry or
 * dead-letter it.
 *
 * Mutates `task` in place and does not persist it. The visible callers save
 * the task themselves and, on `'retry'`, push it back onto the queue.
 *
 * - Cancelled task: retry/dead-letter markers and the checkout are cleared.
 *   Status and attempts are left untouched, and `'dead_lettered'` is returned.
 * - Attempts remaining: status becomes `queued`, a System comment is added,
 *   and `'retry'` is returned. `retryScheduledAt` is set from `jitteredBackoff`,
 *   called with the base `retryBackoffSec` (default 30s), the zero-based
 *   attempt index, and 6h in ms.
 * - Otherwise: status becomes `failed` with `deadLetteredAt` set, and
 *   orchestrators are notified. A supervisor incident is recorded when the
 *   task has a session. If the assigned agent has `autoRecovery`, a Guardian
 *   restore request is prepared (not applied).
 *
 * @param reason Human-readable failure reason, embedded in comments and errors.
 */
function scheduleRetryOrDeadLetter(task: BoardTask, reason: string): 'retry' | 'dead_lettered' {
  if (isCancelledTask(task)) {
    task.retryScheduledAt = null
    task.deadLetteredAt = null
    task.checkoutRunId = null
    task.updatedAt = Date.now()
    return 'dead_lettered'
  }
  applyTaskPolicyDefaults(task)
  const now = Date.now()
  task.attempts = (task.attempts || 0) + 1

  if ((task.attempts || 0) < (task.maxAttempts || 1)) {
    const delayMs = jitteredBackoff((task.retryBackoffSec || 30) * 1000, Math.max(0, (task.attempts || 1) - 1), 6 * 3600_000)
    task.status = 'queued'
    task.retryScheduledAt = now + delayMs
    // Release the prior checkout so the task can be checked out again on retry.
    // Without this, checkoutTask() returns null every attempt and the orphan-
    // recovery loop burns CPU re-queueing a task that can never run.
    task.checkoutRunId = null
    task.updatedAt = now
    task.error = `Retry scheduled after failure: ${reason}`.slice(0, 500)
    if (!task.comments) task.comments = []
    task.comments.push({
      id: genId(),
      author: 'System',
      text: `Attempt ${task.attempts}/${task.maxAttempts} failed. Retrying in ${Math.round(delayMs / 1000)}s.\n\nReason: ${reason}`,
      createdAt: now,
    })
    return 'retry'
  }

  task.status = 'failed'
  task.deadLetteredAt = now
  task.retryScheduledAt = null
  task.checkoutRunId = null
  task.updatedAt = now
  task.error = `Dead-lettered after ${task.attempts}/${task.maxAttempts} attempts: ${reason}`.slice(0, 500)
  if (!task.comments) task.comments = []
  task.comments.push({
    id: genId(),
    author: 'System',
    text: `Task moved to dead-letter after ${task.attempts}/${task.maxAttempts} attempts.\n\nReason: ${reason}`,
    createdAt: now,
  })
  notifyOrchestrators(`Task failed: "${task.title}" — ${(reason || 'unknown error').slice(0, 100)}`, `task-fail:${task.id}`)
  if (task.sessionId) {
    const failure = classifyRuntimeFailure({ source: 'task', message: reason })
    recordSupervisorIncident({
      runId: task.id,
      sessionId: task.sessionId,
      taskId: task.id,
      agentId: task.agentId || null,
      source: 'task',
      kind: 'runtime_failure',
      severity: failure.severity,
      summary: `Task dead-lettered: ${reason}`.slice(0, 320),
      details: reason,
      failureFamily: failure.family,
      remediation: failure.remediation,
      repairPrompt: failure.repairPrompt,
      autoAction: null,
    })
  }

  // Guardian recovery is approval-backed. Dead-lettering prepares a restore
  // request instead of mutating the workspace automatically.
  const agents = loadAgents()
  const agent = task.agentId ? agents[task.agentId] : null
  if (agent?.autoRecovery) {
    const cwd = task.projectId
      ? path.join(WORKSPACE_DIR, 'projects', task.projectId)
      : WORKSPACE_DIR
    const recovery = prepareGuardianRecovery({
      cwd,
      reason,
      requester: `task:${task.id}`,
    })
    // `now + 1`: presumably so the Guardian comment orders after the System
    // dead-letter comment written with `now` above.
    if (recovery.ok && recovery.approval) {
      task.comments.push({
        id: genId(),
        author: 'Guardian',
        text: `Recovery prepared for checkpoint ${recovery.checkpoint?.head.slice(0, 12) || 'unknown'}.\n\nApprove restore request ${recovery.approval.id} to roll the workspace back safely.`,
        createdAt: now + 1,
      })
    } else {
      task.comments.push({
        id: genId(),
        author: 'Guardian',
        text: `Recovery advisory: ${recovery.reason || 'Unable to prepare a restore request.'}`,
        createdAt: now + 1,
      })
    }
  }

  return 'dead_lettered'
}

/**
 * Remove and return the first runnable task ID from `queue`, which is
 * mutated in place.
 *
 * Entries whose task is missing or no longer `queued` are pruned first. A
 * remaining task is runnable when all of the following hold:
 * - its `retryScheduledAt` (if any) is not in the future;
 * - every `blockedBy` task is `completed`;
 * - it is not an unclaimed pool-mode task.
 *
 * Non-runnable entries stay in the queue in order.
 *
 * @returns The dequeued task ID, or `null` when nothing is runnable.
 */
export function dequeueNextRunnableTask(queue: string[], tasks: Record<string, BoardTask>): string | null {
  const now = Date.now()

  // Remove stale entries first (iterate backwards so splice keeps indices valid).
  for (let i = queue.length - 1; i >= 0; i--) {
    const id = queue[i]
    const task = tasks[id]
    if (!task || task.status !== 'queued') queue.splice(i, 1)
  }

  const idx = queue.findIndex((id) => {
    const task = tasks[id]
    if (!task) return false
    const retryAt = typeof task.retryScheduledAt === 'number' ? task.retryScheduledAt : null
    if (retryAt && retryAt > now) return false
    const blockers = Array.isArray(task.blockedBy) ? task.blockedBy : []
    if (blockers.some((blockerId) => tasks[blockerId]?.status !== 'completed')) return false
    // Skip pool-mode tasks that haven't been claimed yet
    if (task.assignmentMode === 'pool' && !task.claimedByAgentId) return false
    return true
  })
  if (idx === -1) return null
  const [taskId] = queue.splice(idx, 1)
  return taskId || null
}

export async function processNext() {
  const settings = loadSettings()
  _queueState.maxConcurrent = normalizeInt(
    (settings as Record<string, unknown>).taskQueueConcurrency, 3, 1, 10
  )

  if (_queueState.activeCount >= _queueState.maxConcurrent) {
    _queueState.pendingKick = true
    return
  }
  _queueState.activeCount++
  const endQueuePerf = perf.start('queue', 'processNext')

  try {
    // Recover orphaned tasks: status is 'queued' but missing from the queue array
    // Only run from the first worker to avoid redundant scans
    if (_queueState.activeCount === 1) {
      const allTasks = loadTasks()
      const currentQueue = loadQueue()
      const queueSet = new Set(currentQueue)
      let recovered = false
      let tasksDirty = false
      for (const [id, t] of Object.entries(allTasks) as [string, BoardTask][]) {
        if (t.status === 'queued' && !queueSet.has(id)) {
          log.info(TAG, `[queue] Recovering orphaned queued task: "${t.title}" (${id})`)
          // Defence in depth: a queued task must not carry a stale checkoutRunId
          // (left over from pre-1.5.38 retries). If it does, checkoutTask() will
          // reject every attempt and this orphan-recovery loop will spin at 100%
          // CPU re-queueing a task that can never run.
1117 if (t.checkoutRunId) { 1118 t.checkoutRunId = null 1119 tasksDirty = true 1120 } 1121 pushQueueUnique(currentQueue, id) 1122 recovered = true 1123 } 1124 } 1125 if (tasksDirty) saveTasks(allTasks) 1126 if (recovered) saveQueue(currentQueue) 1127 } 1128 1129 // Process ONE task per invocation (no while loop) 1130 { 1131 const tasks = loadTasks() 1132 const queue = loadQueue() 1133 if (queue.length === 0) return 1134 1135 const taskId = dequeueNextRunnableTask(queue, tasks as Record<string, BoardTask>) 1136 saveQueue(queue) 1137 if (!taskId) return 1138 const latestTasks = loadTasks() as Record<string, BoardTask> 1139 let task = latestTasks[taskId] as BoardTask | undefined 1140 1141 if (!task || task.status !== 'queued') { 1142 return 1143 } 1144 1145 // Dependency guard: skip tasks whose blockers are not all completed 1146 const blockers = Array.isArray(task.blockedBy) ? task.blockedBy as string[] : [] 1147 if (blockers.length > 0) { 1148 const allBlockersDone = blockers.every((bid) => { 1149 const blocker = latestTasks[bid] as BoardTask | undefined 1150 return blocker?.status === 'completed' 1151 }) 1152 if (!allBlockersDone) { 1153 // Put it back in the queue and skip 1154 pushQueueUnique(queue, taskId) 1155 saveQueue(queue) 1156 log.info(TAG, `[queue] Skipping task "${task.title}" (${taskId}) — blocked by incomplete dependencies`) 1157 return 1158 } 1159 } 1160 1161 const agents = loadAgents() 1162 let agent = agents[task.agentId] 1163 if (!agent) { 1164 task.status = 'failed' 1165 task.deadLetteredAt = Date.now() 1166 task.checkoutRunId = null 1167 task.error = `Agent ${task.agentId} not found` 1168 task.updatedAt = Date.now() 1169 saveTasks(latestTasks) 1170 pushMainLoopEventToMainSessions({ 1171 type: 'task_failed', 1172 text: `Task failed: "${task.title}" (${task.id}) — agent not found.`, 1173 }) 1174 return 1175 } 1176 1177 // Capability matching — reroute if assigned agent doesn't have required capabilities 1178 const reqCaps = 
Array.isArray(task.requiredCapabilities) ? task.requiredCapabilities as string[] : [] 1179 if (reqCaps.length > 0 && !matchesCapabilities(agent.capabilities, reqCaps)) { 1180 const candidates = filterAgentsByCapabilities(agents, reqCaps) 1181 .filter((a) => a.id !== agent!.id && !a.disabled) 1182 if (candidates.length > 0) { 1183 // Pick best match by capability score, then alphabetically for stability 1184 candidates.sort((a, b) => { 1185 const scoreA = capabilityMatchScore(a.capabilities, reqCaps) 1186 const scoreB = capabilityMatchScore(b.capabilities, reqCaps) 1187 if (scoreB !== scoreA) return scoreB - scoreA 1188 return a.name.localeCompare(b.name) 1189 }) 1190 const rerouted = candidates[0] 1191 log.info(TAG, `[queue] Rerouting task "${task.title}" (${taskId}) from agent "${agent.name}" to "${rerouted.name}" — capability match`) 1192 task.agentId = rerouted.id 1193 agent = rerouted 1194 } else { 1195 task.status = 'failed' 1196 task.deadLetteredAt = Date.now() 1197 task.checkoutRunId = null 1198 task.error = `No agent matches required capabilities: [${reqCaps.join(', ')}]` 1199 task.updatedAt = Date.now() 1200 saveTasks(latestTasks) 1201 pushMainLoopEventToMainSessions({ 1202 type: 'task_failed', 1203 text: `Task failed: "${task.title}" (${task.id}) — no agent matches required capabilities [${reqCaps.join(', ')}].`, 1204 }) 1205 return 1206 } 1207 } 1208 1209 if (isAgentDisabled(agent)) { 1210 const now = Date.now() 1211 task.deferredReason = buildAgentDisabledMessage(agent, 'process queued tasks') 1212 task.status = 'deferred' 1213 task.updatedAt = now 1214 task.retryScheduledAt = null 1215 saveTasks(latestTasks) 1216 notify('tasks') 1217 pushMainLoopEventToMainSessions({ 1218 type: 'task_deferred', 1219 text: `Task deferred: "${task.title}" (${task.id}) — agent ${task.agentId} is disabled.`, 1220 }) 1221 return 1222 } 1223 1224 // Budget enforcement gate 1225 const typedAgent = agent as Agent 1226 if (typedAgent.monthlyBudget || typedAgent.dailyBudget || 
typedAgent.hourlyBudget) { 1227 try { 1228 const budgetCheck = checkAgentBudgetLimits(typedAgent) 1229 if (!budgetCheck.ok) { 1230 const now = Date.now() 1231 const exceeded = budgetCheck.exceeded[0] 1232 task.status = 'deferred' 1233 task.deferredReason = exceeded?.message || 'Agent budget exceeded' 1234 task.retryScheduledAt = null 1235 task.updatedAt = now 1236 saveTasks(latestTasks) 1237 notify('tasks') 1238 1239 recordSupervisorIncident({ 1240 runId: task.id, 1241 sessionId: task.sessionId || '', 1242 taskId: task.id, 1243 agentId: typedAgent.id, 1244 source: 'task', 1245 kind: 'budget_pressure', 1246 severity: 'high', 1247 summary: exceeded?.message || `Agent "${typedAgent.name}" budget exceeded, task deferred.`, 1248 autoAction: 'budget_trim', 1249 }) 1250 return 1251 } 1252 } catch {} 1253 } 1254 1255 // Atomic checkout — prevents two runners from starting the same task 1256 const runId = genId() 1257 task = checkoutTask(taskId, runId) as BoardTask | undefined 1258 if (!task) return 1259 applyTaskPolicyDefaults(task) 1260 logActivity({ entityType: 'task', entityId: taskId, action: 'running', actor: 'system', actorId: task.agentId, summary: `Task started: "${task.title}"` }) 1261 1262 // Reload tasks map for resolution functions and final save (checkoutTask already saved the running status) 1263 const allTasks = loadTasks() as Record<string, BoardTask> 1264 allTasks[taskId] = task 1265 const sessionsForCwd = loadSessions() as Record<string, SessionLike> 1266 const taskCwd = resolveTaskExecutionCwd(task as ScheduleTaskMeta, sessionsForCwd) 1267 task.cwd = taskCwd 1268 let sessionId = '' 1269 const scheduleTask = task as ScheduleTaskMeta 1270 const isScheduleTask = scheduleTask.sourceType === 'schedule' 1271 const sourceScheduleId = typeof scheduleTask.sourceScheduleId === 'string' 1272 ? 
scheduleTask.sourceScheduleId 1273 : '' 1274 const reusableTaskSessionId = resolveReusableTaskSessionId(task, allTasks, sessionsForCwd) 1275 const resumeContext = resolveTaskResumeContext(task, allTasks, sessionsForCwd as Record<string, SessionLike | Session>) 1276 1277 // Resolve the agent's persistent thread session to use as parentSessionId 1278 const agentThreadSessionId = agent.threadSessionId || null 1279 const taskRoutePreferences = deriveTaskRoutePreferences(task) 1280 1281 if (isScheduleTask && sourceScheduleId) { 1282 const schedules = loadSchedules() 1283 const linkedSchedule = schedules[sourceScheduleId] 1284 const linkedScheduleRecord = linkedSchedule as unknown as Record<string, unknown> | undefined 1285 const existingSessionId = typeof linkedScheduleRecord?.lastSessionId === 'string' 1286 ? linkedScheduleRecord.lastSessionId 1287 : '' 1288 if (existingSessionId) { 1289 const sessions = loadSessions() 1290 if (sessions[existingSessionId]) { 1291 sessionId = existingSessionId 1292 } 1293 } 1294 if (!sessionId) { 1295 sessionId = createAgentTaskSession( 1296 agent, 1297 task.title, 1298 agentThreadSessionId || undefined, 1299 taskCwd, 1300 taskRoutePreferences, 1301 ) 1302 } 1303 if (linkedScheduleRecord && linkedScheduleRecord.lastSessionId !== sessionId) { 1304 linkedScheduleRecord.lastSessionId = sessionId 1305 linkedScheduleRecord.updatedAt = Date.now() 1306 const updatedLinkedSchedule = linkedScheduleRecord as unknown as typeof linkedSchedule 1307 schedules[sourceScheduleId] = updatedLinkedSchedule 1308 saveSchedules(schedules) 1309 } 1310 } else { 1311 sessionId = reusableTaskSessionId || createAgentTaskSession( 1312 agent, 1313 task.title, 1314 agentThreadSessionId || undefined, 1315 taskCwd, 1316 taskRoutePreferences, 1317 ) 1318 } 1319 1320 const executionSessions = loadSessions() as Record<string, Session> 1321 const executionSession = executionSessions[sessionId] 1322 const seededResumeState = executionSession 1323 ? 
applyTaskResumeStateToSession(executionSession, resumeContext?.resume) 1324 : false 1325 if (seededResumeState) saveSessions(executionSessions) 1326 1327 task.sessionId = sessionId 1328 const reusedExistingSession = !isScheduleTask && Boolean(reusableTaskSessionId) && reusableTaskSessionId === sessionId 1329 const continuationBits: string[] = [] 1330 if (reusedExistingSession) { 1331 continuationBits.push('reusing prior session') 1332 } 1333 if (resumeContext?.source === 'delegated_from_task' || resumeContext?.source === 'blocked_by') { 1334 continuationBits.push(`seeded from task ${resumeContext.sourceTaskId}`) 1335 } else if (seededResumeState) { 1336 continuationBits.push('restored CLI resume handles') 1337 } 1338 task.checkpoint = { 1339 lastSessionId: sessionId, 1340 note: `Attempt ${(task.attempts || 0) + 1}/${task.maxAttempts || '?'} started${continuationBits.length ? ` (${continuationBits.join('; ')})` : ''}`, 1341 updatedAt: Date.now(), 1342 } 1343 saveTasks(allTasks) 1344 pushMainLoopEventToMainSessions({ 1345 type: 'task_running', 1346 text: `Task running: "${task.title}" (${task.id}) with ${agent.name}`, 1347 }) 1348 1349 // Save initial assistant message so user sees context when opening the session 1350 { 1351 const sessionExists = Boolean(loadSessions()[sessionId]) 1352 if (sessionExists) { 1353 const isDelegation = (task as unknown as Record<string, unknown>).sourceType === 'delegation' 1354 let initialText: string 1355 if (isDelegation) { 1356 const delegatorId = (task as unknown as Record<string, unknown>).delegatedByAgentId as string | undefined 1357 const delegator = delegatorId ? 
agents[delegatorId] : null 1358 const prefix = `[delegation-source:${delegatorId || ''}:${delegator?.name || 'Agent'}:${delegator?.avatarSeed || ''}]` 1359 initialText = `${prefix}\nDelegated by **${delegator?.name || 'another agent'}** | [${task.title}](#task:${task.id})\n\n${task.description || ''}\n\nWorking directory: \`${taskCwd}\`${buildTaskContinuationNote(Boolean(reusedExistingSession), resumeContext)}\n\nI'll begin working on this now.` 1360 } else { 1361 initialText = `Starting task: **${task.title}**\n\n${task.description || ''}\n\nWorking directory: \`${taskCwd}\`${buildTaskContinuationNote(Boolean(reusedExistingSession), resumeContext)}\n\nI'll begin working on this now.` 1362 } 1363 // Inject upstream task results context 1364 if (Array.isArray(task.upstreamResults) && task.upstreamResults.length > 0) { 1365 const upstreamBlock = task.upstreamResults 1366 .map((ur) => `### ${ur.taskTitle}\n${ur.resultPreview || '(no result)'}`) 1367 .join('\n\n') 1368 initialText += `\n\n## Context from upstream tasks\n\n${upstreamBlock}` 1369 } 1370 appendMessage(sessionId, { 1371 role: 'assistant', 1372 text: initialText, 1373 time: Date.now(), 1374 ...(isDelegation ? 
{ kind: 'system' as const } : {}), 1375 }) 1376 } 1377 } 1378 1379 log.info(TAG, `[queue] Running task "${task.title}" (${taskId}) with ${agent.name}`) 1380 1381 try { 1382 const taskRunId = `${taskId}:attempt-${(task.attempts || 0) + 1}` 1383 const endTaskRunPerf = perf.start('queue', 'executeTaskRun', { taskId, agentName: agent.name }) 1384 const taskRunHandle = enqueueExecution({ 1385 kind: 'task_attempt', 1386 task, 1387 agent, 1388 sessionId, 1389 executionId: taskRunId, 1390 }) 1391 const taskRun = await taskRunHandle.promise 1392 endTaskRunPerf() 1393 // Update lastActivityAt after execution completes (idle timeout tracking) 1394 { 1395 const latestTasks = loadTasks() as Record<string, BoardTask> 1396 const updatedTask = latestTasks[taskId] 1397 if (updatedTask) { 1398 updatedTask.lastActivityAt = Date.now() 1399 saveTasks(latestTasks) 1400 } 1401 } 1402 const result = taskRun.error 1403 ? (taskRun.text || `Error: ${taskRun.error}`) 1404 : taskRun.text 1405 const t2 = loadTasks() 1406 const settings = loadSettings() 1407 if (isCancelledTask(t2[taskId])) { 1408 disableSessionHeartbeat(t2[taskId].sessionId) 1409 notify('tasks') 1410 notify('runs') 1411 queueTaskAutonomyObservation({ 1412 runId: taskRunId, 1413 sessionId, 1414 taskId, 1415 agentId: agent.id, 1416 status: 'cancelled', 1417 error: t2[taskId].error || 'Task cancelled', 1418 toolEvents: taskRun.toolEvents, 1419 sourceMessage: task.description || task.title, 1420 }) 1421 log.warn(TAG, `[queue] Task "${task.title}" cancelled during execution`) 1422 return 1423 } 1424 if (t2[taskId]) { 1425 applyTaskPolicyDefaults(t2[taskId]) 1426 // Structured extraction: Zod-validated result with typed artifacts 1427 const taskResult = extractTaskResult( 1428 getMessages(sessionId), 1429 result || null, 1430 { sinceTime: typeof t2[taskId].startedAt === 'number' ? 
t2[taskId].startedAt : null }, 1431 ) 1432 const enrichedResult = formatResultBody(taskResult) 1433 t2[taskId].result = enrichedResult.slice(0, 4000) || null 1434 t2[taskId].artifacts = taskResult.artifacts.slice(0, 24) 1435 t2[taskId].outputFiles = extractLikelyOutputFiles(enrichedResult).slice(0, 24) 1436 t2[taskId].updatedAt = Date.now() 1437 const { validation } = refreshTaskCompletionValidation(t2[taskId], settings) 1438 1439 const now = Date.now() 1440 // Add a completion/failure comment from the executing agent. 1441 if (!t2[taskId].comments) t2[taskId].comments = [] 1442 1443 if (validation.ok) { 1444 markValidatedTaskCompleted(t2[taskId], { now }) 1445 t2[taskId].retryScheduledAt = null 1446 t2[taskId].checkpoint = { 1447 ...(t2[taskId].checkpoint || {}), 1448 lastRunId: sessionId, 1449 lastSessionId: sessionId, 1450 note: `Completed on attempt ${t2[taskId].attempts || 0}/${t2[taskId].maxAttempts || '?'}`, 1451 updatedAt: now, 1452 } 1453 t2[taskId].comments!.push({ 1454 id: genId(), 1455 author: agent.name, 1456 agentId: agent.id, 1457 text: `Task completed.\n\n${result?.slice(0, 1000) || 'No summary provided.'}`, 1458 createdAt: now, 1459 }) 1460 } else { 1461 const failureReason = formatValidationFailure(validation.reasons).slice(0, 500) 1462 const retryState = scheduleRetryOrDeadLetter(t2[taskId], failureReason) 1463 t2[taskId].completedAt = retryState === 'dead_lettered' ? 
null : t2[taskId].completedAt 1464 t2[taskId].comments!.push({ 1465 id: genId(), 1466 author: agent.name, 1467 agentId: agent.id, 1468 text: `Task failed validation and was not marked completed.\n\n${validation.reasons.map((r) => `- ${r}`).join('\n')}`, 1469 createdAt: now, 1470 }) 1471 if (retryState === 'retry') { 1472 const qRetry = loadQueue() 1473 pushQueueUnique(qRetry, taskId) 1474 saveQueue(qRetry) 1475 pushMainLoopEventToMainSessions({ 1476 type: 'task_retry_scheduled', 1477 text: `Task retry scheduled: "${task.title}" (${taskId}) attempt ${t2[taskId].attempts}/${t2[taskId].maxAttempts} in ${t2[taskId].retryBackoffSec}s.`, 1478 }) 1479 } 1480 } 1481 1482 // Copy ALL CLI resume IDs from the execution session to the task record 1483 try { 1484 const execSessions = loadSessions() 1485 const execSession = execSessions[sessionId] as unknown as Record<string, unknown> | undefined 1486 if (execSession) { 1487 const delegateIds = execSession.delegateResumeIds as 1488 | { claudeCode?: string | null; codex?: string | null; opencode?: string | null; gemini?: string | null; cursor?: string | null; qwen?: string | null } 1489 | undefined 1490 // Store each CLI resume ID separately 1491 const claudeId = (execSession.claudeSessionId as string) || delegateIds?.claudeCode || null 1492 const codexId = (execSession.codexThreadId as string) || delegateIds?.codex || null 1493 const opencodeId = (execSession.opencodeSessionId as string) || delegateIds?.opencode || null 1494 const geminiId = delegateIds?.gemini || null 1495 const cursorId = (execSession.cursorSessionId as string) || delegateIds?.cursor || null 1496 const qwenId = (execSession.qwenSessionId as string) || delegateIds?.qwen || null 1497 if (claudeId) t2[taskId].claudeResumeId = claudeId 1498 if (codexId) t2[taskId].codexResumeId = codexId 1499 if (opencodeId) t2[taskId].opencodeResumeId = opencodeId 1500 if (geminiId) t2[taskId].geminiResumeId = geminiId 1501 // Keep backward-compat single field (first available) 
1502 const primaryId = claudeId || codexId || opencodeId || geminiId || cursorId || qwenId 1503 if (primaryId) { 1504 t2[taskId].cliResumeId = primaryId 1505 if (claudeId) t2[taskId].cliProvider = 'claude-cli' 1506 else if (codexId) t2[taskId].cliProvider = 'codex-cli' 1507 else if (opencodeId) t2[taskId].cliProvider = 'opencode-cli' 1508 else if (geminiId) t2[taskId].cliProvider = 'gemini-cli' 1509 else if (cursorId) t2[taskId].cliProvider = 'cursor-cli' 1510 else if (qwenId) t2[taskId].cliProvider = 'qwen-code-cli' 1511 } 1512 log.info(TAG, `[queue] CLI resume IDs for task ${taskId}: claude=${claudeId}, codex=${codexId}, opencode=${opencodeId}, gemini=${geminiId}, cursor=${cursorId}, qwen=${qwenId}`) 1513 } 1514 } catch (e) { 1515 log.warn(TAG, `[queue] Failed to extract CLI resume IDs for task ${taskId}:`, e) 1516 } 1517 1518 saveTasks(t2) 1519 notify('tasks') 1520 notify('runs') 1521 disableSessionHeartbeat(t2[taskId].sessionId) 1522 } 1523 const doneTask = t2[taskId] 1524 queueTaskAutonomyObservation({ 1525 runId: taskRunId, 1526 sessionId, 1527 taskId, 1528 agentId: agent.id, 1529 status: doneTask?.status === 'completed' 1530 ? 'completed' 1531 : doneTask?.status === 'cancelled' 1532 ? 'cancelled' 1533 : 'failed', 1534 resultText: doneTask?.result || result || null, 1535 error: doneTask?.status === 'completed' ? 
null : (doneTask?.error || taskRun.error || null), 1536 toolEvents: taskRun.toolEvents, 1537 sourceMessage: task.description || task.title, 1538 }) 1539 if (doneTask?.status === 'completed') { 1540 pushMainLoopEventToMainSessions({ 1541 type: 'task_completed', 1542 text: `Task completed: "${task.title}" (${taskId})`, 1543 }) 1544 notifyOrchestrators(`Task completed: "${task.title}"`, `task-complete:${taskId}`) 1545 queueSwarmFeedTaskCompletionWake(doneTask) 1546 handleTerminalTaskResultDeliveries(doneTask) 1547 cleanupTerminalOneOffSchedule(doneTask) 1548 // Clean up LangGraph checkpoints for completed tasks 1549 getCheckpointSaver().deleteThread(taskId).catch((e) => 1550 log.warn(TAG, `[queue] Failed to clean up checkpoints for task ${taskId}:`, e) 1551 ) 1552 // Cascade unblock: auto-queue tasks whose blockers are all done 1553 const latestTasks = loadTasks() 1554 const unblockedIds = cascadeUnblock(latestTasks, taskId) 1555 if (unblockedIds.length > 0) { 1556 saveTasks(latestTasks) 1557 for (const uid of unblockedIds) { 1558 enqueueTask(uid) 1559 log.info(TAG, `[queue] Auto-unblocked task "${latestTasks[uid]?.title}" (${uid})`) 1560 } 1561 notify('tasks') 1562 } 1563 // Wake waiting protocol runs when a linked task completes 1564 if (latestTasks[taskId]?.protocolRunId) { 1565 try { 1566 const { wakeProtocolRunFromTaskCompletion } = await import('@/lib/server/protocols/protocol-service') 1567 wakeProtocolRunFromTaskCompletion(taskId) 1568 } catch (e) { 1569 log.warn(TAG, `[queue] Failed to wake protocol run for task ${taskId}:`, e) 1570 } 1571 } 1572 log.info(TAG, `[queue] Task "${task.title}" completed`) 1573 } else if (doneTask?.status === 'cancelled') { 1574 log.warn(TAG, `[queue] Task "${task.title}" cancelled during execution`) 1575 } else { 1576 if (doneTask?.status === 'queued') { 1577 log.warn(TAG, `[queue] Task "${task.title}" scheduled for retry`) 1578 } else { 1579 pushMainLoopEventToMainSessions({ 1580 type: 'task_failed', 1581 text: `Task failed 
validation: "${task.title}" (${taskId})`, 1582 }) 1583 notifyOrchestrators(`Task failed: "${task.title}" — validation failure`, `task-fail:${taskId}`) 1584 if (doneTask?.status === 'failed') { 1585 handleTerminalTaskResultDeliveries(doneTask) 1586 cleanupTerminalOneOffSchedule(doneTask) 1587 } 1588 log.warn(TAG, `[queue] Task "${task.title}" failed completion validation`) 1589 } 1590 } 1591 } catch (err: unknown) { 1592 const errMsg = err instanceof Error ? err.message : String(err || 'Unknown error') 1593 log.error(TAG, `[queue] Task "${task.title}" failed:`, errMsg) 1594 const taskRunId = `${taskId}:attempt-${(task.attempts || 0) + 1}` 1595 const t2 = loadTasks() 1596 if (isCancelledTask(t2[taskId])) { 1597 disableSessionHeartbeat(t2[taskId].sessionId) 1598 notify('tasks') 1599 notify('runs') 1600 queueTaskAutonomyObservation({ 1601 runId: taskRunId, 1602 sessionId, 1603 taskId, 1604 agentId: agent.id, 1605 status: 'cancelled', 1606 error: t2[taskId].error || errMsg, 1607 sourceMessage: task.description || task.title, 1608 }) 1609 log.warn(TAG, `[queue] Task "${task.title}" aborted because it was cancelled`) 1610 return 1611 } 1612 if (t2[taskId]) { 1613 applyTaskPolicyDefaults(t2[taskId]) 1614 1615 // Auto-repair: attempt a repair turn before retrying if a repairPrompt is available 1616 const failureClassification = classifyRuntimeFailure({ source: 'task', message: errMsg }) 1617 if (failureClassification.repairPrompt && t2[taskId].sessionId) { 1618 try { 1619 const repairRunId = `repair:${taskId}:${Date.now()}` 1620 t2[taskId].repairRunId = repairRunId 1621 t2[taskId].lastRepairAttemptAt = Date.now() 1622 saveTasks(t2) 1623 await enqueueExecution({ 1624 kind: 'session_turn', 1625 input: { 1626 sessionId: t2[taskId].sessionId!, 1627 message: `[AUTO-REPAIR] ${failureClassification.repairPrompt}\n\nOriginal error: ${errMsg.slice(0, 300)}`, 1628 internal: true, 1629 source: 'task-repair', 1630 mode: 'followup', 1631 dedupeKey: repairRunId, 1632 }, 1633 }).promise 
1634 log.info(TAG, `[queue] Repair turn completed for task "${task.title}" (${taskId})`) 1635 } catch (repairErr: unknown) { 1636 log.warn(TAG, `[queue] Repair turn failed for task "${task.title}":`, repairErr instanceof Error ? repairErr.message : String(repairErr)) 1637 // If repair fails, attempt guardian recovery 1638 const taskCwd = t2[taskId].cwd || WORKSPACE_DIR 1639 prepareGuardianRecovery({ 1640 cwd: taskCwd, 1641 reason: `Auto-repair failed for task "${task.title}": ${errMsg.slice(0, 200)}`, 1642 requester: agent.id, 1643 }) 1644 } 1645 } 1646 1647 // Reload tasks after the async repair turn to avoid overwriting concurrent mutations 1648 const t3 = loadTasks() 1649 // Carry forward repair fields that were saved before the async turn 1650 if (t2[taskId].repairRunId && t3[taskId]) { 1651 t3[taskId].repairRunId = t2[taskId].repairRunId 1652 t3[taskId].lastRepairAttemptAt = t2[taskId].lastRepairAttemptAt 1653 } 1654 const retryState = scheduleRetryOrDeadLetter(t3[taskId], errMsg.slice(0, 500) || 'Unknown error') 1655 if (!t3[taskId].comments) t3[taskId].comments = [] 1656 // Only add a failure comment if the last comment isn't already an error comment 1657 const lastComment = t3[taskId].comments!.at(-1) 1658 const isRepeatError = lastComment?.agentId === agent.id && lastComment?.text.startsWith('Task failed') 1659 if (!isRepeatError) { 1660 t3[taskId].comments!.push({ 1661 id: genId(), 1662 author: agent.name, 1663 agentId: agent.id, 1664 text: 'Task failed — see error details above.', 1665 createdAt: Date.now(), 1666 }) 1667 } 1668 saveTasks(t3) 1669 notify('tasks') 1670 notify('runs') 1671 disableSessionHeartbeat(t3[taskId].sessionId) 1672 if (retryState === 'retry') { 1673 const qRetry = loadQueue() 1674 pushQueueUnique(qRetry, taskId) 1675 saveQueue(qRetry) 1676 pushMainLoopEventToMainSessions({ 1677 type: 'task_retry_scheduled', 1678 text: `Task retry scheduled: "${task.title}" (${taskId}) attempt ${t3[taskId].attempts}/${t3[taskId].maxAttempts}.`, 1679 
}) 1680 } 1681 } 1682 queueTaskAutonomyObservation({ 1683 runId: taskRunId, 1684 sessionId, 1685 taskId, 1686 agentId: agent.id, 1687 status: 'failed', 1688 error: errMsg, 1689 sourceMessage: task.description || task.title, 1690 }) 1691 const latest = loadTasks()[taskId] as BoardTask | undefined 1692 if (latest?.status === 'queued') { 1693 log.warn(TAG, `[queue] Task "${task.title}" queued for retry after error`) 1694 } else if (latest?.status === 'cancelled') { 1695 log.warn(TAG, `[queue] Task "${task.title}" stayed cancelled after abort`) 1696 } else { 1697 pushMainLoopEventToMainSessions({ 1698 type: 'task_failed', 1699 text: `Task failed: "${task.title}" (${taskId}) — ${errMsg.slice(0, 200)}`, 1700 }) 1701 if (latest?.status === 'failed') { 1702 handleTerminalTaskResultDeliveries(latest) 1703 cleanupTerminalOneOffSchedule(latest) 1704 } 1705 } 1706 } 1707 } 1708 } finally { 1709 _queueState.activeCount-- 1710 endQueuePerf() 1711 const pendingKick = _queueState.pendingKick 1712 _queueState.pendingKick = false 1713 if (pendingKick) { 1714 setTimeout(() => processNext(), 0) 1715 return 1716 } 1717 1718 // Only re-kick when work is actually runnable. This avoids hot loops when the 1719 // queue only contains blocked, deferred, or retry-gated tasks. 1720 const remainingQueue = loadQueue() 1721 if (!remainingQueue.length) return 1722 const tasks = loadTasks() as Record<string, BoardTask> 1723 const probeQueue = [...remainingQueue] 1724 const nextRunnableTaskId = dequeueNextRunnableTask(probeQueue, tasks) 1725 if (nextRunnableTaskId) { 1726 setTimeout(() => processNext(), 0) 1727 } 1728 } 1729 } 1730 1731 /** On boot, disable heartbeat on sessions whose tasks are already terminal. 
*
 * A task counts as finished when its status is `completed`, `failed`, or `cancelled`.
 * Each linked session whose heartbeat is not already disabled gets
 * `heartbeatEnabled = false` and a refreshed `lastActiveAt`. Sessions are saved, and a
 * summary is logged, only when at least one session changed.
 */
export function cleanupFinishedTaskSessions() {
  const tasks = loadTasks()
  const sessions = loadSessions()
  let cleaned = 0
  for (const task of Object.values(tasks) as BoardTask[]) {
    if ((task.status === 'completed' || task.status === 'failed' || task.status === 'cancelled') && task.sessionId) {
      const session = sessions[task.sessionId]
      if (session && session.heartbeatEnabled !== false) {
        session.heartbeatEnabled = false
        session.lastActiveAt = Date.now()
        cleaned++
      }
    }
  }
  if (cleaned > 0) {
    saveSessions(sessions)
    log.info(TAG, `[queue] Disabled heartbeat on ${cleaned} session(s) with finished tasks`)
  }
}

/**
 * Recover running tasks that appear stalled and requeue/dead-letter them per retry policy.
 *
 * Starts from `reconcileFinishedRunningTasks()`, then inspects every task still `running`:
 * - Missing `startedAt`: the state is inconsistent. The task goes back to `queued` with a
 *   30s retry delay and a system comment.
 * - Stalled or idle: stalled means no progress (`max(updatedAt, startedAt)`) for
 *   `taskStallTimeoutMin` (normalizeInt fallback 45, bounds 5..1440). Idle means no
 *   activity (`lastActivityAt`, falling back to `startedAt`) for `taskIdleTimeoutMin`
 *   (fallback 15, bounds 2..120). Such tasks are passed to `scheduleRetryOrDeadLetter`,
 *   and their session heartbeat is disabled.
 *
 * When anything changed, tasks and queue are saved and a `tasks` notification is sent.
 * If any task was recovered, the processor is kicked after 250ms.
 *
 * @returns Counts of recovered (requeued) and dead-lettered tasks, including the counts
 *   reported by `reconcileFinishedRunningTasks()`.
 */
export function recoverStalledRunningTasks(): { recovered: number; deadLettered: number } {
  const finished = reconcileFinishedRunningTasks()
  const settings = loadSettings()
  const stallTimeoutMin = normalizeInt(settings.taskStallTimeoutMin, 45, 5, 24 * 60)
  const staleMs = stallTimeoutMin * 60_000
  const idleTimeoutMin = normalizeInt((settings as Record<string, unknown>).taskIdleTimeoutMin, 15, 2, 120)
  const idleMs = idleTimeoutMin * 60_000
  const now = Date.now()
  const tasks = loadTasks()
  const queue = loadQueue()
  let recovered = finished.reconciled
  let deadLettered = finished.deadLettered
  let changed = false

  for (const task of Object.values(tasks) as BoardTask[]) {
    if (task.status !== 'running') continue
    if (!task.startedAt) {
      const recoveredAt = Date.now()
      task.status = 'queued'
      task.queuedAt = task.queuedAt || recoveredAt
      // Use the same timestamp as the rest of this recovery record.
      task.retryScheduledAt = recoveredAt + 30_000
      task.updatedAt = recoveredAt
      task.error = 'Recovered inconsistent running state (missing startedAt); requeued.'
      if (!task.comments) task.comments = []
      task.comments.push({
        id: genId(),
        author: 'System',
        text: 'Recovered inconsistent running state (missing startedAt). Task requeued.',
        createdAt: recoveredAt,
      })
      pushQueueUnique(queue, task.id)
      recovered++
      changed = true
      pushMainLoopEventToMainSessions({
        type: 'task_stall_recovered',
        text: `Recovered inconsistent running task "${task.title}" (${task.id}) and requeued it.`,
      })
      continue
    }
    // Existing stall check (overall timeout based on updatedAt/startedAt)
    const since = Math.max(task.updatedAt || 0, task.startedAt || 0)
    const isStalled = since > 0 && (now - since) >= staleMs

    // Idle check (no LLM output for idleTimeoutMin)
    const lastActivity = task.lastActivityAt || task.startedAt || 0
    const idleDuration = lastActivity > 0 ? now - lastActivity : 0
    const isIdle = lastActivity > 0 && idleDuration >= idleMs

    if (!isStalled && !isIdle) continue

    const reason = isIdle
      ? `Idle timeout: no output for ${Math.round(idleDuration / 60_000)}m`
      : `Detected stalled run after ${stallTimeoutMin}m without progress`
    const state = scheduleRetryOrDeadLetter(task, reason)
    disableSessionHeartbeat(task.sessionId)
    changed = true
    if (state === 'retry') {
      pushQueueUnique(queue, task.id)
      recovered++
      pushMainLoopEventToMainSessions({
        type: 'task_stall_recovered',
        text: `Recovered stalled task "${task.title}" (${task.id}) and requeued attempt ${task.attempts}/${task.maxAttempts}.`,
      })
    } else {
      deadLettered++
      pushMainLoopEventToMainSessions({
        type: 'task_dead_lettered',
        text: `Task dead-lettered after stalling: "${task.title}" (${task.id}).`,
      })
      notifyOrchestrators(`Task failed: "${task.title}" — stalled and dead-lettered`, `task-fail:${task.id}`)
    }
  }

  if (changed) {
    saveTasks(tasks)
    saveQueue(queue)
    // Task statuses changed above; notify listeners like the other task mutators in this file do.
    notify('tasks')
    if (recovered > 0) {
      setTimeout(() => processNext(), 250)
    }
  }

  return { recovered, deadLettered }
}

/** Normalized `poolCandidateAgentIds` of a task (empty means no candidate restriction). */
function readPoolTaskCandidates(task: BoardTask): string[] {
  return Array.isArray(task.poolCandidateAgentIds) ? task.poolCandidateAgentIds : []
}

/** Normalized `requiredCapabilities` of a task (empty when unset or not an array). */
function readPoolTaskRequiredCapabilities(task: BoardTask): string[] {
  return Array.isArray(task.requiredCapabilities) ? (task.requiredCapabilities as string[]) : []
}

/**
 * Atomically claim a pool-mode task for `agentId`.
 *
 * Inside one `withTransaction` call, all of the following must hold:
 * - the task exists and uses `pool` assignment mode;
 * - it is unclaimed and has status `queued` or `backlog`;
 * - `agentId` is in its candidate list, when one is set;
 * - when `requiredCapabilities` are declared, the agent exists and matches them.
 *
 * On success the task records the claim and is assigned to the agent. The claim is then
 * written to the activity log and a `tasks` notification is sent.
 *
 * @returns `{ success: true }`, or `{ success: false, error }` describing the first failed check.
 */
export function claimPoolTask(taskId: string, agentId: string): { success: boolean; error?: string } {
  // Atomic claim inside a SQLite transaction to prevent concurrent double-claims
  const result = withTransaction(() => {
    const tasks = loadTasks() as Record<string, BoardTask>
    const task = tasks[taskId]
    if (!task) return { success: false as const, error: 'Task not found' }
    if (task.assignmentMode !== 'pool') return { success: false as const, error: 'Task is not in pool mode' }
    if (task.claimedByAgentId) return { success: false as const, error: `Task already claimed by ${task.claimedByAgentId}` }
    if (task.status !== 'queued' && task.status !== 'backlog') return { success: false as const, error: `Task status is ${task.status}, not claimable` }
    const candidates = readPoolTaskCandidates(task)
    if (candidates.length > 0 && !candidates.includes(agentId)) {
      return { success: false as const, error: 'Agent is not in the candidate pool for this task' }
    }
    // Capability check — reject claim if agent doesn't have required capabilities
    const taskReqCaps = readPoolTaskRequiredCapabilities(task)
    if (taskReqCaps.length > 0) {
      const claimingAgent = loadAgents()[agentId]
      if (!claimingAgent || !matchesCapabilities(claimingAgent.capabilities, taskReqCaps)) {
        return { success: false as const, error: `Agent does not match required capabilities: [${taskReqCaps.join(', ')}]` }
      }
    }
    // One timestamp so claimedAt and updatedAt agree exactly.
    const claimedAt = Date.now()
    task.claimedByAgentId = agentId
    task.claimedAt = claimedAt
    task.agentId = agentId
    task.updatedAt = claimedAt
    saveTasks(tasks)
    return { success: true as const, title: task.title }
  })
  if (!result.success) return result
  logActivity({ entityType: 'task', entityId: taskId, action: 'claimed', actor: 'agent', actorId: agentId, summary: `Task "${result.title}" claimed by agent ${agentId}` })
  notify('tasks')
  return { success: true }
}

/**
 * List the pool-mode tasks that `agentId` could currently claim with `claimPoolTask`.
 *
 * Applies the same eligibility rules as `claimPoolTask`:
 * - pool mode, unclaimed, and status `queued` or `backlog`;
 * - the agent is in the candidate list, when one is set;
 * - the agent's capabilities match `requiredCapabilities`, when declared.
 *
 * Without the capability rule, this would list tasks whose claim is then rejected.
 * Agents are loaded at most once, and only if some task declares required capabilities.
 */
export function listClaimableTasks(agentId: string): BoardTask[] {
  const tasks = loadTasks() as Record<string, BoardTask>
  let agentsCache: ReturnType<typeof loadAgents> | null = null
  const getAgents = () => {
    if (!agentsCache) agentsCache = loadAgents()
    return agentsCache
  }
  return Object.values(tasks).filter((task) => {
    if (task.assignmentMode !== 'pool') return false
    if (task.claimedByAgentId) return false
    if (task.status !== 'queued' && task.status !== 'backlog') return false
    const candidates = readPoolTaskCandidates(task)
    if (candidates.length > 0 && !candidates.includes(agentId)) return false
    const requiredCaps = readPoolTaskRequiredCapabilities(task)
    if (requiredCaps.length === 0) return true
    const agent = getAgents()[agentId]
    return !!agent && matchesCapabilities(agent.capabilities, requiredCaps)
  })
}

// Ensures resumeQueue() performs boot recovery only once.
// NOTE(review): this is a plain module-level flag, not pinned via hmrSingleton like
// _queueState. A hot reload resets it — confirm resumeQueue is not re-invoked after HMR,
// otherwise still-running tasks would be reaped as orphans.
let _resumeQueueCalled = false

/**
 * Resume any queued tasks on server boot.
 *
 * Runs at most once (guarded by `_resumeQueueCalled`):
 * 1. Re-adds tasks with status `queued` that are missing from the persisted queue.
 * 2. Treats every task still marked `running` as orphaned by the restart. Each one is
 *    passed to `scheduleRetryOrDeadLetter` (and requeued on `retry`) and gets a system
 *    comment.
 *
 * Saves queue and tasks when either step changed something, then starts processing if
 * the queue is non-empty.
 */
export function resumeQueue() {
  if (_resumeQueueCalled) return
  _resumeQueueCalled = true
  // Check for tasks stuck in 'queued' status but not in the queue array
  const tasks = loadTasks()
  const queue = loadQueue()
  let modified = false
  for (const task of Object.values(tasks) as BoardTask[]) {
    if (task.status === 'queued' && !queue.includes(task.id)) {
      applyTaskPolicyDefaults(task)
      log.info(TAG, `[queue] Recovering stuck queued task: "${task.title}" (${task.id})`)
      queue.push(task.id)
      task.queuedAt = task.queuedAt || Date.now()
      modified = true
    }
  }

  // Orphan reap: all running tasks are orphans on fresh daemon startup
  let recovered = 0
  for (const task of Object.values(tasks) as BoardTask[]) {
    if (task.status !== 'running') continue
    const reason = 'process_lost: task was running when daemon restarted'
    applyTaskPolicyDefaults(task)
    const outcome = scheduleRetryOrDeadLetter(task, reason)
    if (outcome === 'retry') {
      pushQueueUnique(queue, task.id)
    }
    if (!task.comments) task.comments = []
    task.comments.push({
      id: genId(),
      author: 'System',
      text: `Orphan recovery: ${reason}`,
      createdAt: Date.now(),
    })
    modified = true
    recovered++
  }
  if (recovered > 0) {
    log.info(TAG, `[queue] Recovered ${recovered} orphaned running task(s) on boot`)
  }

  if (modified) {
    saveQueue(queue)
    saveTasks(tasks)
  }

  if (queue.length > 0) {
    log.info(TAG, `[queue] Resuming ${queue.length} queued task(s) on boot`)
    processNext()
  }
}

/** Re-queue deferred tasks whose agents are now available.
*
 * Only tasks with status `deferred` are considered, optionally restricted to `agentId`.
 * A task is promoted back to `queued` when all of these hold:
 * - its agent exists and is not disabled;
 * - if the agent has any monthly/daily/hourly budget set, `checkAgentBudgetLimits`
 *   reports `ok`.
 *
 * If the budget check throws, the error is logged and the task is still promoted
 * (fail-open, as before). When anything was promoted, tasks and queue are saved, a
 * `tasks` notification is sent, and processing is kicked on the next tick.
 *
 * @param agentId - When given, only deferred tasks assigned to this agent are considered.
 * @returns The number of tasks moved back to `queued`.
 */
export function promoteDeferred(agentId?: string): number {
  const tasks = loadTasks() as Record<string, BoardTask>
  const agents = loadAgents()
  const queue = loadQueue()
  const now = Date.now()
  let promoted = 0

  for (const task of Object.values(tasks)) {
    if (task.status !== 'deferred') continue
    if (agentId && task.agentId !== agentId) continue

    const agent = agents[task.agentId] as Agent | undefined
    if (!agent || isAgentDisabled(agent)) continue

    // Check budget if applicable
    if (agent.monthlyBudget || agent.dailyBudget || agent.hourlyBudget) {
      try {
        const check = checkAgentBudgetLimits(agent)
        if (!check.ok) continue // still over budget
      } catch (err: unknown) {
        // Keep the previous fail-open behavior, but leave a trace instead of swallowing the error.
        log.warn(TAG, `[queue] Budget check failed for agent ${agent.id} while promoting task ${task.id}:`, err instanceof Error ? err.message : String(err))
      }
    }

    task.status = 'queued'
    task.deferredReason = null
    // Match the other paths in this file that move a task into 'queued'.
    task.queuedAt = task.queuedAt || now
    task.updatedAt = now
    pushQueueUnique(queue, task.id)
    promoted++
  }

  if (promoted > 0) {
    saveTasks(tasks)
    saveQueue(queue)
    notify('tasks')
    setTimeout(() => processNext(), 0)
  }
  return promoted
}