// queue-recovery.test.ts
1 import assert from 'node:assert/strict' 2 import fs from 'node:fs' 3 import os from 'node:os' 4 import path from 'node:path' 5 import { spawnSync } from 'node:child_process' 6 import { describe, it } from 'node:test' 7 8 const repoRoot = path.resolve(path.dirname(new URL(import.meta.url).pathname), '../../..') 9 10 function runWithTempDataDir<T extends Record<string, unknown>>(script: string): T { 11 const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'swarmclaw-queue-recovery-')) 12 try { 13 const result = spawnSync(process.execPath, ['--import', 'tsx', '--input-type=module', '--eval', script], { 14 cwd: repoRoot, 15 env: { 16 ...process.env, 17 DATA_DIR: path.join(tempDir, 'data'), 18 WORKSPACE_DIR: path.join(tempDir, 'workspace'), 19 SWARMCLAW_BUILD_MODE: '1', 20 }, 21 encoding: 'utf-8', 22 timeout: 15000, 23 }) 24 assert.equal(result.status, 0, result.stderr || result.stdout || 'subprocess failed') 25 const lines = (result.stdout || '') 26 .trim() 27 .split('\n') 28 .map((line) => line.trim()) 29 .filter(Boolean) 30 const jsonLine = [...lines].reverse().find((line) => line.startsWith('{')) 31 return JSON.parse(jsonLine || '{}') as T 32 } finally { 33 fs.rmSync(tempDir, { recursive: true, force: true }) 34 } 35 } 36 37 describe('queue recovery', () => { 38 it('processNext recovers orphaned queued tasks and defers them when the agent is disabled', () => { 39 const output = runWithTempDataDir<{ 40 status: string | null 41 queued: string[] 42 retryDelayMs: number | null 43 error: string | null 44 deferredReason: string | null 45 }>(` 46 const storageMod = await import('@/lib/server/storage') 47 const queueMod = await import('@/lib/server/runtime/queue') 48 const storage = storageMod.default || storageMod 49 const queue = queueMod.default || queueMod 50 51 const now = Date.now() 52 storage.saveAgents({ 53 'agent-disabled': { 54 id: 'agent-disabled', 55 name: 'Disabled Agent', 56 provider: 'openai', 57 model: 'gpt-test', 58 disabled: true, 59 createdAt: now, 60 
updatedAt: now, 61 }, 62 }) 63 storage.saveTasks({ 64 orphaned: { 65 id: 'orphaned', 66 title: 'Recover me', 67 description: 'Queued task missing from the queue array', 68 status: 'queued', 69 agentId: 'agent-disabled', 70 createdAt: now - 5_000, 71 updatedAt: now - 5_000, 72 }, 73 }) 74 storage.saveQueue([]) 75 76 await queue.processNext() 77 78 const task = storage.loadTasks().orphaned 79 const queueItems = storage.loadQueue() 80 console.log(JSON.stringify({ 81 status: task?.status ?? null, 82 queued: queueItems, 83 retryDelayMs: typeof task?.retryScheduledAt === 'number' ? task.retryScheduledAt - now : null, 84 error: task?.error ?? null, 85 deferredReason: task?.deferredReason ?? null, 86 })) 87 `) 88 89 assert.equal(output.status, 'deferred') 90 assert.deepEqual(output.queued, []) 91 assert.equal(output.retryDelayMs, null) 92 assert.equal(output.error, null) 93 assert.ok(output.deferredReason) 94 assert.match(output.deferredReason, /disabled/i) 95 }) 96 97 it('recoverStalledRunningTasks requeues tasks missing startedAt and records the recovery', () => { 98 const output = runWithTempDataDir<{ 99 result: { recovered: number; deadLettered: number } 100 status: string | null 101 queued: string[] 102 retryDelayMs: number | null 103 error: string | null 104 comment: string | null 105 scheduledCalls: number 106 }>(` 107 const storageMod = await import('@/lib/server/storage') 108 const queueMod = await import('@/lib/server/runtime/queue') 109 const storage = storageMod.default || storageMod 110 const queue = queueMod.default || queueMod 111 112 const now = Date.now() 113 storage.saveTasks({ 114 broken: { 115 id: 'broken', 116 title: 'Broken running task', 117 description: 'Missing startedAt should be recovered', 118 status: 'running', 119 agentId: 'agent-a', 120 createdAt: now - 20_000, 121 updatedAt: now - 15_000, 122 }, 123 }) 124 storage.saveQueue([]) 125 126 const originalSetTimeout = globalThis.setTimeout 127 const scheduled = [] 128 globalThis.setTimeout = (fn, 
delay, ...args) => { 129 scheduled.push(delay) 130 return 0 131 } 132 try { 133 const result = queue.recoverStalledRunningTasks() 134 const task = storage.loadTasks().broken 135 console.log(JSON.stringify({ 136 result, 137 status: task?.status ?? null, 138 queued: storage.loadQueue(), 139 retryDelayMs: typeof task?.retryScheduledAt === 'number' ? task.retryScheduledAt - now : null, 140 error: task?.error ?? null, 141 comment: task?.comments?.at(-1)?.text ?? null, 142 scheduledCalls: scheduled.length, 143 })) 144 } finally { 145 globalThis.setTimeout = originalSetTimeout 146 } 147 `) 148 149 assert.equal(output.result.recovered, 1) 150 assert.equal(output.result.deadLettered, 0) 151 assert.equal(output.status, 'queued') 152 assert.deepEqual(output.queued, ['broken']) 153 assert.equal(typeof output.retryDelayMs, 'number') 154 assert.ok(output.retryDelayMs !== null) 155 assert.ok(output.retryDelayMs >= 25_000 && output.retryDelayMs <= 35_000) 156 assert.ok(output.error) 157 assert.match(output.error, /missing startedAt/i) 158 assert.ok(output.comment) 159 assert.match(output.comment, /missing startedAt/i) 160 assert.equal(output.scheduledCalls, 1) 161 }) 162 163 it('recoverStalledRunningTasks preserves retry policy backoff for stalled tasks', () => { 164 const output = runWithTempDataDir<{ 165 result: { recovered: number; deadLettered: number } 166 status: string | null 167 attempts: number | null 168 queued: string[] 169 retryDelayMs: number | null 170 error: string | null 171 heartbeatEnabled: boolean | null 172 scheduledCalls: number 173 }>(` 174 const storageMod = await import('@/lib/server/storage') 175 const queueMod = await import('@/lib/server/runtime/queue') 176 const storage = storageMod.default || storageMod 177 const queue = queueMod.default || queueMod 178 179 const now = Date.now() 180 storage.saveSettings({ 181 ...storage.loadSettings(), 182 taskStallTimeoutMin: 5, 183 taskRetryBackoffSec: 90, 184 }) 185 storage.saveSessions({ 186 'sess-stalled': { 187 
id: 'sess-stalled', 188 agentId: 'agent-a', 189 messages: [], 190 createdAt: now - 100_000, 191 lastActiveAt: now - 5_000, 192 heartbeatEnabled: true, 193 }, 194 }) 195 storage.saveTasks({ 196 stalled: { 197 id: 'stalled', 198 title: 'Stalled task', 199 description: 'Should use configured backoff when recovered', 200 status: 'running', 201 agentId: 'agent-a', 202 sessionId: 'sess-stalled', 203 createdAt: now - 200_000, 204 updatedAt: now - 420_000, 205 startedAt: now - 420_000, 206 maxAttempts: 3, 207 attempts: 0, 208 }, 209 }) 210 storage.saveQueue([]) 211 212 const originalSetTimeout = globalThis.setTimeout 213 const scheduled = [] 214 globalThis.setTimeout = (fn, delay, ...args) => { 215 scheduled.push(delay) 216 return 0 217 } 218 try { 219 const result = queue.recoverStalledRunningTasks() 220 const task = storage.loadTasks().stalled 221 const session = storage.loadSessions()['sess-stalled'] 222 console.log(JSON.stringify({ 223 result, 224 status: task?.status ?? null, 225 attempts: task?.attempts ?? null, 226 queued: storage.loadQueue(), 227 retryDelayMs: typeof task?.retryScheduledAt === 'number' ? task.retryScheduledAt - now : null, 228 error: task?.error ?? null, 229 heartbeatEnabled: session?.heartbeatEnabled ?? 
null, 230 scheduledCalls: scheduled.length, 231 })) 232 } finally { 233 globalThis.setTimeout = originalSetTimeout 234 } 235 `) 236 237 assert.equal(output.result.recovered, 1) 238 assert.equal(output.result.deadLettered, 0) 239 assert.equal(output.status, 'queued') 240 assert.equal(output.attempts, 1) 241 assert.deepEqual(output.queued, ['stalled']) 242 assert.equal(typeof output.retryDelayMs, 'number') 243 assert.ok(output.retryDelayMs !== null) 244 assert.ok(output.retryDelayMs >= 85_000 && output.retryDelayMs <= 100_000) 245 assert.ok(output.error) 246 assert.match(output.error, /Retry scheduled after failure/i) 247 assert.equal(output.heartbeatEnabled, false) 248 assert.equal(output.scheduledCalls, 1) 249 }) 250 251 it('scheduleRetryOrDeadLetter via stall recovery clears checkoutRunId so the next attempt can check out', () => { 252 // Regression: a task transitioning running -> queued on retry must release its 253 // prior checkout. Without this, checkoutTask() returns null on every attempt 254 // and the orphan-recovery loop burns CPU re-queueing a task that can never run. 
255 const output = runWithTempDataDir<{ 256 status: string | null 257 checkoutRunId: string | null 258 queued: string[] 259 attempts: number | null 260 }>(` 261 const storageMod = await import('@/lib/server/storage') 262 const queueMod = await import('@/lib/server/runtime/queue') 263 const storage = storageMod.default || storageMod 264 const queue = queueMod.default || queueMod 265 266 const now = Date.now() 267 storage.saveSettings({ 268 ...storage.loadSettings(), 269 taskStallTimeoutMin: 5, 270 taskRetryBackoffSec: 30, 271 }) 272 storage.saveTasks({ 273 stuck: { 274 id: 'stuck', 275 title: 'Stuck with stale checkout', 276 description: 'Running task that stalled and must release its checkout on retry', 277 status: 'running', 278 agentId: 'agent-a', 279 startedAt: now - 600_000, 280 updatedAt: now - 600_000, 281 createdAt: now - 700_000, 282 maxAttempts: 3, 283 attempts: 0, 284 checkoutRunId: 'stale-run-id', 285 }, 286 }) 287 storage.saveQueue([]) 288 289 const originalSetTimeout = globalThis.setTimeout 290 globalThis.setTimeout = () => 0 291 try { 292 queue.recoverStalledRunningTasks() 293 } finally { 294 globalThis.setTimeout = originalSetTimeout 295 } 296 297 const task = storage.loadTasks().stuck 298 console.log(JSON.stringify({ 299 status: task?.status ?? null, 300 checkoutRunId: task?.checkoutRunId ?? null, 301 queued: storage.loadQueue(), 302 attempts: task?.attempts ?? 
null, 303 })) 304 `) 305 306 assert.equal(output.status, 'queued', 'task should be requeued for retry') 307 assert.equal(output.checkoutRunId, null, 'stale checkoutRunId must be released so retry can check out') 308 assert.deepEqual(output.queued, ['stuck']) 309 assert.equal(output.attempts, 1) 310 }) 311 312 it('processNext orphan recovery clears stale checkoutRunId on queued tasks', () => { 313 // Regression: tasks written before the 1.5.38 fix could land in storage with 314 // status='queued' + a set checkoutRunId (because the old scheduleRetryOrDeadLetter 315 // forgot to release the checkout). Orphan recovery must repair this invalid combo 316 // so the next checkoutTask() can succeed — otherwise the loop spins forever. 317 const output = runWithTempDataDir<{ 318 status: string | null 319 checkoutRunId: string | null 320 queued: string[] 321 }>(` 322 const storageMod = await import('@/lib/server/storage') 323 const queueMod = await import('@/lib/server/runtime/queue') 324 const storage = storageMod.default || storageMod 325 const queue = queueMod.default || queueMod 326 327 const now = Date.now() 328 storage.saveAgents({ 329 'agent-a': { 330 id: 'agent-a', 331 name: 'Agent A', 332 provider: 'openai', 333 model: 'gpt-test', 334 createdAt: now, 335 updatedAt: now, 336 }, 337 }) 338 storage.saveTasks({ 339 stale: { 340 id: 'stale', 341 title: 'Pre-1.5.38 stuck task', 342 description: 'Queued but still holds a stale checkoutRunId from a prior failed run', 343 status: 'queued', 344 agentId: 'agent-a', 345 checkoutRunId: 'stale-run-id', 346 createdAt: now - 10_000, 347 updatedAt: now - 10_000, 348 }, 349 }) 350 // Intentionally NOT in the queue array — simulates the orphan condition. 351 storage.saveQueue([]) 352 353 await queue.processNext() 354 355 const task = storage.loadTasks().stale 356 console.log(JSON.stringify({ 357 status: task?.status ?? null, 358 checkoutRunId: task?.checkoutRunId ?? 
null, 359 queued: storage.loadQueue(), 360 })) 361 `) 362 363 // Orphan recovery should have put the task back in the queue AND cleared the stale id. 364 assert.equal(output.checkoutRunId, null, 'orphan recovery must clear stale checkoutRunId') 365 // After recovery the task either stayed queued or moved to running (depending on concurrency). 366 // Either way it must not still be stuck in an orphan state. 367 assert.ok( 368 output.status === 'queued' || output.status === 'running' || output.status === 'failed', 369 `unexpected status after recovery: ${output.status}`, 370 ) 371 }) 372 373 it('dead-letter path clears checkoutRunId so terminal tasks do not appear checked-out', () => { 374 const output = runWithTempDataDir<{ 375 status: string | null 376 checkoutRunId: string | null 377 attempts: number | null 378 }>(` 379 const storageMod = await import('@/lib/server/storage') 380 const queueMod = await import('@/lib/server/runtime/queue') 381 const storage = storageMod.default || storageMod 382 const queue = queueMod.default || queueMod 383 384 const now = Date.now() 385 storage.saveSettings({ 386 ...storage.loadSettings(), 387 taskStallTimeoutMin: 5, 388 }) 389 storage.saveTasks({ 390 doomed: { 391 id: 'doomed', 392 title: 'Exhausted retries', 393 description: 'Task at its last attempt that stalls should dead-letter and release checkout', 394 status: 'running', 395 agentId: 'agent-a', 396 startedAt: now - 600_000, 397 updatedAt: now - 600_000, 398 createdAt: now - 700_000, 399 maxAttempts: 2, 400 attempts: 1, 401 checkoutRunId: 'stale-run-id', 402 }, 403 }) 404 storage.saveQueue([]) 405 406 const originalSetTimeout = globalThis.setTimeout 407 globalThis.setTimeout = () => 0 408 try { 409 queue.recoverStalledRunningTasks() 410 } finally { 411 globalThis.setTimeout = originalSetTimeout 412 } 413 414 const task = storage.loadTasks().doomed 415 console.log(JSON.stringify({ 416 status: task?.status ?? null, 417 checkoutRunId: task?.checkoutRunId ?? 
null, 418 attempts: task?.attempts ?? null, 419 })) 420 `) 421 422 assert.equal(output.status, 'failed', 'task should be dead-lettered after exhausting retries') 423 assert.equal(output.checkoutRunId, null, 'dead-lettered tasks must not retain a stale checkoutRunId') 424 assert.equal(output.attempts, 2) 425 }) 426 427 it('resumeQueue restores blocked queued tasks without clobbering their queuedAt timestamp', () => { 428 const output = runWithTempDataDir<{ 429 queued: string[] 430 queuedAt: number | null 431 status: string | null 432 }>(` 433 const storageMod = await import('@/lib/server/storage') 434 const queueMod = await import('@/lib/server/runtime/queue') 435 const storage = storageMod.default || storageMod 436 const queue = queueMod.default || queueMod 437 438 const originalQueuedAt = Date.now() - 45_000 439 storage.saveTasks({ 440 dep: { 441 id: 'dep', 442 title: 'Dependency', 443 description: 'Still running', 444 status: 'running', 445 agentId: 'agent-a', 446 createdAt: originalQueuedAt - 10_000, 447 updatedAt: originalQueuedAt - 10_000, 448 startedAt: originalQueuedAt - 10_000, 449 }, 450 blocked: { 451 id: 'blocked', 452 title: 'Blocked task', 453 description: 'Should be re-added to the queue on boot', 454 status: 'queued', 455 agentId: 'agent-a', 456 blockedBy: ['dep'], 457 queuedAt: originalQueuedAt, 458 createdAt: originalQueuedAt - 20_000, 459 updatedAt: originalQueuedAt - 5_000, 460 }, 461 }) 462 storage.saveQueue([]) 463 464 queue.resumeQueue() 465 466 const task = storage.loadTasks().blocked 467 console.log(JSON.stringify({ 468 queued: storage.loadQueue(), 469 queuedAt: task?.queuedAt ?? null, 470 status: task?.status ?? null, 471 })) 472 `) 473 474 assert.deepEqual(output.queued, ['blocked', 'dep']) 475 assert.equal(output.status, 'queued') 476 assert.equal(typeof output.queuedAt, 'number') 477 assert.ok(output.queuedAt !== null) 478 assert.ok(output.queuedAt < Date.now() - 30_000) 479 }) 480 })