/ src / lib / server / runtime / queue / core.ts
core.ts
   1  import { log } from '@/lib/server/logger'
   2  import { matchesCapabilities, filterAgentsByCapabilities, capabilityMatchScore } from '@/lib/server/agents/capability-match'
   3  import { genId } from '@/lib/id'
   4  import { dedup, hmrSingleton, jitteredBackoff } from '@/lib/shared-utils'
   5  import fs from 'node:fs'
   6  import path from 'node:path'
   7  import { logActivity } from '@/lib/server/activity/activity-log'
   8  import { loadAgents } from '@/lib/server/agents/agent-repository'
   9  import { withTransaction } from '@/lib/server/persistence/transaction'
  10  import { loadQueue, saveQueue } from '@/lib/server/runtime/queue-repository'
  11  import { loadSchedules, saveSchedules } from '@/lib/server/schedules/schedule-repository'
  12  import { loadSessions, saveSessions } from '@/lib/server/sessions/session-repository'
  13  import { loadSettings } from '@/lib/server/settings/settings-repository'
  14  import { loadTasks, saveTasks } from '@/lib/server/tasks/task-repository'
  15  import { notify } from '@/lib/server/ws-hub'
  16  import { getMessages, getLastMessage, appendMessage } from '@/lib/server/messages/message-repository'
  17  import { perf } from '@/lib/server/runtime/perf'
  18  import { WORKSPACE_DIR } from '@/lib/server/data-dir'
  19  import { createAgentTaskSession } from '@/lib/server/agents/task-session'
  20  import { formatValidationFailure } from '@/lib/server/tasks/task-validation'
  21  import { pushMainLoopEventToMainSessions } from '@/lib/server/agents/main-agent-loop'
  22  import type { ExecuteChatTurnResult } from '@/lib/server/chat-execution/chat-execution-types'
  23  import { checkAgentBudgetLimits } from '@/lib/server/cost'
  24  import { enqueueExecution } from '@/lib/server/execution-engine'
  25  import { extractTaskResult, formatResultBody } from '@/lib/server/tasks/task-result'
  26  import { checkoutTask } from '@/lib/server/tasks/task-checkout'
  27  import { queueSwarmFeedTaskCompletionWake } from '@/lib/server/swarmfeed-runtime'
  28  import {
  29    classifyRuntimeFailure,
  30    observeAutonomyRunOutcome,
  31    recordSupervisorIncident,
  32  } from '@/lib/server/autonomy/supervisor-reflection'
  33  import {
  34    collectTaskConnectorFollowupTargets as collectTaskConnectorFollowupTargetsImpl,
  35    extractLikelyOutputFiles,
  36    isSendableAttachment,
  37    maybeResolveUploadMediaPathFromUrl,
  38    notifyConnectorTaskFollowups,
  39    resolveExistingOutputFilePath,
  40    resolveTaskOriginConnectorFollowupTarget as resolveTaskOriginConnectorFollowupTargetImpl,
  41    type ScheduleTaskMeta,
  42    type SessionLike,
  43  } from '@/lib/server/tasks/task-followups'
  44  import { getCheckpointSaver } from '@/lib/server/langgraph-checkpoint'
  45  import { cascadeUnblock } from '@/lib/server/dag-validation'
  46  import { prepareGuardianRecovery } from '@/lib/server/agents/guardian'
  47  import { notifyOrchestrators } from '@/lib/server/runtime/orchestrator-events'
  48  import type { Agent, BoardTask, Message, Session } from '@/types'
  49  import { buildAgentDisabledMessage, isAgentDisabled } from '@/lib/server/agents/agent-availability'
  50  import {
  51    didTaskValidationChange,
  52    markInvalidCompletedTaskFailed,
  53    markValidatedTaskCompleted,
  54    refreshTaskCompletionValidation,
  55  } from '@/lib/server/tasks/task-lifecycle'
  56  
// Log tag passed to this module's log.* calls.
const TAG = 'queue'

// Re-exported from this module under their historical names; the implementations
// live in '@/lib/server/tasks/task-followups'. NOTE(review): presumably kept so
// existing importers of the queue module keep working — confirm before removing.
export const collectTaskConnectorFollowupTargets = collectTaskConnectorFollowupTargetsImpl
export const resolveTaskOriginConnectorFollowupTarget = resolveTaskOriginConnectorFollowupTargetImpl

// HMR-safe: pin processing state to globalThis so hot reloads don't reset it.
// Keyed by '__swarmclaw_queue__'. NOTE(review): these fields are consumed outside
// this view; by name they look like the in-flight task count, the concurrency cap
// (default 3) and a "kick requested while busy" flag — confirm against the users.
const _queueState = hmrSingleton('__swarmclaw_queue__', () => ({
  activeCount: 0,
  maxConcurrent: 3,
  pendingKick: false,
}))
  68  
  69  function normalizeInt(value: unknown, fallback: number, min: number, max: number): number {
  70    const parsed = typeof value === 'number'
  71      ? value
  72      : typeof value === 'string'
  73        ? Number.parseInt(value, 10)
  74        : Number.NaN
  75    if (!Number.isFinite(parsed)) return fallback
  76    return Math.max(min, Math.min(max, Math.trunc(parsed)))
  77  }
  78  
  79  const OPENCLAW_USE_CASE_TAGS = new Set([
  80    'local-dev',
  81    'single-vps',
  82    'private-tailnet',
  83    'browser-heavy',
  84    'team-control',
  85  ])
  86  
  87  function deriveTaskRoutePreferences(task: BoardTask): {
  88    preferredGatewayTags?: string[]
  89    preferredGatewayUseCase?: string | null
  90  } {
  91    const tags = Array.isArray(task.tags)
  92      ? dedup(task.tags.map((tag) => (typeof tag === 'string' ? tag.trim().toLowerCase() : '')).filter(Boolean))
  93      : []
  94    const customUseCase = typeof task.customFields?.openclawUseCase === 'string'
  95      ? task.customFields.openclawUseCase
  96      : typeof task.customFields?.gatewayUseCase === 'string'
  97        ? task.customFields.gatewayUseCase
  98        : null
  99    const preferredGatewayUseCase = customUseCase && OPENCLAW_USE_CASE_TAGS.has(customUseCase)
 100      ? customUseCase
 101      : (tags.find((tag) => OPENCLAW_USE_CASE_TAGS.has(tag)) || null)
 102    const preferredGatewayTags = tags.filter((tag) => tag !== preferredGatewayUseCase)
 103    return {
 104      preferredGatewayTags,
 105      preferredGatewayUseCase,
 106    }
 107  }
 108  
 109  function resolveTaskPolicy(task: BoardTask): { maxAttempts: number; backoffSec: number } {
 110    const settings = loadSettings()
 111    const defaultMaxAttempts = normalizeInt(settings.defaultTaskMaxAttempts, 3, 1, 20)
 112    const defaultBackoffSec = normalizeInt(settings.taskRetryBackoffSec, 30, 1, 3600)
 113    const maxAttempts = normalizeInt(task.maxAttempts, defaultMaxAttempts, 1, 20)
 114    const backoffSec = normalizeInt(task.retryBackoffSec, defaultBackoffSec, 1, 3600)
 115    return { maxAttempts, backoffSec }
 116  }
 117  
 118  function applyTaskPolicyDefaults(task: BoardTask): void {
 119    const policy = resolveTaskPolicy(task)
 120    if (typeof task.attempts !== 'number' || task.attempts < 0) task.attempts = 0
 121    task.maxAttempts = policy.maxAttempts
 122    task.retryBackoffSec = policy.backoffSec
 123    if (task.retryScheduledAt === undefined) task.retryScheduledAt = null
 124    if (task.deadLetteredAt === undefined) task.deadLetteredAt = null
 125  }
 126  
/**
 * CLI resume handles recovered from a task or a session.
 * The top-level fields are copied onto the session fields of the same name by
 * applyTaskResumeStateToSession. `delegateResumeIds` has the shape of
 * `Session['delegateResumeIds']`.
 */
export interface TaskResumeState {
  claudeSessionId: string | null
  codexThreadId: string | null
  opencodeSessionId: string | null
  cursorSessionId?: string | null
  qwenSessionId?: string | null
  delegateResumeIds: NonNullable<Session['delegateResumeIds']>
}

