// src/lib/server/runtime/queue-recovery.test.ts
  1  import assert from 'node:assert/strict'
  2  import fs from 'node:fs'
  3  import os from 'node:os'
  4  import path from 'node:path'
  5  import { spawnSync } from 'node:child_process'
  6  import { describe, it } from 'node:test'
  7  
  8  const repoRoot = path.resolve(path.dirname(new URL(import.meta.url).pathname), '../../..')
  9  
 10  function runWithTempDataDir<T extends Record<string, unknown>>(script: string): T {
 11    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'swarmclaw-queue-recovery-'))
 12    try {
 13      const result = spawnSync(process.execPath, ['--import', 'tsx', '--input-type=module', '--eval', script], {
 14        cwd: repoRoot,
 15        env: {
 16          ...process.env,
 17          DATA_DIR: path.join(tempDir, 'data'),
 18          WORKSPACE_DIR: path.join(tempDir, 'workspace'),
 19          SWARMCLAW_BUILD_MODE: '1',
 20        },
 21        encoding: 'utf-8',
 22        timeout: 15000,
 23      })
 24      assert.equal(result.status, 0, result.stderr || result.stdout || 'subprocess failed')
 25      const lines = (result.stdout || '')
 26        .trim()
 27        .split('\n')
 28        .map((line) => line.trim())
 29        .filter(Boolean)
 30      const jsonLine = [...lines].reverse().find((line) => line.startsWith('{'))
 31      return JSON.parse(jsonLine || '{}') as T
 32    } finally {
 33      fs.rmSync(tempDir, { recursive: true, force: true })
 34    }
 35  }
 36  