/** Which related task supplied resume handles for a task, plus those handles. */
export interface TaskResumeContext {
  // 'self' = the task itself; 'delegated_from_task' = task.delegatedFromTaskId;
  // 'blocked_by' = one of task.blockedBy (see resolveTaskResumeContext).
  source: 'self' | 'delegated_from_task' | 'blocked_by'
  sourceTaskId: string
  sourceTaskTitle: string
  // checkpoint.lastSessionId of the source task, else its sessionId (null if neither).
  sourceSessionId: string | null
  resume: TaskResumeState
}
 143  
 144  function normalizeResumeHandle(value: unknown): string | null {
 145    return typeof value === 'string' && value.trim() ? value.trim() : null
 146  }
 147  
 148  function buildEmptyDelegateResumeIds(): NonNullable<Session['delegateResumeIds']> {
 149    return {
 150      claudeCode: null,
 151      codex: null,
 152      opencode: null,
 153      gemini: null,
 154      copilot: null,
 155      cursor: null,
 156      qwen: null,
 157    }
 158  }
 159  
 160  function normalizeCliProvider(value: unknown): string | null {
 161    return typeof value === 'string' && value.trim() ? value.trim().toLowerCase() : null
 162  }
 163  
 164  function hasResumeState(state: TaskResumeState | null | undefined): state is TaskResumeState {
 165    if (!state) return false
 166    return Boolean(
 167      state.claudeSessionId
 168      || state.codexThreadId
 169      || state.opencodeSessionId
 170      || state.delegateResumeIds.claudeCode
 171      || state.delegateResumeIds.codex
 172      || state.delegateResumeIds.opencode
 173      || state.delegateResumeIds.gemini
 174      || state.delegateResumeIds.cursor
 175      || state.delegateResumeIds.qwen
 176    )
 177  }
 178  
 179  export function extractTaskResumeState(task: Partial<BoardTask> | null | undefined): TaskResumeState | null {
 180    if (!task) return null
 181  
 182    const legacyResumeId = normalizeResumeHandle(task.cliResumeId)
 183    const legacyProvider = normalizeCliProvider(task.cliProvider)
 184    const claudeSessionId = normalizeResumeHandle(task.claudeResumeId)
 185      || (legacyProvider === 'claude-cli' ? legacyResumeId : null)
 186    const codexThreadId = normalizeResumeHandle(task.codexResumeId)
 187      || (legacyProvider === 'codex-cli' ? legacyResumeId : null)
 188    const opencodeSessionId = normalizeResumeHandle(task.opencodeResumeId)
 189      || (legacyProvider === 'opencode-cli' ? legacyResumeId : null)
 190    const geminiSessionId = normalizeResumeHandle(task.geminiResumeId)
 191      || (legacyProvider === 'gemini-cli' ? legacyResumeId : null)
 192    const cursorSessionId = legacyProvider === 'cursor-cli' ? legacyResumeId : null
 193    const qwenSessionId = legacyProvider === 'qwen-code-cli' ? legacyResumeId : null
 194  
 195    const resume = {
 196      claudeSessionId,
 197      codexThreadId,
 198      opencodeSessionId,
 199      cursorSessionId,
 200      qwenSessionId,
 201      delegateResumeIds: {
 202        claudeCode: claudeSessionId,
 203        codex: codexThreadId,
 204        opencode: opencodeSessionId,
 205        gemini: geminiSessionId,
 206        cursor: cursorSessionId,
 207        qwen: qwenSessionId,
 208      },
 209    } satisfies TaskResumeState
 210  
 211    return hasResumeState(resume) ? resume : null
 212  }
 213  
 214  export function extractSessionResumeState(session: Partial<Session> | null | undefined): TaskResumeState | null {
 215    if (!session) return null
 216  
 217    const claudeSessionId = normalizeResumeHandle(session.claudeSessionId)
 218    const codexThreadId = normalizeResumeHandle(session.codexThreadId)
 219    const opencodeSessionId = normalizeResumeHandle(session.opencodeSessionId)
 220    const cursorSessionId = normalizeResumeHandle(session.cursorSessionId)
 221    const qwenSessionId = normalizeResumeHandle(session.qwenSessionId)
 222    const delegateResumeIds = session.delegateResumeIds && typeof session.delegateResumeIds === 'object'
 223      ? { ...buildEmptyDelegateResumeIds(), ...session.delegateResumeIds }
 224      : buildEmptyDelegateResumeIds()
 225  
 226    const resume = {
 227      claudeSessionId,
 228      codexThreadId,
 229      opencodeSessionId,
 230      cursorSessionId,
 231      qwenSessionId,
 232      delegateResumeIds: {
 233        claudeCode: normalizeResumeHandle(delegateResumeIds.claudeCode) || claudeSessionId,
 234        codex: normalizeResumeHandle(delegateResumeIds.codex) || codexThreadId,
 235        opencode: normalizeResumeHandle(delegateResumeIds.opencode) || opencodeSessionId,
 236        gemini: normalizeResumeHandle(delegateResumeIds.gemini),
 237        cursor: normalizeResumeHandle(delegateResumeIds.cursor) || cursorSessionId,
 238        qwen: normalizeResumeHandle(delegateResumeIds.qwen) || qwenSessionId,
 239        copilot: normalizeResumeHandle(delegateResumeIds.copilot),
 240      },
 241    } satisfies TaskResumeState
 242  
 243    return hasResumeState(resume) ? resume : null
 244  }
 245  
 246  export function resolveTaskResumeContext(
 247    task: BoardTask,
 248    tasksById: Record<string, BoardTask>,
 249    sessionsById?: Record<string, SessionLike | Session>,
 250  ): TaskResumeContext | null {
 251    const candidates: Array<{ source: TaskResumeContext['source']; taskId: string | null | undefined }> = [
 252      { source: 'self', taskId: task.id },
 253      { source: 'delegated_from_task', taskId: task.delegatedFromTaskId },
 254      ...((Array.isArray(task.blockedBy) ? task.blockedBy : []).map((taskId) => ({ source: 'blocked_by' as const, taskId }))),
 255    ]
 256    const seen = new Set<string>()
 257  
 258    for (const candidate of candidates) {
 259      const taskId = typeof candidate.taskId === 'string' ? candidate.taskId.trim() : ''
 260      if (!taskId || seen.has(taskId)) continue
 261      seen.add(taskId)
 262      const sourceTask = taskId === task.id ? task : tasksById[taskId]
 263      if (!sourceTask) continue
 264      const sourceSessionId = normalizeResumeHandle(sourceTask.checkpoint?.lastSessionId) || normalizeResumeHandle(sourceTask.sessionId)
 265      const resume = extractTaskResumeState(sourceTask)
 266        || (sourceSessionId && sessionsById?.[sourceSessionId]
 267          ? extractSessionResumeState(sessionsById[sourceSessionId] as Session)
 268          : null)
 269      if (!resume) continue
 270      return {
 271        source: candidate.source,
 272        sourceTaskId: sourceTask.id,
 273        sourceTaskTitle: sourceTask.title,
 274        sourceSessionId,
 275        resume,
 276      }
 277    }
 278  
 279    return null
 280  }
 281  
 282  export function applyTaskResumeStateToSession(session: Session, resume: TaskResumeState | null | undefined): boolean {
 283    if (!hasResumeState(resume)) return false
 284  
 285    let changed = false
 286    const directFields: Array<['claudeSessionId' | 'codexThreadId' | 'opencodeSessionId' | 'cursorSessionId' | 'qwenSessionId', string | null]> = [
 287      ['claudeSessionId', resume.claudeSessionId],
 288      ['codexThreadId', resume.codexThreadId],
 289      ['opencodeSessionId', resume.opencodeSessionId],
 290      ['cursorSessionId', resume.cursorSessionId ?? null],
 291      ['qwenSessionId', resume.qwenSessionId ?? null],
 292    ]
 293    for (const [key, value] of directFields) {
 294      if (!value || session[key] === value) continue
 295      session[key] = value
 296      changed = true
 297    }
 298  
 299    const currentDelegateResume = session.delegateResumeIds && typeof session.delegateResumeIds === 'object'
 300      ? { ...buildEmptyDelegateResumeIds(), ...session.delegateResumeIds }
 301      : buildEmptyDelegateResumeIds()
 302    for (const [key, value] of Object.entries(resume.delegateResumeIds) as Array<[keyof NonNullable<Session['delegateResumeIds']>, string | null]>) {
 303      if (!value || currentDelegateResume[key] === value) continue
 304      currentDelegateResume[key] = value
 305      changed = true
 306    }
 307    if (changed) session.delegateResumeIds = currentDelegateResume
 308    return changed
 309  }
 310  
 311  export function resolveReusableTaskSessionId(
 312    task: BoardTask,
 313    tasks: Record<string, BoardTask>,
 314    sessions: Record<string, SessionLike>,
 315  ): string {
 316    const candidateTaskIds = [
 317      task.id,
 318      typeof task.delegatedFromTaskId === 'string' ? task.delegatedFromTaskId : '',
 319      ...(Array.isArray(task.blockedBy) ? task.blockedBy : []),
 320    ]
 321    const seen = new Set<string>()
 322    for (const candidateTaskId of candidateTaskIds) {
 323      const taskId = typeof candidateTaskId === 'string' ? candidateTaskId.trim() : ''
 324      if (!taskId || seen.has(taskId)) continue
 325      seen.add(taskId)
 326      const sourceTask = taskId === task.id ? task : tasks[taskId]
 327      if (!sourceTask) continue
 328      const candidates = [
 329        normalizeResumeHandle(sourceTask.checkpoint?.lastSessionId),
 330        normalizeResumeHandle(sourceTask.sessionId),
 331      ]
 332      for (const candidate of candidates) {
 333        if (candidate && sessions[candidate]) return candidate
 334      }
 335    }
 336    return ''
 337  }
 338  
 339  function buildTaskContinuationNote(
 340    reusedExistingSession: boolean,
 341    resumeContext: TaskResumeContext | null,
 342  ): string {
 343    const notes: string[] = []
 344    if (reusedExistingSession) {
 345      notes.push('Reusing the previous execution session for this task.')
 346    }
 347    if (resumeContext?.source === 'delegated_from_task' || resumeContext?.source === 'blocked_by') {
 348      notes.push(`Stored CLI context is available from related task "${resumeContext.sourceTaskTitle}".`)
 349    } else if (resumeContext?.source === 'self' && !reusedExistingSession) {
 350      notes.push('Stored CLI resume handles are available for continuation.')
 351    }
 352    return notes.length ? `\n\n${notes.join(' ')}` : ''
 353  }
 354  
// Keyword heuristic for "this task runs, builds or serves code" (used by looksLikeDevTask).
// NOTE(review): generic words such as "build", "react", "next" and "serve" also match
// ordinary prose, so non-dev tasks can be flagged — confirm that breadth is intended.
const DEV_TASK_HINT = /\b(dev(?:\s+server)?|start(?:ing)?\s+(?:the\s+)?server|run(?:ning)?\s+(?:the\s+)?(?:app|project|site)|serve|localhost|http\s+server|web\s+server|npm\b|pnpm\b|yarn\b|bun\b|vite|next(?:\.js)?|react|build|compile)\b/i
// Workspace entries that listWorkspaceDirCandidates never offers as a task cwd.
// Dot-prefixed names are already skipped there by its startsWith('.') check, so
// the dot entries below are redundant for that caller.
const TASK_CWD_NOISE_DIRS = new Set([
  'uploads',
  'data',
  'projects',
  'tasks',
  '.swarm-data-test',
  '.git',
  '.next',
  'node_modules',
])
// Entries whose presence sets WorkspaceDirCandidate.hasProjectMarker.
const PROJECT_MARKER_FILES = ['package.json', 'pyproject.toml', 'Cargo.toml', 'go.mod', '.git']
// Entries whose presence sets WorkspaceDirCandidate.hasSourceMarker.
const SOURCE_MARKER_DIRS = ['src', 'app', 'public', 'pages']
// Second root scanned for project folders, alongside WORKSPACE_DIR itself.
const WORKSPACE_PROJECTS_DIR = path.join(WORKSPACE_DIR, 'projects')

/** A first-level directory under a workspace root, scored as a possible task cwd. */
interface WorkspaceDirCandidate {
  // Absolute, resolved path of the directory.
  dir: string
  // Bare directory name; matched against the task's text.
  name: string
  // True when any PROJECT_MARKER_FILES entry exists inside `dir`.
  hasProjectMarker: boolean
  // True when any SOURCE_MARKER_DIRS entry exists inside `dir`.
  hasSourceMarker: boolean
}

// Memoized directory scan; listWorkspaceDirCandidates treats it as valid for 15 seconds.
let workspaceDirCache: { expiresAt: number; candidates: WorkspaceDirCandidate[] } | null = null
 378  
 379  function isExistingDirectory(dirPath: string): boolean {
 380    try {
 381      return fs.statSync(dirPath).isDirectory()
 382    } catch {
 383      return false
 384    }
 385  }
 386  
 387  function isWithinDirectory(parent: string, child: string): boolean {
 388    const parentResolved = path.resolve(parent)
 389    const childResolved = path.resolve(child)
 390    const rel = path.relative(parentResolved, childResolved)
 391    return rel === '' || (!rel.startsWith('..') && !path.isAbsolute(rel))
 392  }
 393  
 394  function normalizeForMatch(value: string): string {
 395    return value.toLowerCase().replace(/[^a-z0-9]+/g, ' ').trim()
 396  }
 397  
 398  function hasAnyMarker(dirPath: string, markers: string[]): boolean {
 399    return markers.some((marker) => fs.existsSync(path.join(dirPath, marker)))
 400  }
 401  
 402  function normalizeDirCandidate(raw: unknown, baseDir: string): string | null {
 403    if (typeof raw !== 'string') return null
 404    const trimmed = raw.trim()
 405    if (!trimmed) return null
 406    const homeDir = process.env.HOME || ''
 407    const expanded = trimmed === '~'
 408      ? homeDir
 409      : trimmed.startsWith('~/')
 410        ? path.join(homeDir, trimmed.slice(2))
 411        : trimmed
 412    const resolved = path.isAbsolute(expanded) ? path.resolve(expanded) : path.resolve(baseDir, expanded)
 413    return isExistingDirectory(resolved) ? resolved : null
 414  }
 415  
 416  function looksLikeDevTask(task: Pick<BoardTask, 'title' | 'description'>): boolean {
 417    const text = `${task.title || ''} ${task.description || ''}`.trim()
 418    return DEV_TASK_HINT.test(text)
 419  }
 420  
 421  function listWorkspaceDirCandidates(): WorkspaceDirCandidate[] {
 422    const now = Date.now()
 423    if (workspaceDirCache && workspaceDirCache.expiresAt > now) return workspaceDirCache.candidates
 424  
 425    const candidates: WorkspaceDirCandidate[] = []
 426    const seen = new Set<string>()
 427    const roots = [WORKSPACE_DIR, WORKSPACE_PROJECTS_DIR]
 428  
 429    for (const root of roots) {
 430      if (!isExistingDirectory(root)) continue
 431      let entries: fs.Dirent[] = []
 432      try {
 433        entries = fs.readdirSync(root, { withFileTypes: true })
 434      } catch {
 435        continue
 436      }
 437      for (const entry of entries) {
 438        if (!entry.isDirectory()) continue
 439        const name = entry.name
 440        if (!name || name.startsWith('.')) continue
 441        if (TASK_CWD_NOISE_DIRS.has(name)) continue
 442        const dir = path.join(root, name)
 443        const key = path.resolve(dir)
 444        if (seen.has(key)) continue
 445        seen.add(key)
 446        candidates.push({
 447          dir: key,
 448          name,
 449          hasProjectMarker: hasAnyMarker(key, PROJECT_MARKER_FILES),
 450          hasSourceMarker: hasAnyMarker(key, SOURCE_MARKER_DIRS),
 451        })
 452      }
 453    }
 454  
 455    candidates.sort((a, b) => a.name.localeCompare(b.name))
 456    workspaceDirCache = {
 457      expiresAt: now + 15_000,
 458      candidates,
 459    }
 460    return candidates
 461  }
 462  
 463  function inferWorkspaceProjectCwd(task: Pick<BoardTask, 'title' | 'description' | 'file'>): string | null {
 464    const candidates = listWorkspaceDirCandidates()
 465    if (!candidates.length) return null
 466  
 467    const taskText = normalizeForMatch(`${task.title || ''} ${task.description || ''} ${task.file || ''}`)
 468    const devTask = looksLikeDevTask(task)
 469    const markerCandidates = candidates.filter((candidate) => candidate.hasProjectMarker)
 470  
 471    let best: { dir: string; score: number } | null = null
 472    for (const candidate of candidates) {
 473      const nameNorm = normalizeForMatch(candidate.name)
 474      if (!nameNorm) continue
 475      let score = 0
 476      if (taskText.includes(nameNorm)) score += 8
 477      for (const token of nameNorm.split(' ')) {
 478        if (token.length < 3) continue
 479        if (taskText.includes(token)) score += 1
 480      }
 481      if (candidate.hasProjectMarker) score += devTask ? 3 : 1
 482      if (candidate.hasSourceMarker) score += 1
 483      if (!best || score > best.score) best = { dir: candidate.dir, score }
 484    }
 485  
 486    if (best && best.score >= 4) return best.dir
 487    if (devTask && markerCandidates.length === 1) return markerCandidates[0].dir
 488    return null
 489  }
 490  
 491  function resolveTaskExecutionCwd(task: ScheduleTaskMeta, sessions: Record<string, SessionLike>): string {
 492    const workspaceRoot = path.resolve(WORKSPACE_DIR)
 493  
 494    const explicitCwd = normalizeDirCandidate(task.cwd, workspaceRoot)
 495    if (explicitCwd) return explicitCwd
 496  
 497    const projectId = typeof task.projectId === 'string' ? task.projectId.trim() : ''
 498    if (projectId) {
 499      const projectDir = path.join(WORKSPACE_PROJECTS_DIR, projectId)
 500      if (isExistingDirectory(projectDir)) return projectDir
 501    }
 502  
 503    const fileRef = typeof task.file === 'string' ? task.file.trim() : ''
 504    if (fileRef) {
 505      const filePath = path.isAbsolute(fileRef) ? fileRef : path.resolve(workspaceRoot, fileRef)
 506      const fileDir = isExistingDirectory(filePath) ? filePath : path.dirname(filePath)
 507      if (isExistingDirectory(fileDir) && isWithinDirectory(workspaceRoot, fileDir)) return fileDir
 508    }
 509  
 510    const inferredCwd = inferWorkspaceProjectCwd(task)
 511    if (inferredCwd) return inferredCwd
 512  
 513    const sourceSessionId = typeof task.createdInSessionId === 'string' ? task.createdInSessionId.trim() : ''
 514    const sourceSessionCwd = sourceSessionId
 515      ? normalizeDirCandidate(sessions[sourceSessionId]?.cwd, workspaceRoot)
 516      : null
 517    if (sourceSessionCwd && path.resolve(sourceSessionCwd) !== workspaceRoot) return sourceSessionCwd
 518  
 519    const runSessionId = typeof task.sessionId === 'string' ? task.sessionId.trim() : ''
 520    const runSessionCwd = runSessionId
 521      ? normalizeDirCandidate(sessions[runSessionId]?.cwd, workspaceRoot)
 522      : null
 523    if (runSessionCwd && path.resolve(runSessionCwd) !== workspaceRoot) return runSessionCwd
 524  
 525    const sandboxDir = path.join(workspaceRoot, 'tasks', task.id)
 526    fs.mkdirSync(sandboxDir, { recursive: true })
 527    return sandboxDir
 528  }
 529  
 530  function queueContains(queue: string[], id: string): boolean {
 531    return queue.includes(id)
 532  }
 533  
 534  function isCancelledTask(task: Partial<BoardTask> | null | undefined): boolean {
 535    return task?.status === 'cancelled'
 536  }
 537  
 538  function pushQueueUnique(queue: string[], id: string): void {
 539    if (!queueContains(queue, id)) queue.push(id)
 540  }
 541  
 542  function isAgentCreatedTask(task: Partial<BoardTask> | null | undefined): boolean {
 543    return Boolean(typeof task?.createdByAgentId === 'string' && task.createdByAgentId.trim())
 544  }
 545  
 546  function resolveTaskTerminalChatSessionId(
 547    task: BoardTask,
 548    sessions: Record<string, SessionLike>,
 549  ): string | null {
 550    if (task.status !== 'completed' && task.status !== 'failed') return null
 551    if (task.sourceType === 'schedule') return null
 552    if (isAgentCreatedTask(task)) return null
 553    const createdInSessionId = typeof task.createdInSessionId === 'string'
 554      ? task.createdInSessionId.trim()
 555      : ''
 556    return createdInSessionId && sessions[createdInSessionId] ? createdInSessionId : null
 557  }
 558  