// Integration tests for queue recovery. Each scenario runs inside a subprocess with a
// fresh temp DATA_DIR (see runWithTempDataDir) and reports its observations as a single
// JSON line, which the assertions below check. The embedded scripts stub
// globalThis.setTimeout where needed so retry timers are counted, never awaited.
describe('queue recovery', () => {
  // A task persisted as status='queued' but missing from the queue array (an "orphan")
  // whose agent is disabled: processNext is expected to mark it 'deferred' with a reason
  // mentioning the disabled agent — not re-queue it, not schedule a retry, not error it.
  it('processNext recovers orphaned queued tasks and defers them when the agent is disabled', () => {
    const output = runWithTempDataDir<{
      status: string | null
      queued: string[]
      retryDelayMs: number | null
      error: string | null
      deferredReason: string | null
    }>(`
      const storageMod = await import('@/lib/server/storage')
      const queueMod = await import('@/lib/server/runtime/queue')
      const storage = storageMod.default || storageMod
      const queue = queueMod.default || queueMod

      const now = Date.now()
      storage.saveAgents({
        'agent-disabled': {
          id: 'agent-disabled',
          name: 'Disabled Agent',
          provider: 'openai',
          model: 'gpt-test',
          disabled: true,
          createdAt: now,
          updatedAt: now,
        },
      })
      storage.saveTasks({
        orphaned: {
          id: 'orphaned',
          title: 'Recover me',
          description: 'Queued task missing from the queue array',
          status: 'queued',
          agentId: 'agent-disabled',
          createdAt: now - 5_000,
          updatedAt: now - 5_000,
        },
      })
      storage.saveQueue([])

      await queue.processNext()

      const task = storage.loadTasks().orphaned
      const queueItems = storage.loadQueue()
      console.log(JSON.stringify({
        status: task?.status ?? null,
        queued: queueItems,
        retryDelayMs: typeof task?.retryScheduledAt === 'number' ? task.retryScheduledAt - now : null,
        error: task?.error ?? null,
        deferredReason: task?.deferredReason ?? null,
      }))
    `)

    assert.equal(output.status, 'deferred')
    assert.deepEqual(output.queued, [])
    assert.equal(output.retryDelayMs, null)
    assert.equal(output.error, null)
    assert.ok(output.deferredReason)
    assert.match(output.deferredReason, /disabled/i)
  })

  // A status='running' task with no startedAt is malformed; recovery should requeue it,
  // bump nothing past one recovery, record an error and a task comment mentioning the
  // missing startedAt, and schedule exactly one retry timer (setTimeout is stubbed to count).
  it('recoverStalledRunningTasks requeues tasks missing startedAt and records the recovery', () => {
    const output = runWithTempDataDir<{
      result: { recovered: number; deadLettered: number }
      status: string | null
      queued: string[]
      retryDelayMs: number | null
      error: string | null
      comment: string | null
      scheduledCalls: number
    }>(`
      const storageMod = await import('@/lib/server/storage')
      const queueMod = await import('@/lib/server/runtime/queue')
      const storage = storageMod.default || storageMod
      const queue = queueMod.default || queueMod

      const now = Date.now()
      storage.saveTasks({
        broken: {
          id: 'broken',
          title: 'Broken running task',
          description: 'Missing startedAt should be recovered',
          status: 'running',
          agentId: 'agent-a',
          createdAt: now - 20_000,
          updatedAt: now - 15_000,
        },
      })
      storage.saveQueue([])

      const originalSetTimeout = globalThis.setTimeout
      const scheduled = []
      globalThis.setTimeout = (fn, delay, ...args) => {
        scheduled.push(delay)
        return 0
      }
      try {
        const result = queue.recoverStalledRunningTasks()
        const task = storage.loadTasks().broken
        console.log(JSON.stringify({
          result,
          status: task?.status ?? null,
          queued: storage.loadQueue(),
          retryDelayMs: typeof task?.retryScheduledAt === 'number' ? task.retryScheduledAt - now : null,
          error: task?.error ?? null,
          comment: task?.comments?.at(-1)?.text ?? null,
          scheduledCalls: scheduled.length,
        }))
      } finally {
        globalThis.setTimeout = originalSetTimeout
      }
    `)

    assert.equal(output.result.recovered, 1)
    assert.equal(output.result.deadLettered, 0)
    assert.equal(output.status, 'queued')
    assert.deepEqual(output.queued, ['broken'])
    assert.equal(typeof output.retryDelayMs, 'number')
    assert.ok(output.retryDelayMs !== null)
    // ~30s default backoff is expected here (no taskRetryBackoffSec configured) — the
    // 25–35s window tolerates wall-clock drift between subprocess and assertion time.
    assert.ok(output.retryDelayMs >= 25_000 && output.retryDelayMs <= 35_000)
    assert.ok(output.error)
    assert.match(output.error, /missing startedAt/i)
    assert.ok(output.comment)
    assert.match(output.comment, /missing startedAt/i)
    assert.equal(output.scheduledCalls, 1)
  })

  // A genuinely stalled running task (startedAt far past taskStallTimeoutMin) should be
  // requeued with attempts incremented, use the configured taskRetryBackoffSec (90s) for
  // its retry delay, and have its session's heartbeat disabled as part of recovery.
  it('recoverStalledRunningTasks preserves retry policy backoff for stalled tasks', () => {
    const output = runWithTempDataDir<{
      result: { recovered: number; deadLettered: number }
      status: string | null
      attempts: number | null
      queued: string[]
      retryDelayMs: number | null
      error: string | null
      heartbeatEnabled: boolean | null
      scheduledCalls: number
    }>(`
      const storageMod = await import('@/lib/server/storage')
      const queueMod = await import('@/lib/server/runtime/queue')
      const storage = storageMod.default || storageMod
      const queue = queueMod.default || queueMod

      const now = Date.now()
      storage.saveSettings({
        ...storage.loadSettings(),
        taskStallTimeoutMin: 5,
        taskRetryBackoffSec: 90,
      })
      storage.saveSessions({
        'sess-stalled': {
          id: 'sess-stalled',
          agentId: 'agent-a',
          messages: [],
          createdAt: now - 100_000,
          lastActiveAt: now - 5_000,
          heartbeatEnabled: true,
        },
      })
      storage.saveTasks({
        stalled: {
          id: 'stalled',
          title: 'Stalled task',
          description: 'Should use configured backoff when recovered',
          status: 'running',
          agentId: 'agent-a',
          sessionId: 'sess-stalled',
          createdAt: now - 200_000,
          updatedAt: now - 420_000,
          startedAt: now - 420_000,
          maxAttempts: 3,
          attempts: 0,
        },
      })
      storage.saveQueue([])

      const originalSetTimeout = globalThis.setTimeout
      const scheduled = []
      globalThis.setTimeout = (fn, delay, ...args) => {
        scheduled.push(delay)
        return 0
      }
      try {
        const result = queue.recoverStalledRunningTasks()
        const task = storage.loadTasks().stalled
        const session = storage.loadSessions()['sess-stalled']
        console.log(JSON.stringify({
          result,
          status: task?.status ?? null,
          attempts: task?.attempts ?? null,
          queued: storage.loadQueue(),
          retryDelayMs: typeof task?.retryScheduledAt === 'number' ? task.retryScheduledAt - now : null,
          error: task?.error ?? null,
          heartbeatEnabled: session?.heartbeatEnabled ?? null,
          scheduledCalls: scheduled.length,
        }))
      } finally {
        globalThis.setTimeout = originalSetTimeout
      }
    `)

    assert.equal(output.result.recovered, 1)
    assert.equal(output.result.deadLettered, 0)
    assert.equal(output.status, 'queued')
    assert.equal(output.attempts, 1)
    assert.deepEqual(output.queued, ['stalled'])
    assert.equal(typeof output.retryDelayMs, 'number')
    assert.ok(output.retryDelayMs !== null)
    // Configured 90s backoff; window tolerates clock drift between processes.
    assert.ok(output.retryDelayMs >= 85_000 && output.retryDelayMs <= 100_000)
    assert.ok(output.error)
    assert.match(output.error, /Retry scheduled after failure/i)
    assert.equal(output.heartbeatEnabled, false)
    assert.equal(output.scheduledCalls, 1)
  })

  it('scheduleRetryOrDeadLetter via stall recovery clears checkoutRunId so the next attempt can check out', () => {
    // Regression: a task transitioning running -> queued on retry must release its
    // prior checkout. Without this, checkoutTask() returns null on every attempt
    // and the orphan-recovery loop burns CPU re-queueing a task that can never run.
    const output = runWithTempDataDir<{
      status: string | null
      checkoutRunId: string | null
      queued: string[]
      attempts: number | null
    }>(`
      const storageMod = await import('@/lib/server/storage')
      const queueMod = await import('@/lib/server/runtime/queue')
      const storage = storageMod.default || storageMod
      const queue = queueMod.default || queueMod

      const now = Date.now()
      storage.saveSettings({
        ...storage.loadSettings(),
        taskStallTimeoutMin: 5,
        taskRetryBackoffSec: 30,
      })
      storage.saveTasks({
        stuck: {
          id: 'stuck',
          title: 'Stuck with stale checkout',
          description: 'Running task that stalled and must release its checkout on retry',
          status: 'running',
          agentId: 'agent-a',
          startedAt: now - 600_000,
          updatedAt: now - 600_000,
          createdAt: now - 700_000,
          maxAttempts: 3,
          attempts: 0,
          checkoutRunId: 'stale-run-id',
        },
      })
      storage.saveQueue([])

      const originalSetTimeout = globalThis.setTimeout
      globalThis.setTimeout = () => 0
      try {
        queue.recoverStalledRunningTasks()
      } finally {
        globalThis.setTimeout = originalSetTimeout
      }

      const task = storage.loadTasks().stuck
      console.log(JSON.stringify({
        status: task?.status ?? null,
        checkoutRunId: task?.checkoutRunId ?? null,
        queued: storage.loadQueue(),
        attempts: task?.attempts ?? null,
      }))
    `)

    assert.equal(output.status, 'queued', 'task should be requeued for retry')
    assert.equal(output.checkoutRunId, null, 'stale checkoutRunId must be released so retry can check out')
    assert.deepEqual(output.queued, ['stuck'])
    assert.equal(output.attempts, 1)
  })

  it('processNext orphan recovery clears stale checkoutRunId on queued tasks', () => {
    // Regression: tasks written before the 1.5.38 fix could land in storage with
    // status='queued' + a set checkoutRunId (because the old scheduleRetryOrDeadLetter
    // forgot to release the checkout). Orphan recovery must repair this invalid combo
    // so the next checkoutTask() can succeed — otherwise the loop spins forever.
    const output = runWithTempDataDir<{
      status: string | null
      checkoutRunId: string | null
      queued: string[]
    }>(`
      const storageMod = await import('@/lib/server/storage')
      const queueMod = await import('@/lib/server/runtime/queue')
      const storage = storageMod.default || storageMod
      const queue = queueMod.default || queueMod

      const now = Date.now()
      storage.saveAgents({
        'agent-a': {
          id: 'agent-a',
          name: 'Agent A',
          provider: 'openai',
          model: 'gpt-test',
          createdAt: now,
          updatedAt: now,
        },
      })
      storage.saveTasks({
        stale: {
          id: 'stale',
          title: 'Pre-1.5.38 stuck task',
          description: 'Queued but still holds a stale checkoutRunId from a prior failed run',
          status: 'queued',
          agentId: 'agent-a',
          checkoutRunId: 'stale-run-id',
          createdAt: now - 10_000,
          updatedAt: now - 10_000,
        },
      })
      // Intentionally NOT in the queue array — simulates the orphan condition.
      storage.saveQueue([])

      await queue.processNext()

      const task = storage.loadTasks().stale
      console.log(JSON.stringify({
        status: task?.status ?? null,
        checkoutRunId: task?.checkoutRunId ?? null,
        queued: storage.loadQueue(),
      }))
    `)

    // Orphan recovery should have put the task back in the queue AND cleared the stale id.
    assert.equal(output.checkoutRunId, null, 'orphan recovery must clear stale checkoutRunId')
    // After recovery the task may have stayed queued, moved to running, or even already
    // failed (processNext is free to attempt the run against the stub agent). Any of
    // these is acceptable — it must simply not remain stuck in the orphan state.
    assert.ok(
      output.status === 'queued' || output.status === 'running' || output.status === 'failed',
      `unexpected status after recovery: ${output.status}`,
    )
  })

  // A stalled task already on its final attempt (attempts=1 of maxAttempts=2) should be
  // dead-lettered to status='failed' with its stale checkoutRunId released, so terminal
  // tasks never look checked-out.
  it('dead-letter path clears checkoutRunId so terminal tasks do not appear checked-out', () => {
    const output = runWithTempDataDir<{
      status: string | null
      checkoutRunId: string | null
      attempts: number | null
    }>(`
      const storageMod = await import('@/lib/server/storage')
      const queueMod = await import('@/lib/server/runtime/queue')
      const storage = storageMod.default || storageMod
      const queue = queueMod.default || queueMod

      const now = Date.now()
      storage.saveSettings({
        ...storage.loadSettings(),
        taskStallTimeoutMin: 5,
      })
      storage.saveTasks({
        doomed: {
          id: 'doomed',
          title: 'Exhausted retries',
          description: 'Task at its last attempt that stalls should dead-letter and release checkout',
          status: 'running',
          agentId: 'agent-a',
          startedAt: now - 600_000,
          updatedAt: now - 600_000,
          createdAt: now - 700_000,
          maxAttempts: 2,
          attempts: 1,
          checkoutRunId: 'stale-run-id',
        },
      })
      storage.saveQueue([])

      const originalSetTimeout = globalThis.setTimeout
      globalThis.setTimeout = () => 0
      try {
        queue.recoverStalledRunningTasks()
      } finally {
        globalThis.setTimeout = originalSetTimeout
      }

      const task = storage.loadTasks().doomed
      console.log(JSON.stringify({
        status: task?.status ?? null,
        checkoutRunId: task?.checkoutRunId ?? null,
        attempts: task?.attempts ?? null,
      }))
    `)

    assert.equal(output.status, 'failed', 'task should be dead-lettered after exhausting retries')
    assert.equal(output.checkoutRunId, null, 'dead-lettered tasks must not retain a stale checkoutRunId')
    assert.equal(output.attempts, 2)
  })

  // Boot-time resumeQueue should re-enqueue persisted work without rewriting history:
  // the blocked task's original queuedAt must survive. The running dependency is also
  // re-enqueued (asserted below) — presumably resumeQueue treats in-flight work as
  // resumable after a restart; confirm against the queue module if this changes.
  it('resumeQueue restores blocked queued tasks without clobbering their queuedAt timestamp', () => {
    const output = runWithTempDataDir<{
      queued: string[]
      queuedAt: number | null
      status: string | null
    }>(`
      const storageMod = await import('@/lib/server/storage')
      const queueMod = await import('@/lib/server/runtime/queue')
      const storage = storageMod.default || storageMod
      const queue = queueMod.default || queueMod

      const originalQueuedAt = Date.now() - 45_000
      storage.saveTasks({
        dep: {
          id: 'dep',
          title: 'Dependency',
          description: 'Still running',
          status: 'running',
          agentId: 'agent-a',
          createdAt: originalQueuedAt - 10_000,
          updatedAt: originalQueuedAt - 10_000,
          startedAt: originalQueuedAt - 10_000,
        },
        blocked: {
          id: 'blocked',
          title: 'Blocked task',
          description: 'Should be re-added to the queue on boot',
          status: 'queued',
          agentId: 'agent-a',
          blockedBy: ['dep'],
          queuedAt: originalQueuedAt,
          createdAt: originalQueuedAt - 20_000,
          updatedAt: originalQueuedAt - 5_000,
        },
      })
      storage.saveQueue([])

      queue.resumeQueue()

      const task = storage.loadTasks().blocked
      console.log(JSON.stringify({
        queued: storage.loadQueue(),
        queuedAt: task?.queuedAt ?? null,
        status: task?.status ?? null,
      }))
    `)

    assert.deepEqual(output.queued, ['blocked', 'dep'])
    assert.equal(output.status, 'queued')
    assert.equal(typeof output.queuedAt, 'number')
    assert.ok(output.queuedAt !== null)
    // queuedAt was set 45s in the past inside the subprocess; if resumeQueue had
    // clobbered it with "now", it would fail this staleness check.
    assert.ok(output.queuedAt < Date.now() - 30_000)
  })
})