/** Data needed to report a finished task back to a user; built by collectTaskResultDeliveryData. */
interface TaskResultDeliveryData {
  // 'completed' when task.status is 'completed', otherwise 'failed'.
  statusLabel: 'completed' | 'failed'
  // Formatted task result (formatResultBody output).
  resultBody: string
  // task.outputFiles when non-empty, else file references extracted from resultBody.
  outputFileRefs: string[]
  // First image artifact of the extracted task result, if any.
  firstImage?: NonNullable<BoardTask['artifacts']>[number]
  // File to attach: first upload-backed artifact path, else first sendable existing output file.
  followupMediaPath?: string
  // Basename of followupMediaPath.
  mediaFileName?: string
  // cwd of the run session ('' when unknown).
  execCwd: string
  // "<Provider> session: `<id>`" lines for the task's stored CLI resume handles.
  resumeLines: string[]
}
 569  
 570  function collectTaskResultDeliveryData(
 571    task: BoardTask,
 572    sessions: Record<string, SessionLike>,
 573  ): TaskResultDeliveryData {
 574    const runSessionId = typeof task.sessionId === 'string' ? task.sessionId : ''
 575    const runSession = runSessionId ? sessions[runSessionId] : null
 576    const fallbackText = runSessionId ? latestAssistantText(runSessionId) : ''
 577    const taskResult = extractTaskResult(
 578      runSessionId ? getMessages(runSessionId) : [],
 579      task.result || fallbackText || null,
 580      { sinceTime: typeof task.startedAt === 'number' ? task.startedAt : null },
 581    )
 582    const resultBody = formatResultBody(taskResult)
 583    const outputFileRefs = Array.isArray(task.outputFiles) && task.outputFiles.length > 0
 584      ? task.outputFiles
 585      : extractLikelyOutputFiles(resultBody)
 586    const firstImage = taskResult.artifacts.find((artifact) => artifact.type === 'image')
 587    const firstArtifactMediaPath = taskResult.artifacts
 588      .map((artifact) => maybeResolveUploadMediaPathFromUrl(artifact.url))
 589      .find((candidate): candidate is string => Boolean(candidate))
 590    const resumeLines: string[] = []
 591    if (task.claudeResumeId) resumeLines.push(`Claude session: \`${task.claudeResumeId}\``)
 592    if (task.codexResumeId) resumeLines.push(`Codex thread: \`${task.codexResumeId}\``)
 593    if (task.opencodeResumeId) resumeLines.push(`OpenCode session: \`${task.opencodeResumeId}\``)
 594    if (task.geminiResumeId) resumeLines.push(`Gemini session: \`${task.geminiResumeId}\``)
 595    if (resumeLines.length === 0 && task.cliResumeId) {
 596      resumeLines.push(`${task.cliProvider || 'CLI'} session: \`${task.cliResumeId}\``)
 597    }
 598    const execCwd = runSession?.cwd || ''
 599    const existingOutputPaths = outputFileRefs
 600      .map((fileRef: string) => resolveExistingOutputFilePath(fileRef, execCwd))
 601      .filter((candidate: string | null): candidate is string => Boolean(candidate))
 602    const firstLocalOutputPath = existingOutputPaths.find((candidate: string) => isSendableAttachment(candidate))
 603    const followupMediaPath = firstArtifactMediaPath || firstLocalOutputPath || undefined
 604  
 605    return {
 606      statusLabel: task.status === 'completed' ? 'completed' : 'failed',
 607      resultBody,
 608      outputFileRefs,
 609      firstImage,
 610      followupMediaPath,
 611      mediaFileName: followupMediaPath ? path.basename(followupMediaPath) : undefined,
 612      execCwd,
 613      resumeLines,
 614    }
 615  }
 616  
 617  function buildTaskTerminalMessage(
 618    prefix: string,
 619    task: BoardTask,
 620    delivery: TaskResultDeliveryData,
 621  ): string {
 622    const parts = [prefix]
 623    if (delivery.execCwd) parts.push(`Working directory: \`${delivery.execCwd}\``)
 624    if (delivery.outputFileRefs.length > 0) {
 625      parts.push(`Output files:\n${delivery.outputFileRefs.slice(0, 8).map((fileRef: string) => `- \`${fileRef}\``).join('\n')}`)
 626    }
 627    if (task.completionReportPath) parts.push(`Task report: \`${task.completionReportPath}\``)
 628    if (delivery.resumeLines.length > 0) parts.push(delivery.resumeLines.join(' | '))
 629    parts.push(delivery.resultBody || 'No summary.')
 630    return parts.join('\n\n')
 631  }
 632  
 633  function latestAssistantText(sessionId: string | null | undefined): string {
 634    if (!sessionId) return ''
 635    const messages = getMessages(sessionId)
 636    for (let i = messages.length - 1; i >= 0; i--) {
 637      const msg = messages[i]
 638      if (msg?.role !== 'assistant') continue
 639      const text = typeof msg?.text === 'string' ? msg.text.trim() : ''
 640      if (!text) continue
 641      if (/^HEARTBEAT_OK$/i.test(text)) continue
 642      return text
 643    }
 644    return ''
 645  }
 646  
 647  function queueTaskAutonomyObservation(input: {
 648    runId: string
 649    sessionId: string
 650    taskId: string
 651    agentId: string
 652    status: 'completed' | 'failed' | 'cancelled'
 653    resultText?: string | null
 654    error?: string | null
 655    toolEvents?: ExecuteChatTurnResult['toolEvents']
 656    sourceMessage?: string | null
 657  }) {
 658    void observeAutonomyRunOutcome({
 659      runId: input.runId,
 660      sessionId: input.sessionId,
 661      taskId: input.taskId,
 662      agentId: input.agentId,
 663      source: 'task',
 664      status: input.status,
 665      resultText: input.resultText,
 666      error: input.error || undefined,
 667      toolEvents: input.toolEvents,
 668      sourceMessage: input.sourceMessage,
 669    }).catch((err: unknown) => {
 670      log.warn(TAG, `[queue] Autonomy observation failed for ${input.runId}:`, err)
 671    })
 672  }
 673  
 674  function hasFinishedExecutionSession(session: SessionLike | Session | null | undefined): boolean {
 675    if (!session) return false
 676    return session.active === false && !session.currentRunId
 677  }
 678  
 679  export function reconcileFinishedRunningTasks(): { reconciled: number; deadLettered: number } {
 680    const tasks = loadTasks()
 681    const sessions = loadSessions() as Record<string, SessionLike>
 682    const settings = loadSettings()
 683    const queue = loadQueue()
 684    const now = Date.now()
 685    let reconciled = 0
 686    let deadLettered = 0
 687    let tasksDirty = false
 688    let sessionsDirty = false
 689    let queueDirty = false
 690    const terminalTasks: BoardTask[] = []
 691  
 692    for (const task of Object.values(tasks) as BoardTask[]) {
 693      if (task.status !== 'running') continue
 694      const sessionId = typeof task.sessionId === 'string' ? task.sessionId : ''
 695      if (!sessionId) continue
 696      const session = sessions[sessionId]
 697      if (!hasFinishedExecutionSession(session)) continue
 698  
 699      const fallbackText = latestAssistantText(sessionId)
 700      if (!fallbackText && !task.result) {
 701        task.status = 'failed'
 702        task.result = 'Agent session finished without producing output.'
 703        task.checkoutRunId = null
 704        task.updatedAt = now
 705        tasksDirty = true
 706        continue
 707      }
 708  
 709      applyTaskPolicyDefaults(task)
 710      const taskResult = extractTaskResult(
 711        getMessages(sessionId),
 712        task.result || fallbackText || null,
 713        { sinceTime: typeof task.startedAt === 'number' ? task.startedAt : null },
 714      )
 715      const enrichedResult = formatResultBody(taskResult)
 716      task.result = enrichedResult.slice(0, 4000) || null
 717      task.artifacts = taskResult.artifacts.slice(0, 24)
 718      task.outputFiles = extractLikelyOutputFiles(enrichedResult).slice(0, 24)
 719      task.updatedAt = now
 720      const { validation } = refreshTaskCompletionValidation(task, settings)
 721      if (!task.comments) task.comments = []
 722  
 723      if (validation.ok) {
 724        markValidatedTaskCompleted(task, { now })
 725        task.retryScheduledAt = null
 726        task.deadLetteredAt = null
 727        task.checkpoint = {
 728          ...(task.checkpoint || {}),
 729          lastRunId: sessionId,
 730          lastSessionId: sessionId,
 731          note: 'Recovered completed task state from finished session.',
 732          updatedAt: now,
 733        }
 734        task.comments.push({
 735          id: genId(),
 736          author: 'System',
 737          text: 'Recovered completed task state from a finished execution session.',
 738          createdAt: now,
 739        })
 740        reconciled++
 741        terminalTasks.push(task)
 742      } else {
 743        const failureReason = formatValidationFailure(validation.reasons).slice(0, 500)
 744        const retryState = scheduleRetryOrDeadLetter(task, failureReason)
 745        task.completedAt = retryState === 'dead_lettered' ? null : task.completedAt
 746        task.comments.push({
 747          id: genId(),
 748          author: 'System',
 749          text: `Recovered finished session but the task result failed validation.\n\n${validation.reasons.map((reason) => `- ${reason}`).join('\n')}`,
 750          createdAt: now,
 751        })
 752        if (retryState === 'retry') {
 753          pushQueueUnique(queue, task.id)
 754          queueDirty = true
 755          reconciled++
 756          pushMainLoopEventToMainSessions({
 757            type: 'task_retry_scheduled',
 758            text: `Task retry scheduled: "${task.title}" (${task.id}) attempt ${task.attempts}/${task.maxAttempts} in ${task.retryBackoffSec}s.`,
 759          })
 760        } else {
 761          deadLettered++
 762          terminalTasks.push(task)
 763        }
 764      }
 765  
 766      if (session.heartbeatEnabled !== false) {
 767        session.heartbeatEnabled = false
 768        session.lastActiveAt = now
 769        sessionsDirty = true
 770      }
 771      tasksDirty = true
 772    }
 773  
 774    if (tasksDirty) {
 775      saveTasks(tasks)
 776      notify('tasks')
 777      notify('runs')
 778    }
 779    if (sessionsDirty) saveSessions(sessions as Record<string, Session>)
 780    if (queueDirty) saveQueue(queue)
 781  
 782    for (const task of terminalTasks) {
 783      if (task.status === 'completed') {
 784        logActivity({ entityType: 'task', entityId: task.id, action: 'completed', actor: 'system', actorId: task.agentId, summary: `Task completed: "${task.title}"` })
 785        pushMainLoopEventToMainSessions({
 786          type: 'task_completed',
 787          text: `Task completed: "${task.title}" (${task.id})`,
 788        })
 789        notifyOrchestrators(`Task completed: "${task.title}"`, `task-complete:${task.id}`)
 790      } else if (task.status === 'failed') {
 791        logActivity({ entityType: 'task', entityId: task.id, action: 'failed', actor: 'system', actorId: task.agentId, summary: `Task failed: "${task.title}"` })
 792        pushMainLoopEventToMainSessions({
 793          type: 'task_failed',
 794          text: `Task failed validation: "${task.title}" (${task.id})`,
 795        })
 796        notifyOrchestrators(`Task failed: "${task.title}" — validation failure`, `task-fail:${task.id}`)
 797      }
 798      handleTerminalTaskResultDeliveries(task)
 799      cleanupTerminalOneOffSchedule(task)
 800    }
 801  
 802    return { reconciled, deadLettered }
 803  }
 804  
 805  function cleanupTerminalOneOffSchedule(task: BoardTask): void {
 806    void task
 807  }
 808  
 809  function pushUserFacingTaskResult(task: BoardTask, sessions: Record<string, SessionLike>): void {
 810    if (task.status !== 'completed' && task.status !== 'failed') return
 811    const targetSessionId = resolveTaskTerminalChatSessionId(task, sessions)
 812    if (!targetSessionId) return
 813    const targetSession = sessions[targetSessionId]
 814    if (!targetSession) return
 815  
 816    const delivery = collectTaskResultDeliveryData(task, sessions)
 817    const taskLink = `[${task.title}](#task:${task.id})`
 818    const body = buildTaskTerminalMessage(`Task ${delivery.statusLabel}: **${taskLink}**`, task, delivery)
 819    const now = Date.now()
 820    const lastMsg = getLastMessage(targetSessionId)
 821    if (lastMsg?.role === 'assistant' && lastMsg?.text === body && typeof lastMsg?.time === 'number' && now - lastMsg.time < 30_000) {
 822      return
 823    }
 824  
 825    const message: Message = {
 826      role: 'assistant',
 827      text: body,
 828      time: now,
 829      kind: 'system',
 830    }
 831    if (delivery.firstImage) message.imageUrl = delivery.firstImage.url
 832    appendMessage(targetSessionId, message)
 833    notify(`messages:${targetSessionId}`)
 834  }
 835  
 836  function deliverTaskConnectorFollowups(task: BoardTask, sessions: Record<string, SessionLike>): void {
 837    if (task.status !== 'completed' && task.status !== 'failed') return
 838    const delivery = collectTaskResultDeliveryData(task, sessions)
 839    void notifyConnectorTaskFollowups({
 840      task,
 841      statusLabel: delivery.statusLabel,
 842      summaryText: delivery.resultBody || '',
 843      imageUrl: delivery.firstImage?.url,
 844      mediaPath: delivery.followupMediaPath,
 845      mediaFileName: delivery.mediaFileName,
 846    })
 847  }
 848  
 849  function handleTerminalTaskResultDeliveries(task: BoardTask): void {
 850    const sessions = loadSessions() as Record<string, SessionLike>
 851    pushUserFacingTaskResult(task, sessions)
 852    deliverTaskConnectorFollowups(task, sessions)
 853  }
 854  
 855  /** Disable heartbeat on a task's session when the task finishes. */
 856  export function disableSessionHeartbeat(sessionId: string | null | undefined) {
 857    if (!sessionId) return
 858    const sessions = loadSessions()
 859    const session = sessions[sessionId]
 860    if (!session || session.heartbeatEnabled === false) return
 861    session.heartbeatEnabled = false
 862    session.lastActiveAt = Date.now()
 863    saveSessions(sessions)
 864    log.info(TAG, `[queue] Disabled heartbeat on session ${sessionId} (task finished)`)
 865  }
 866  
 867  export function enqueueTask(taskId: string) {
 868    const tasks = loadTasks()
 869    const task = tasks[taskId] as BoardTask | undefined
 870    if (!task) return
 871  
 872    applyTaskPolicyDefaults(task)
 873    task.status = 'queued'
 874    task.queuedAt = Date.now()
 875    task.retryScheduledAt = null
 876    task.updatedAt = Date.now()
 877    saveTasks(tasks)
 878  
 879    const queue = loadQueue()
 880    pushQueueUnique(queue, taskId)
 881    saveQueue(queue)
 882  
 883    logActivity({ entityType: 'task', entityId: taskId, action: 'queued', actor: 'system', summary: `Task queued: "${task.title}"` })
 884  
 885    pushMainLoopEventToMainSessions({
 886      type: 'task_queued',
 887      text: `Task queued: "${task.title}" (${task.id})`,
 888    })
 889  
 890    // If processNext is at capacity, mark a pending kick so it picks up work when a slot frees
 891    if (_queueState.activeCount >= _queueState.maxConcurrent) {
 892      _queueState.pendingKick = true
 893    }
 894    // Delay before kicking worker so UI shows the queued state
 895    setTimeout(() => processNext(), 2000)
 896  }
 897  
 898  /**
 899   * Re-validate all completed tasks so the completed queue only contains
 900   * tasks with concrete completion evidence.
 901   */
 902  export function validateCompletedTasksQueue() {
 903    const tasks = loadTasks()
 904    const sessions = loadSessions()
 905    const settings = loadSettings()
 906    const now = Date.now()
 907    let checked = 0
 908    let demoted = 0
 909    let tasksDirty = false
 910    let sessionsDirty = false
 911  
 912    for (const task of Object.values(tasks) as BoardTask[]) {
 913      if (task.status !== 'completed') continue
 914      checked++
 915  
 916      const previousValidation = task.validation || null
 917      const previousReportPath = task.completionReportPath || null
 918      const { validation } = refreshTaskCompletionValidation(task, settings)
 919      if (task.completionReportPath !== previousReportPath) {
 920        tasksDirty = true
 921      }
 922      const validationChanged = didTaskValidationChange(previousValidation, validation)
 923  
 924      if (validationChanged) {
 925        tasksDirty = true
 926      }
 927  
 928      if (validation.ok) {
 929        if (!task.completedAt) {
 930          markValidatedTaskCompleted(task, { now, preserveCompletedAt: true })
 931          tasksDirty = true
 932        }
 933        continue
 934      }
 935  
 936      markInvalidCompletedTaskFailed(task, validation, {
 937        now,
 938        comment: {
 939          author: 'System',
 940          text: `Task auto-failed completed-queue validation.\n\n${validation.reasons.map((r) => `- ${r}`).join('\n')}`,
 941        },
 942      })
 943      tasksDirty = true
 944      demoted++
 945  
 946      if (task.sessionId) {
 947        const session = sessions[task.sessionId]
 948        if (session && session.heartbeatEnabled !== false) {
 949          session.heartbeatEnabled = false
 950          session.lastActiveAt = now
 951          sessionsDirty = true
 952        }
 953      }
 954    }
 955  
 956    if (tasksDirty) { saveTasks(tasks); notify('tasks') }
 957    if (sessionsDirty) saveSessions(sessions)
 958    if (demoted > 0) {
 959      log.warn(TAG, `[queue] Demoted ${demoted} invalid completed task(s) to failed after validation audit`)
 960    }
 961    return { checked, demoted }
 962  }
 963  
/**
 * Record a failed attempt on `task` and decide between another retry and the
 * dead-letter (terminal `failed`) state. Mutates `task` in place. Persisting the
 * task, and re-adding a retried task to the run queue, is left to the caller:
 * this function saves nothing and does not touch the queue.
 *
 * - Cancelled tasks (per `isCancelledTask`): retry/dead-letter timestamps and the
 *   checkout are cleared, and `'dead_lettered'` is returned without changing
 *   `status` or `attempts`.
 * - Otherwise `attempts` is incremented. While it is below `maxAttempts`
 *   (treated as 1 if unset), the task goes back to `queued` with
 *   `retryScheduledAt` pushed out by `jitteredBackoff`, and `'retry'` is returned.
 * - Once attempts are exhausted, the task is marked `failed` with
 *   `deadLetteredAt`, orchestrators are notified, and a supervisor incident is
 *   recorded when the task has a session. If the agent has `autoRecovery`, a
 *   Guardian restore request is prepared (not applied) and noted in a comment.
 *
 * @param task   Task whose attempt just failed.
 * @param reason Human-readable failure reason, embedded in error text and comments.
 * @returns `'retry'` if re-queued for a later attempt, otherwise `'dead_lettered'`.
 */
function scheduleRetryOrDeadLetter(task: BoardTask, reason: string): 'retry' | 'dead_lettered' {
  // Cancelled work is never retried: only clear scheduling/checkout state.
  if (isCancelledTask(task)) {
    task.retryScheduledAt = null
    task.deadLetteredAt = null
    task.checkoutRunId = null
    task.updatedAt = Date.now()
    return 'dead_lettered'
  }
  applyTaskPolicyDefaults(task)
  const now = Date.now()
  task.attempts = (task.attempts || 0) + 1

  if ((task.attempts || 0) < (task.maxAttempts || 1)) {
    // Backoff is derived from retryBackoffSec (default 30s) and the 0-based
    // attempt index. 6h is passed as what appears to be the upper bound;
    // confirm against jitteredBackoff.
    const delayMs = jitteredBackoff((task.retryBackoffSec || 30) * 1000, Math.max(0, (task.attempts || 1) - 1), 6 * 3600_000)
    task.status = 'queued'
    task.retryScheduledAt = now + delayMs
    // Release the prior checkout so the task can be checked out again on retry.
    // Without this, checkoutTask() returns null every attempt and the orphan-
    // recovery loop burns CPU re-queueing a task that can never run.
    task.checkoutRunId = null
    task.updatedAt = now
    task.error = `Retry scheduled after failure: ${reason}`.slice(0, 500)
    if (!task.comments) task.comments = []
    task.comments.push({
      id: genId(),
      author: 'System',
      text: `Attempt ${task.attempts}/${task.maxAttempts} failed. Retrying in ${Math.round(delayMs / 1000)}s.\n\nReason: ${reason}`,
      createdAt: now,
    })
    return 'retry'
  }

  // Attempts exhausted: dead-letter the task.
  task.status = 'failed'
  task.deadLetteredAt = now
  task.retryScheduledAt = null
  task.checkoutRunId = null
  task.updatedAt = now
  task.error = `Dead-lettered after ${task.attempts}/${task.maxAttempts} attempts: ${reason}`.slice(0, 500)
  if (!task.comments) task.comments = []
  task.comments.push({
    id: genId(),
    author: 'System',
    text: `Task moved to dead-letter after ${task.attempts}/${task.maxAttempts} attempts.\n\nReason: ${reason}`,
    createdAt: now,
  })
  notifyOrchestrators(`Task failed: "${task.title}" — ${(reason || 'unknown error').slice(0, 100)}`, `task-fail:${task.id}`)
  // Supervisor incidents are keyed to a session, so only record one when the task has a session.
  if (task.sessionId) {
    const failure = classifyRuntimeFailure({ source: 'task', message: reason })
    recordSupervisorIncident({
      runId: task.id,
      sessionId: task.sessionId,
      taskId: task.id,
      agentId: task.agentId || null,
      source: 'task',
      kind: 'runtime_failure',
      severity: failure.severity,
      summary: `Task dead-lettered: ${reason}`.slice(0, 320),
      details: reason,
      failureFamily: failure.family,
      remediation: failure.remediation,
      repairPrompt: failure.repairPrompt,
      autoAction: null,
    })
  }

  // Guardian recovery is approval-backed. Dead-lettering prepares a restore
  // request instead of mutating the workspace automatically.
  const agents = loadAgents()
  const agent = task.agentId ? agents[task.agentId] : null
  if (agent?.autoRecovery) {
    // Project tasks recover within their project directory; others use the workspace root.
    const cwd = task.projectId 
      ? path.join(WORKSPACE_DIR, 'projects', task.projectId) 
      : WORKSPACE_DIR
    const recovery = prepareGuardianRecovery({
      cwd,
      reason,
      requester: `task:${task.id}`,
    })
    // `now + 1` orders the Guardian comment after the dead-letter comment above.
    if (recovery.ok && recovery.approval) {
      task.comments.push({
        id: genId(),
        author: 'Guardian',
        text: `Recovery prepared for checkpoint ${recovery.checkpoint?.head.slice(0, 12) || 'unknown'}.\n\nApprove restore request ${recovery.approval.id} to roll the workspace back safely.`,
        createdAt: now + 1,
      })
    } else {
      task.comments.push({
        id: genId(),
        author: 'Guardian',
        text: `Recovery advisory: ${recovery.reason || 'Unable to prepare a restore request.'}`,
        createdAt: now + 1,
      })
    }
  }

  return 'dead_lettered'
}
1061  
/**
 * Remove and return the first runnable task id from `queue`, mutating `queue`
 * in place.
 *
 * First, entries whose task is missing or no longer `queued` are pruned
 * (survivors keep their order). A remaining entry is runnable when:
 * - its retry time, if any, has passed;
 * - every `blockedBy` dependency is `completed`;
 * - for pool-mode tasks, an agent has claimed it.
 *
 * Returns `null` when nothing is runnable. Stale entries are still pruned in
 * that case.
 */
export function dequeueNextRunnableTask(queue: string[], tasks: Record<string, BoardTask>): string | null {
  const now = Date.now()

  const isStillQueued = (id: string): boolean => tasks[id]?.status === 'queued'

  const isRunnable = (id: string): boolean => {
    const candidate = tasks[id]
    if (!candidate) return false
    const retryAt = candidate.retryScheduledAt
    if (typeof retryAt === 'number' && retryAt > now) return false
    const deps = Array.isArray(candidate.blockedBy) ? candidate.blockedBy : []
    if (!deps.every((depId) => tasks[depId]?.status === 'completed')) return false
    // Unclaimed pool tasks wait until an agent claims them.
    return !(candidate.assignmentMode === 'pool' && !candidate.claimedByAgentId)
  }

  // In-place compaction: writes never overtake the read cursor, so this is safe.
  let kept = 0
  for (const id of queue) {
    if (isStillQueued(id)) queue[kept++] = id
  }
  queue.length = kept

  const position = queue.findIndex(isRunnable)
  if (position < 0) return null
  const [picked] = queue.splice(position, 1)
  return picked || null
}
1087  
1088  export async function processNext() {
1089    const settings = loadSettings()
1090    _queueState.maxConcurrent = normalizeInt(
1091      (settings as Record<string, unknown>).taskQueueConcurrency, 3, 1, 10
1092    )
1093  
1094    if (_queueState.activeCount >= _queueState.maxConcurrent) {
1095      _queueState.pendingKick = true
1096      return
1097    }
1098    _queueState.activeCount++
1099    const endQueuePerf = perf.start('queue', 'processNext')
1100  
1101    try {
1102      // Recover orphaned tasks: status is 'queued' but missing from the queue array
1103      // Only run from the first worker to avoid redundant scans
1104      if (_queueState.activeCount === 1) {
1105        const allTasks = loadTasks()
1106        const currentQueue = loadQueue()
1107        const queueSet = new Set(currentQueue)
1108        let recovered = false
1109        let tasksDirty = false
1110        for (const [id, t] of Object.entries(allTasks) as [string, BoardTask][]) {
1111          if (t.status === 'queued' && !queueSet.has(id)) {
1112            log.info(TAG, `[queue] Recovering orphaned queued task: "${t.title}" (${id})`)
1113            // Defence in depth: a queued task must not carry a stale checkoutRunId
1114            // (left over from pre-1.5.38 retries). If it does, checkoutTask() will
1115            // reject every attempt and this orphan-recovery loop will spin at 100%
1116            // CPU re-queueing a task that can never run.
1117            if (t.checkoutRunId) {
1118              t.checkoutRunId = null
1119              tasksDirty = true
1120            }
1121            pushQueueUnique(currentQueue, id)
1122            recovered = true
1123          }
1124        }
1125        if (tasksDirty) saveTasks(allTasks)
1126        if (recovered) saveQueue(currentQueue)
1127      }
1128  
1129      // Process ONE task per invocation (no while loop)
1130      {
1131        const tasks = loadTasks()
1132        const queue = loadQueue()
1133        if (queue.length === 0) return
1134  
1135        const taskId = dequeueNextRunnableTask(queue, tasks as Record<string, BoardTask>)
1136        saveQueue(queue)
1137        if (!taskId) return
1138        const latestTasks = loadTasks() as Record<string, BoardTask>
1139        let task = latestTasks[taskId] as BoardTask | undefined
1140  
1141        if (!task || task.status !== 'queued') {
1142          return
1143        }
1144  
1145        // Dependency guard: skip tasks whose blockers are not all completed
1146        const blockers = Array.isArray(task.blockedBy) ? task.blockedBy as string[] : []
1147        if (blockers.length > 0) {
1148          const allBlockersDone = blockers.every((bid) => {
1149            const blocker = latestTasks[bid] as BoardTask | undefined
1150            return blocker?.status === 'completed'
1151          })
1152          if (!allBlockersDone) {
1153            // Put it back in the queue and skip
1154            pushQueueUnique(queue, taskId)
1155            saveQueue(queue)
1156            log.info(TAG, `[queue] Skipping task "${task.title}" (${taskId}) — blocked by incomplete dependencies`)
1157            return
1158          }
1159        }
1160  
1161        const agents = loadAgents()
1162        let agent = agents[task.agentId]
1163        if (!agent) {
1164          task.status = 'failed'
1165          task.deadLetteredAt = Date.now()
1166          task.checkoutRunId = null
1167          task.error = `Agent ${task.agentId} not found`
1168          task.updatedAt = Date.now()
1169          saveTasks(latestTasks)
1170          pushMainLoopEventToMainSessions({
1171            type: 'task_failed',
1172            text: `Task failed: "${task.title}" (${task.id}) — agent not found.`,
1173          })
1174          return
1175        }
1176  
1177        // Capability matching — reroute if assigned agent doesn't have required capabilities
1178        const reqCaps = Array.isArray(task.requiredCapabilities) ? task.requiredCapabilities as string[] : []
1179        if (reqCaps.length > 0 && !matchesCapabilities(agent.capabilities, reqCaps)) {
1180          const candidates = filterAgentsByCapabilities(agents, reqCaps)
1181            .filter((a) => a.id !== agent!.id && !a.disabled)
1182          if (candidates.length > 0) {
1183            // Pick best match by capability score, then alphabetically for stability
1184            candidates.sort((a, b) => {
1185              const scoreA = capabilityMatchScore(a.capabilities, reqCaps)
1186              const scoreB = capabilityMatchScore(b.capabilities, reqCaps)
1187              if (scoreB !== scoreA) return scoreB - scoreA
1188              return a.name.localeCompare(b.name)
1189            })
1190            const rerouted = candidates[0]
1191            log.info(TAG, `[queue] Rerouting task "${task.title}" (${taskId}) from agent "${agent.name}" to "${rerouted.name}" — capability match`)
1192            task.agentId = rerouted.id
1193            agent = rerouted
1194          } else {
1195            task.status = 'failed'
1196            task.deadLetteredAt = Date.now()
1197            task.checkoutRunId = null
1198            task.error = `No agent matches required capabilities: [${reqCaps.join(', ')}]`
1199            task.updatedAt = Date.now()
1200            saveTasks(latestTasks)
1201            pushMainLoopEventToMainSessions({
1202              type: 'task_failed',
1203              text: `Task failed: "${task.title}" (${task.id}) — no agent matches required capabilities [${reqCaps.join(', ')}].`,
1204            })
1205            return
1206          }
1207        }
1208  
1209        if (isAgentDisabled(agent)) {
1210          const now = Date.now()
1211          task.deferredReason = buildAgentDisabledMessage(agent, 'process queued tasks')
1212          task.status = 'deferred'
1213          task.updatedAt = now
1214          task.retryScheduledAt = null
1215          saveTasks(latestTasks)
1216          notify('tasks')
1217          pushMainLoopEventToMainSessions({
1218            type: 'task_deferred',
1219            text: `Task deferred: "${task.title}" (${task.id}) — agent ${task.agentId} is disabled.`,
1220          })
1221          return
1222        }
1223  
1224        // Budget enforcement gate
1225        const typedAgent = agent as Agent
1226        if (typedAgent.monthlyBudget || typedAgent.dailyBudget || typedAgent.hourlyBudget) {
1227          try {
1228            const budgetCheck = checkAgentBudgetLimits(typedAgent)
1229            if (!budgetCheck.ok) {
1230              const now = Date.now()
1231              const exceeded = budgetCheck.exceeded[0]
1232              task.status = 'deferred'
1233              task.deferredReason = exceeded?.message || 'Agent budget exceeded'
1234              task.retryScheduledAt = null
1235              task.updatedAt = now
1236              saveTasks(latestTasks)
1237              notify('tasks')
1238  
1239              recordSupervisorIncident({
1240                runId: task.id,
1241                sessionId: task.sessionId || '',
1242                taskId: task.id,
1243                agentId: typedAgent.id,
1244                source: 'task',
1245                kind: 'budget_pressure',
1246                severity: 'high',
1247                summary: exceeded?.message || `Agent "${typedAgent.name}" budget exceeded, task deferred.`,
1248                autoAction: 'budget_trim',
1249              })
1250              return
1251            }
1252          } catch {}
1253        }
1254  
1255        // Atomic checkout — prevents two runners from starting the same task
1256        const runId = genId()
1257        task = checkoutTask(taskId, runId) as BoardTask | undefined
1258        if (!task) return
1259        applyTaskPolicyDefaults(task)
1260        logActivity({ entityType: 'task', entityId: taskId, action: 'running', actor: 'system', actorId: task.agentId, summary: `Task started: "${task.title}"` })
1261  
1262        // Reload tasks map for resolution functions and final save (checkoutTask already saved the running status)
1263        const allTasks = loadTasks() as Record<string, BoardTask>
1264        allTasks[taskId] = task
1265        const sessionsForCwd = loadSessions() as Record<string, SessionLike>
1266        const taskCwd = resolveTaskExecutionCwd(task as ScheduleTaskMeta, sessionsForCwd)
1267        task.cwd = taskCwd
1268        let sessionId = ''
1269        const scheduleTask = task as ScheduleTaskMeta
1270        const isScheduleTask = scheduleTask.sourceType === 'schedule'
1271        const sourceScheduleId = typeof scheduleTask.sourceScheduleId === 'string'
1272          ? scheduleTask.sourceScheduleId
1273          : ''
1274        const reusableTaskSessionId = resolveReusableTaskSessionId(task, allTasks, sessionsForCwd)
1275        const resumeContext = resolveTaskResumeContext(task, allTasks, sessionsForCwd as Record<string, SessionLike | Session>)
1276  
1277        // Resolve the agent's persistent thread session to use as parentSessionId
1278        const agentThreadSessionId = agent.threadSessionId || null
1279        const taskRoutePreferences = deriveTaskRoutePreferences(task)
1280  
1281        if (isScheduleTask && sourceScheduleId) {
1282          const schedules = loadSchedules()
1283          const linkedSchedule = schedules[sourceScheduleId]
1284          const linkedScheduleRecord = linkedSchedule as unknown as Record<string, unknown> | undefined
1285          const existingSessionId = typeof linkedScheduleRecord?.lastSessionId === 'string'
1286            ? linkedScheduleRecord.lastSessionId
1287            : ''
1288          if (existingSessionId) {
1289            const sessions = loadSessions()
1290            if (sessions[existingSessionId]) {
1291              sessionId = existingSessionId
1292            }
1293          }
1294          if (!sessionId) {
1295            sessionId = createAgentTaskSession(
1296              agent,
1297              task.title,
1298              agentThreadSessionId || undefined,
1299              taskCwd,
1300              taskRoutePreferences,
1301            )
1302          }
1303          if (linkedScheduleRecord && linkedScheduleRecord.lastSessionId !== sessionId) {
1304            linkedScheduleRecord.lastSessionId = sessionId
1305            linkedScheduleRecord.updatedAt = Date.now()
1306            const updatedLinkedSchedule = linkedScheduleRecord as unknown as typeof linkedSchedule
1307            schedules[sourceScheduleId] = updatedLinkedSchedule
1308            saveSchedules(schedules)
1309          }
1310        } else {
1311          sessionId = reusableTaskSessionId || createAgentTaskSession(
1312            agent,
1313            task.title,
1314            agentThreadSessionId || undefined,
1315            taskCwd,
1316            taskRoutePreferences,
1317          )
1318        }
1319  
1320        const executionSessions = loadSessions() as Record<string, Session>
1321        const executionSession = executionSessions[sessionId]
1322        const seededResumeState = executionSession
1323          ? applyTaskResumeStateToSession(executionSession, resumeContext?.resume)
1324          : false
1325        if (seededResumeState) saveSessions(executionSessions)
1326  
1327        task.sessionId = sessionId
1328        const reusedExistingSession = !isScheduleTask && Boolean(reusableTaskSessionId) && reusableTaskSessionId === sessionId
1329        const continuationBits: string[] = []
1330        if (reusedExistingSession) {
1331          continuationBits.push('reusing prior session')
1332        }
1333        if (resumeContext?.source === 'delegated_from_task' || resumeContext?.source === 'blocked_by') {
1334          continuationBits.push(`seeded from task ${resumeContext.sourceTaskId}`)
1335        } else if (seededResumeState) {
1336          continuationBits.push('restored CLI resume handles')
1337        }
1338        task.checkpoint = {
1339          lastSessionId: sessionId,
1340          note: `Attempt ${(task.attempts || 0) + 1}/${task.maxAttempts || '?'} started${continuationBits.length ? ` (${continuationBits.join('; ')})` : ''}`,
1341          updatedAt: Date.now(),
1342        }
1343        saveTasks(allTasks)
1344        pushMainLoopEventToMainSessions({
1345          type: 'task_running',
1346          text: `Task running: "${task.title}" (${task.id}) with ${agent.name}`,
1347        })
1348  
1349        // Save initial assistant message so user sees context when opening the session
1350        {
1351          const sessionExists = Boolean(loadSessions()[sessionId])
1352          if (sessionExists) {
1353            const isDelegation = (task as unknown as Record<string, unknown>).sourceType === 'delegation'
1354            let initialText: string
1355            if (isDelegation) {
1356              const delegatorId = (task as unknown as Record<string, unknown>).delegatedByAgentId as string | undefined
1357              const delegator = delegatorId ? agents[delegatorId] : null
1358              const prefix = `[delegation-source:${delegatorId || ''}:${delegator?.name || 'Agent'}:${delegator?.avatarSeed || ''}]`
1359              initialText = `${prefix}\nDelegated by **${delegator?.name || 'another agent'}** | [${task.title}](#task:${task.id})\n\n${task.description || ''}\n\nWorking directory: \`${taskCwd}\`${buildTaskContinuationNote(Boolean(reusedExistingSession), resumeContext)}\n\nI'll begin working on this now.`
1360            } else {
1361              initialText = `Starting task: **${task.title}**\n\n${task.description || ''}\n\nWorking directory: \`${taskCwd}\`${buildTaskContinuationNote(Boolean(reusedExistingSession), resumeContext)}\n\nI'll begin working on this now.`
1362            }
1363            // Inject upstream task results context
1364            if (Array.isArray(task.upstreamResults) && task.upstreamResults.length > 0) {
1365              const upstreamBlock = task.upstreamResults
1366                .map((ur) => `### ${ur.taskTitle}\n${ur.resultPreview || '(no result)'}`)
1367                .join('\n\n')
1368              initialText += `\n\n## Context from upstream tasks\n\n${upstreamBlock}`
1369            }
1370            appendMessage(sessionId, {
1371              role: 'assistant',
1372              text: initialText,
1373              time: Date.now(),
1374              ...(isDelegation ? { kind: 'system' as const } : {}),
1375            })
1376          }
1377        }
1378  
1379        log.info(TAG, `[queue] Running task "${task.title}" (${taskId}) with ${agent.name}`)
1380  
1381        try {
1382          const taskRunId = `${taskId}:attempt-${(task.attempts || 0) + 1}`
1383          const endTaskRunPerf = perf.start('queue', 'executeTaskRun', { taskId, agentName: agent.name })
1384          const taskRunHandle = enqueueExecution({
1385            kind: 'task_attempt',
1386            task,
1387            agent,
1388            sessionId,
1389            executionId: taskRunId,
1390          })
1391          const taskRun = await taskRunHandle.promise
1392          endTaskRunPerf()
1393          // Update lastActivityAt after execution completes (idle timeout tracking)
1394          {
1395            const latestTasks = loadTasks() as Record<string, BoardTask>
1396            const updatedTask = latestTasks[taskId]
1397            if (updatedTask) {
1398              updatedTask.lastActivityAt = Date.now()
1399              saveTasks(latestTasks)
1400            }
1401          }
1402          const result = taskRun.error
1403            ? (taskRun.text || `Error: ${taskRun.error}`)
1404            : taskRun.text
1405          const t2 = loadTasks()
1406          const settings = loadSettings()
1407          if (isCancelledTask(t2[taskId])) {
1408            disableSessionHeartbeat(t2[taskId].sessionId)
1409            notify('tasks')
1410            notify('runs')
1411            queueTaskAutonomyObservation({
1412              runId: taskRunId,
1413              sessionId,
1414              taskId,
1415              agentId: agent.id,
1416              status: 'cancelled',
1417              error: t2[taskId].error || 'Task cancelled',
1418              toolEvents: taskRun.toolEvents,
1419              sourceMessage: task.description || task.title,
1420            })
1421            log.warn(TAG, `[queue] Task "${task.title}" cancelled during execution`)
1422            return
1423          }
1424          if (t2[taskId]) {
1425            applyTaskPolicyDefaults(t2[taskId])
1426            // Structured extraction: Zod-validated result with typed artifacts
1427            const taskResult = extractTaskResult(
1428              getMessages(sessionId),
1429              result || null,
1430              { sinceTime: typeof t2[taskId].startedAt === 'number' ? t2[taskId].startedAt : null },
1431            )
1432            const enrichedResult = formatResultBody(taskResult)
1433            t2[taskId].result = enrichedResult.slice(0, 4000) || null
1434            t2[taskId].artifacts = taskResult.artifacts.slice(0, 24)
1435            t2[taskId].outputFiles = extractLikelyOutputFiles(enrichedResult).slice(0, 24)
1436            t2[taskId].updatedAt = Date.now()
1437            const { validation } = refreshTaskCompletionValidation(t2[taskId], settings)
1438  
1439            const now = Date.now()
1440            // Add a completion/failure comment from the executing agent.
1441            if (!t2[taskId].comments) t2[taskId].comments = []
1442  
1443            if (validation.ok) {
1444              markValidatedTaskCompleted(t2[taskId], { now })
1445              t2[taskId].retryScheduledAt = null
1446              t2[taskId].checkpoint = {
1447                ...(t2[taskId].checkpoint || {}),
1448                lastRunId: sessionId,
1449                lastSessionId: sessionId,
1450                note: `Completed on attempt ${t2[taskId].attempts || 0}/${t2[taskId].maxAttempts || '?'}`,
1451                updatedAt: now,
1452              }
1453              t2[taskId].comments!.push({
1454                id: genId(),
1455                author: agent.name,
1456                agentId: agent.id,
1457                text: `Task completed.\n\n${result?.slice(0, 1000) || 'No summary provided.'}`,
1458                createdAt: now,
1459              })
1460            } else {
1461              const failureReason = formatValidationFailure(validation.reasons).slice(0, 500)
1462              const retryState = scheduleRetryOrDeadLetter(t2[taskId], failureReason)
1463              t2[taskId].completedAt = retryState === 'dead_lettered' ? null : t2[taskId].completedAt
1464              t2[taskId].comments!.push({
1465                id: genId(),
1466                author: agent.name,
1467                agentId: agent.id,
1468                text: `Task failed validation and was not marked completed.\n\n${validation.reasons.map((r) => `- ${r}`).join('\n')}`,
1469                createdAt: now,
1470              })
1471              if (retryState === 'retry') {
1472                const qRetry = loadQueue()
1473                pushQueueUnique(qRetry, taskId)
1474                saveQueue(qRetry)
1475                pushMainLoopEventToMainSessions({
1476                  type: 'task_retry_scheduled',
1477                  text: `Task retry scheduled: "${task.title}" (${taskId}) attempt ${t2[taskId].attempts}/${t2[taskId].maxAttempts} in ${t2[taskId].retryBackoffSec}s.`,
1478                })
1479              }
1480            }
1481  
1482            // Copy ALL CLI resume IDs from the execution session to the task record
1483            try {
1484              const execSessions = loadSessions()
1485              const execSession = execSessions[sessionId] as unknown as Record<string, unknown> | undefined
1486              if (execSession) {
1487                const delegateIds = execSession.delegateResumeIds as
1488                  | { claudeCode?: string | null; codex?: string | null; opencode?: string | null; gemini?: string | null; cursor?: string | null; qwen?: string | null }
1489                  | undefined
1490                // Store each CLI resume ID separately
1491                const claudeId = (execSession.claudeSessionId as string) || delegateIds?.claudeCode || null
1492                const codexId = (execSession.codexThreadId as string) || delegateIds?.codex || null
1493                const opencodeId = (execSession.opencodeSessionId as string) || delegateIds?.opencode || null
1494                const geminiId = delegateIds?.gemini || null
1495                const cursorId = (execSession.cursorSessionId as string) || delegateIds?.cursor || null
1496                const qwenId = (execSession.qwenSessionId as string) || delegateIds?.qwen || null
1497                if (claudeId) t2[taskId].claudeResumeId = claudeId
1498                if (codexId) t2[taskId].codexResumeId = codexId
1499                if (opencodeId) t2[taskId].opencodeResumeId = opencodeId
1500                if (geminiId) t2[taskId].geminiResumeId = geminiId
1501                // Keep backward-compat single field (first available)
1502                const primaryId = claudeId || codexId || opencodeId || geminiId || cursorId || qwenId
1503                if (primaryId) {
1504                  t2[taskId].cliResumeId = primaryId
1505                  if (claudeId) t2[taskId].cliProvider = 'claude-cli'
1506                  else if (codexId) t2[taskId].cliProvider = 'codex-cli'
1507                  else if (opencodeId) t2[taskId].cliProvider = 'opencode-cli'
1508                  else if (geminiId) t2[taskId].cliProvider = 'gemini-cli'
1509                  else if (cursorId) t2[taskId].cliProvider = 'cursor-cli'
1510                  else if (qwenId) t2[taskId].cliProvider = 'qwen-code-cli'
1511                }
1512                log.info(TAG, `[queue] CLI resume IDs for task ${taskId}: claude=${claudeId}, codex=${codexId}, opencode=${opencodeId}, gemini=${geminiId}, cursor=${cursorId}, qwen=${qwenId}`)
1513              }
1514            } catch (e) {
1515              log.warn(TAG, `[queue] Failed to extract CLI resume IDs for task ${taskId}:`, e)
1516            }
1517  
1518            saveTasks(t2)
1519            notify('tasks')
1520            notify('runs')
1521            disableSessionHeartbeat(t2[taskId].sessionId)
1522          }
1523          const doneTask = t2[taskId]
1524          queueTaskAutonomyObservation({
1525            runId: taskRunId,
1526            sessionId,
1527            taskId,
1528            agentId: agent.id,
1529            status: doneTask?.status === 'completed'
1530              ? 'completed'
1531              : doneTask?.status === 'cancelled'
1532                ? 'cancelled'
1533                : 'failed',
1534            resultText: doneTask?.result || result || null,
1535            error: doneTask?.status === 'completed' ? null : (doneTask?.error || taskRun.error || null),
1536            toolEvents: taskRun.toolEvents,
1537            sourceMessage: task.description || task.title,
1538          })
1539          if (doneTask?.status === 'completed') {
1540            pushMainLoopEventToMainSessions({
1541              type: 'task_completed',
1542              text: `Task completed: "${task.title}" (${taskId})`,
1543            })
1544            notifyOrchestrators(`Task completed: "${task.title}"`, `task-complete:${taskId}`)
1545            queueSwarmFeedTaskCompletionWake(doneTask)
1546            handleTerminalTaskResultDeliveries(doneTask)
1547            cleanupTerminalOneOffSchedule(doneTask)
1548            // Clean up LangGraph checkpoints for completed tasks
1549            getCheckpointSaver().deleteThread(taskId).catch((e) =>
1550              log.warn(TAG, `[queue] Failed to clean up checkpoints for task ${taskId}:`, e)
1551            )
1552            // Cascade unblock: auto-queue tasks whose blockers are all done
1553            const latestTasks = loadTasks()
1554            const unblockedIds = cascadeUnblock(latestTasks, taskId)
1555            if (unblockedIds.length > 0) {
1556              saveTasks(latestTasks)
1557              for (const uid of unblockedIds) {
1558                enqueueTask(uid)
1559                log.info(TAG, `[queue] Auto-unblocked task "${latestTasks[uid]?.title}" (${uid})`)
1560              }
1561              notify('tasks')
1562            }
1563            // Wake waiting protocol runs when a linked task completes
1564            if (latestTasks[taskId]?.protocolRunId) {
1565              try {
1566                const { wakeProtocolRunFromTaskCompletion } = await import('@/lib/server/protocols/protocol-service')
1567                wakeProtocolRunFromTaskCompletion(taskId)
1568              } catch (e) {
1569                log.warn(TAG, `[queue] Failed to wake protocol run for task ${taskId}:`, e)
1570              }
1571            }
1572            log.info(TAG, `[queue] Task "${task.title}" completed`)
1573          } else if (doneTask?.status === 'cancelled') {
1574            log.warn(TAG, `[queue] Task "${task.title}" cancelled during execution`)
1575          } else {
1576            if (doneTask?.status === 'queued') {
1577              log.warn(TAG, `[queue] Task "${task.title}" scheduled for retry`)
1578            } else {
1579              pushMainLoopEventToMainSessions({
1580                type: 'task_failed',
1581                text: `Task failed validation: "${task.title}" (${taskId})`,
1582              })
1583              notifyOrchestrators(`Task failed: "${task.title}" — validation failure`, `task-fail:${taskId}`)
1584              if (doneTask?.status === 'failed') {
1585                handleTerminalTaskResultDeliveries(doneTask)
1586                cleanupTerminalOneOffSchedule(doneTask)
1587              }
1588              log.warn(TAG, `[queue] Task "${task.title}" failed completion validation`)
1589            }
1590          }
1591        } catch (err: unknown) {
1592          const errMsg = err instanceof Error ? err.message : String(err || 'Unknown error')
1593          log.error(TAG, `[queue] Task "${task.title}" failed:`, errMsg)
1594          const taskRunId = `${taskId}:attempt-${(task.attempts || 0) + 1}`
1595          const t2 = loadTasks()
1596          if (isCancelledTask(t2[taskId])) {
1597            disableSessionHeartbeat(t2[taskId].sessionId)
1598            notify('tasks')
1599            notify('runs')
1600            queueTaskAutonomyObservation({
1601              runId: taskRunId,
1602              sessionId,
1603              taskId,
1604              agentId: agent.id,
1605              status: 'cancelled',
1606              error: t2[taskId].error || errMsg,
1607              sourceMessage: task.description || task.title,
1608            })
1609            log.warn(TAG, `[queue] Task "${task.title}" aborted because it was cancelled`)
1610            return
1611          }
1612          if (t2[taskId]) {
1613            applyTaskPolicyDefaults(t2[taskId])
1614  
1615            // Auto-repair: attempt a repair turn before retrying if a repairPrompt is available
1616            const failureClassification = classifyRuntimeFailure({ source: 'task', message: errMsg })
1617            if (failureClassification.repairPrompt && t2[taskId].sessionId) {
1618              try {
1619                const repairRunId = `repair:${taskId}:${Date.now()}`
1620                t2[taskId].repairRunId = repairRunId
1621                t2[taskId].lastRepairAttemptAt = Date.now()
1622                saveTasks(t2)
1623                await enqueueExecution({
1624                  kind: 'session_turn',
1625                  input: {
1626                    sessionId: t2[taskId].sessionId!,
1627                    message: `[AUTO-REPAIR] ${failureClassification.repairPrompt}\n\nOriginal error: ${errMsg.slice(0, 300)}`,
1628                    internal: true,
1629                    source: 'task-repair',
1630                    mode: 'followup',
1631                    dedupeKey: repairRunId,
1632                  },
1633                }).promise
1634                log.info(TAG, `[queue] Repair turn completed for task "${task.title}" (${taskId})`)
1635              } catch (repairErr: unknown) {
1636                log.warn(TAG, `[queue] Repair turn failed for task "${task.title}":`, repairErr instanceof Error ? repairErr.message : String(repairErr))
1637                // If repair fails, attempt guardian recovery
1638                const taskCwd = t2[taskId].cwd || WORKSPACE_DIR
1639                prepareGuardianRecovery({
1640                  cwd: taskCwd,
1641                  reason: `Auto-repair failed for task "${task.title}": ${errMsg.slice(0, 200)}`,
1642                  requester: agent.id,
1643                })
1644              }
1645            }
1646  
1647            // Reload tasks after the async repair turn to avoid overwriting concurrent mutations
1648            const t3 = loadTasks()
1649            // Carry forward repair fields that were saved before the async turn
1650            if (t2[taskId].repairRunId && t3[taskId]) {
1651              t3[taskId].repairRunId = t2[taskId].repairRunId
1652              t3[taskId].lastRepairAttemptAt = t2[taskId].lastRepairAttemptAt
1653            }
1654            const retryState = scheduleRetryOrDeadLetter(t3[taskId], errMsg.slice(0, 500) || 'Unknown error')
1655            if (!t3[taskId].comments) t3[taskId].comments = []
1656            // Only add a failure comment if the last comment isn't already an error comment
1657            const lastComment = t3[taskId].comments!.at(-1)
1658            const isRepeatError = lastComment?.agentId === agent.id && lastComment?.text.startsWith('Task failed')
1659            if (!isRepeatError) {
1660              t3[taskId].comments!.push({
1661                id: genId(),
1662                author: agent.name,
1663                agentId: agent.id,
1664                text: 'Task failed — see error details above.',
1665                createdAt: Date.now(),
1666              })
1667            }
1668            saveTasks(t3)
1669            notify('tasks')
1670            notify('runs')
1671            disableSessionHeartbeat(t3[taskId].sessionId)
1672            if (retryState === 'retry') {
1673              const qRetry = loadQueue()
1674              pushQueueUnique(qRetry, taskId)
1675              saveQueue(qRetry)
1676              pushMainLoopEventToMainSessions({
1677                type: 'task_retry_scheduled',
1678                text: `Task retry scheduled: "${task.title}" (${taskId}) attempt ${t3[taskId].attempts}/${t3[taskId].maxAttempts}.`,
1679              })
1680            }
1681          }
1682          queueTaskAutonomyObservation({
1683            runId: taskRunId,
1684            sessionId,
1685            taskId,
1686            agentId: agent.id,
1687            status: 'failed',
1688            error: errMsg,
1689            sourceMessage: task.description || task.title,
1690          })
1691          const latest = loadTasks()[taskId] as BoardTask | undefined
1692          if (latest?.status === 'queued') {
1693            log.warn(TAG, `[queue] Task "${task.title}" queued for retry after error`)
1694          } else if (latest?.status === 'cancelled') {
1695            log.warn(TAG, `[queue] Task "${task.title}" stayed cancelled after abort`)
1696          } else {
1697            pushMainLoopEventToMainSessions({
1698              type: 'task_failed',
1699              text: `Task failed: "${task.title}" (${taskId}) — ${errMsg.slice(0, 200)}`,
1700            })
1701            if (latest?.status === 'failed') {
1702              handleTerminalTaskResultDeliveries(latest)
1703              cleanupTerminalOneOffSchedule(latest)
1704            }
1705          }
1706        }
1707      }
1708    } finally {
1709      _queueState.activeCount--
1710      endQueuePerf()
1711      const pendingKick = _queueState.pendingKick
1712      _queueState.pendingKick = false
1713      if (pendingKick) {
1714        setTimeout(() => processNext(), 0)
1715        return
1716      }
1717  
1718      // Only re-kick when work is actually runnable. This avoids hot loops when the
1719      // queue only contains blocked, deferred, or retry-gated tasks.
1720      const remainingQueue = loadQueue()
1721      if (!remainingQueue.length) return
1722      const tasks = loadTasks() as Record<string, BoardTask>
1723      const probeQueue = [...remainingQueue]
1724      const nextRunnableTaskId = dequeueNextRunnableTask(probeQueue, tasks)
1725      if (nextRunnableTaskId) {
1726        setTimeout(() => processNext(), 0)
1727      }
1728    }
1729  }
1730  
/**
 * Boot-time cleanup: turn off heartbeat on every session that is still attached
 * to a task already in a terminal status (completed, failed or cancelled).
 * Sessions are only persisted (and the summary logged) when at least one changed.
 */
export function cleanupFinishedTaskSessions() {
  const taskMap = loadTasks()
  const sessionMap = loadSessions()
  let disabledCount = 0
  for (const finishedTask of Object.values(taskMap) as BoardTask[]) {
    const isTerminal = finishedTask.status === 'completed'
      || finishedTask.status === 'failed'
      || finishedTask.status === 'cancelled'
    if (!isTerminal || !finishedTask.sessionId) continue
    const linkedSession = sessionMap[finishedTask.sessionId]
    // Already disabled (or missing) sessions need no write.
    if (!linkedSession || linkedSession.heartbeatEnabled === false) continue
    linkedSession.heartbeatEnabled = false
    linkedSession.lastActiveAt = Date.now()
    disabledCount += 1
  }
  if (disabledCount === 0) return
  saveSessions(sessionMap)
  log.info(TAG, `[queue] Disabled heartbeat on ${disabledCount} session(s) with finished tasks`)
}
1751  
/**
 * Recover running tasks that appear stalled and requeue or dead-letter them per retry policy.
 *
 * Starts from the counts returned by reconcileFinishedRunningTasks(), then scans every
 * task still marked `running`:
 * - no `startedAt`: the running state is inconsistent, so the task goes back to `queued`
 *   with a 30s retry delay and a System comment;
 * - stalled (latest of `updatedAt`/`startedAt` older than `taskStallTimeoutMin`) or
 *   idle (`lastActivityAt`, falling back to `startedAt`, older than `taskIdleTimeoutMin`):
 *   handed to scheduleRetryOrDeadLetter, which decides between requeueing and dead-lettering.
 *
 * When anything changed, tasks and queue are saved, `tasks` subscribers are notified, and
 * the queue is kicked after 250ms if any task was counted as recovered.
 *
 * @returns `recovered` (requeued) and `deadLettered` counts, including those reported by
 *   reconcileFinishedRunningTasks().
 */
export function recoverStalledRunningTasks(): { recovered: number; deadLettered: number } {
  const finished = reconcileFinishedRunningTasks()
  const settings = loadSettings()
  // Minute values are clamped via normalizeInt; the literals are presumably fallback/min/max.
  const stallTimeoutMin = normalizeInt(settings.taskStallTimeoutMin, 45, 5, 24 * 60)
  const staleMs = stallTimeoutMin * 60_000
  const idleTimeoutMin = normalizeInt((settings as Record<string, unknown>).taskIdleTimeoutMin, 15, 2, 120)
  const idleMs = idleTimeoutMin * 60_000
  const now = Date.now()
  const tasks = loadTasks()
  const queue = loadQueue()
  let recovered = finished.reconciled
  let deadLettered = finished.deadLettered
  let changed = false

  for (const task of Object.values(tasks) as BoardTask[]) {
    if (task.status !== 'running') continue
    if (!task.startedAt) {
      const recoveredAt = Date.now()
      task.status = 'queued'
      task.queuedAt = task.queuedAt || recoveredAt
      // Derive the retry gate from the same timestamp as the rest of this recovery.
      task.retryScheduledAt = recoveredAt + 30_000
      task.updatedAt = recoveredAt
      task.error = 'Recovered inconsistent running state (missing startedAt); requeued.'
      if (!task.comments) task.comments = []
      task.comments.push({
        id: genId(),
        author: 'System',
        text: 'Recovered inconsistent running state (missing startedAt). Task requeued.',
        createdAt: recoveredAt,
      })
      pushQueueUnique(queue, task.id)
      recovered++
      changed = true
      pushMainLoopEventToMainSessions({
        type: 'task_stall_recovered',
        text: `Recovered inconsistent running task "${task.title}" (${task.id}) and requeued it.`,
      })
      continue
    }
    // Overall stall check, based on the most recent of updatedAt/startedAt.
    const since = Math.max(task.updatedAt || 0, task.startedAt || 0)
    const isStalled = since > 0 && (now - since) >= staleMs

    // Idle check: no recorded activity for idleTimeoutMin.
    const lastActivity = task.lastActivityAt || task.startedAt || 0
    const idleDuration = lastActivity > 0 ? now - lastActivity : 0
    const isIdle = lastActivity > 0 && idleDuration >= idleMs

    if (!isStalled && !isIdle) continue

    const reason = isIdle
      ? `Idle timeout: no output for ${Math.round(idleDuration / 60_000)}m`
      : `Detected stalled run after ${stallTimeoutMin}m without progress`
    const state = scheduleRetryOrDeadLetter(task, reason)
    disableSessionHeartbeat(task.sessionId)
    changed = true
    if (state === 'retry') {
      pushQueueUnique(queue, task.id)
      recovered++
      pushMainLoopEventToMainSessions({
        type: 'task_stall_recovered',
        text: `Recovered stalled task "${task.title}" (${task.id}) and requeued attempt ${task.attempts}/${task.maxAttempts}.`,
      })
    } else {
      deadLettered++
      pushMainLoopEventToMainSessions({
        type: 'task_dead_lettered',
        text: `Task dead-lettered after stalling: "${task.title}" (${task.id}).`,
      })
      notifyOrchestrators(`Task failed: "${task.title}" — stalled and dead-lettered`, `task-fail:${task.id}`)
    }
  }

  if (changed) {
    saveTasks(tasks)
    saveQueue(queue)
    // Other task writers in this module (processNext, claimPoolTask, promoteDeferred) follow
    // saveTasks with notify('tasks'); do the same so clients see requeued/dead-lettered tasks.
    notify('tasks')
    if (recovered > 0) {
      setTimeout(() => processNext(), 250)
    }
  }

  return { recovered, deadLettered }
}
1836  
/** Set by resumeQueue() so its boot-time recovery runs at most once per module load. */
let _resumeQueueCalled = false
1838  
/**
 * Atomically claim a pool-mode task on behalf of `agentId`.
 *
 * Eligibility is checked in this order, and the first failing check's message is returned
 * as `error`:
 * - the task exists;
 * - it is in pool mode;
 * - nobody has claimed it;
 * - its status is `queued` or `backlog`;
 * - the agent is listed in `poolCandidateAgentIds` (when that list is non-empty);
 * - the agent satisfies `requiredCapabilities` (when any are declared).
 *
 * The checks and the write share one `withTransaction` call, so concurrent claimers
 * cannot both succeed.
 *
 * On success the task records `claimedByAgentId`, `claimedAt`, `agentId` and `updatedAt`.
 * An activity entry is then logged and `tasks` subscribers are notified.
 */
export function claimPoolTask(taskId: string, agentId: string): { success: boolean; error?: string } {
  const deny = (error: string) => ({ success: false as const, error })

  const claim = withTransaction(() => {
    const taskMap = loadTasks() as Record<string, BoardTask>
    const target = taskMap[taskId]
    if (!target) return deny('Task not found')
    if (target.assignmentMode !== 'pool') return deny('Task is not in pool mode')
    if (target.claimedByAgentId) return deny(`Task already claimed by ${target.claimedByAgentId}`)
    const claimableStatus = target.status === 'queued' || target.status === 'backlog'
    if (!claimableStatus) return deny(`Task status is ${target.status}, not claimable`)

    const pool = Array.isArray(target.poolCandidateAgentIds) ? target.poolCandidateAgentIds : []
    const outsidePool = pool.length > 0 && !pool.includes(agentId)
    if (outsidePool) return deny('Agent is not in the candidate pool for this task')

    // Agents lacking any declared capability may not take the task.
    const neededCaps = Array.isArray(target.requiredCapabilities) ? target.requiredCapabilities as string[] : []
    if (neededCaps.length > 0) {
      const claimant = loadAgents()[agentId]
      const qualifies = !!claimant && matchesCapabilities(claimant.capabilities, neededCaps)
      if (!qualifies) return deny(`Agent does not match required capabilities: [${neededCaps.join(', ')}]`)
    }

    target.claimedByAgentId = agentId
    target.claimedAt = Date.now()
    target.agentId = agentId
    target.updatedAt = Date.now()
    saveTasks(taskMap)
    return { success: true as const, title: target.title }
  })

  if (!claim.success) return claim

  logActivity({
    entityType: 'task',
    entityId: taskId,
    action: 'claimed',
    actor: 'agent',
    actorId: agentId,
    summary: `Task "${claim.title}" claimed by agent ${agentId}`,
  })
  notify('tasks')
  return { success: true }
}
1873  
/**
 * List pool-mode tasks that `agentId` could currently claim via claimPoolTask.
 *
 * Applies the same eligibility rules as claimPoolTask:
 * - pool mode and unclaimed;
 * - status `queued` or `backlog`;
 * - the agent is in `poolCandidateAgentIds` when that list is non-empty;
 * - when `requiredCapabilities` is declared, the agent exists and matches them.
 *
 * Checking capabilities here keeps the list from advertising tasks whose claim
 * would be rejected.
 */
export function listClaimableTasks(agentId: string): BoardTask[] {
  const tasks = loadTasks() as Record<string, BoardTask>
  // Agents are loaded lazily, at most once, and only if some task declares capabilities.
  let agents: ReturnType<typeof loadAgents> | null = null
  return Object.values(tasks).filter((task) => {
    if (task.assignmentMode !== 'pool') return false
    if (task.claimedByAgentId) return false
    if (task.status !== 'queued' && task.status !== 'backlog') return false
    const candidates = Array.isArray(task.poolCandidateAgentIds) ? task.poolCandidateAgentIds : []
    if (candidates.length > 0 && !candidates.includes(agentId)) return false
    const requiredCaps = Array.isArray(task.requiredCapabilities) ? task.requiredCapabilities as string[] : []
    if (requiredCaps.length === 0) return true
    if (!agents) agents = loadAgents()
    const claimingAgent = agents[agentId]
    return !!claimingAgent && matchesCapabilities(claimingAgent.capabilities, requiredCaps)
  })
}
1884  
/**
 * Boot-time queue recovery. It runs at most once, guarded by `_resumeQueueCalled`, in two passes:
 * 1. Tasks whose status is `queued` but whose id is missing from the persisted queue are
 *    re-appended; policy defaults are applied and `queuedAt` is backfilled.
 * 2. Every task still marked `running` is treated as an orphan of the previous process.
 *    It goes through scheduleRetryOrDeadLetter, is re-queued when a retry is scheduled,
 *    and receives a System comment.
 *
 * Tasks and queue are saved only when something changed. Processing starts if the queue
 * is non-empty afterwards.
 */
export function resumeQueue() {
  if (_resumeQueueCalled) return
  _resumeQueueCalled = true

  const tasks = loadTasks()
  const queue = loadQueue()
  const taskList = Object.values(tasks) as BoardTask[]

  // Pass 1: 'queued' tasks that fell out of the queue array.
  let strandedCount = 0
  for (const stranded of taskList) {
    if (stranded.status !== 'queued' || queue.includes(stranded.id)) continue
    applyTaskPolicyDefaults(stranded)
    log.info(TAG, `[queue] Recovering stuck queued task: "${stranded.title}" (${stranded.id})`)
    queue.push(stranded.id)
    stranded.queuedAt = stranded.queuedAt || Date.now()
    strandedCount++
  }

  // Pass 2: right after a restart nothing can really be running, so every 'running'
  // task is an orphan of the previous process.
  const orphanReason = 'process_lost: task was running when daemon restarted'
  let orphanCount = 0
  for (const orphan of taskList) {
    if (orphan.status !== 'running') continue
    applyTaskPolicyDefaults(orphan)
    if (scheduleRetryOrDeadLetter(orphan, orphanReason) === 'retry') {
      pushQueueUnique(queue, orphan.id)
    }
    if (!orphan.comments) orphan.comments = []
    orphan.comments.push({
      id: genId(),
      author: 'System',
      text: `Orphan recovery: ${orphanReason}`,
      createdAt: Date.now(),
    })
    orphanCount++
  }
  if (orphanCount > 0) {
    log.info(TAG, `[queue] Recovered ${orphanCount} orphaned running task(s) on boot`)
  }

  if (strandedCount > 0 || orphanCount > 0) {
    saveQueue(queue)
    saveTasks(tasks)
  }

  if (queue.length === 0) return
  log.info(TAG, `[queue] Resuming ${queue.length} queued task(s) on boot`)
  processNext()
}
1937  
/**
 * Re-queue deferred tasks whose agents are now available.
 *
 * A deferred task is promoted back to `queued` (and appended to the queue) when:
 * - its agent exists and is not disabled;
 * - if the agent has any monthly/daily/hourly budget, checkAgentBudgetLimits reports it
 *   within limits. If that check itself throws, the failure is logged and the task is
 *   still promoted (fail-open, as before).
 *
 * @param agentId when given, only tasks assigned to this agent are considered.
 * @returns the number of tasks promoted. When non-zero, tasks and queue are saved,
 *   `tasks` subscribers are notified, and processing is kicked on the next tick.
 */
export function promoteDeferred(agentId?: string): number {
  const tasks = loadTasks() as Record<string, BoardTask>
  const agents = loadAgents()
  const queue = loadQueue()
  let promoted = 0

  for (const task of Object.values(tasks)) {
    if (task.status !== 'deferred') continue
    if (agentId && task.agentId !== agentId) continue

    const rawAgent = agents[task.agentId]
    if (!rawAgent) continue
    const agent = rawAgent as Agent
    if (isAgentDisabled(agent)) continue

    // Only agents with a configured budget need the (potentially expensive) limit check.
    if (agent.monthlyBudget || agent.dailyBudget || agent.hourlyBudget) {
      try {
        if (!checkAgentBudgetLimits(agent).ok) continue // still over budget
      } catch (err: unknown) {
        // Fail open: an unreadable budget must not strand the task forever, but make it visible.
        log.warn(
          TAG,
          `[queue] Budget check failed for agent ${task.agentId} while promoting deferred task ${task.id}; promoting anyway:`,
          err instanceof Error ? err.message : String(err),
        )
      }
    }

    task.status = 'queued'
    task.deferredReason = null
    task.updatedAt = Date.now()
    pushQueueUnique(queue, task.id)
    promoted++
  }

  if (promoted > 0) {
    saveTasks(tasks)
    saveQueue(queue)
    notify('tasks')
    setTimeout(() => processNext(), 0)
  }
  return promoted
}