/ src / lib / server / runtime / queue-advanced.test.ts
queue-advanced.test.ts
  1  import assert from 'node:assert/strict'
  2  import { describe, it } from 'node:test'
  3  import {
  4    dequeueNextRunnableTask,
  5    resolveTaskOriginConnectorFollowupTarget,
  6    resolveTaskResumeContext,
  7    resolveReusableTaskSessionId,
  8    applyTaskResumeStateToSession,
  9  } from '@/lib/server/runtime/queue'
 10  import type { BoardTask, Session } from '@/types'
 11  
 12  function makeTask(partial?: Partial<BoardTask> & { createdInSessionId?: string | null }): BoardTask {
 13    const now = Date.now()
 14    return { id: 'task-1', title: 'Test task', description: 'desc', status: 'queued', agentId: 'agent-a', createdAt: now, updatedAt: now, ...(partial || {}) } as BoardTask
 15  }
 16  
 17  // ---------------------------------------------------------------------------
 18  // dequeueNextRunnableTask
 19  // ---------------------------------------------------------------------------
 20  
 21  describe('dequeueNextRunnableTask', () => {
 22    it('diamond dependency graph — dequeues unblocked leaves in FIFO order', () => {
 23      const taskA = makeTask({ id: 'A', status: 'completed', title: 'A' })
 24      const taskB = makeTask({ id: 'B', status: 'queued', title: 'B', blockedBy: ['A'] })
 25      const taskC = makeTask({ id: 'C', status: 'queued', title: 'C', blockedBy: ['A'] })
 26      const taskD = makeTask({ id: 'D', status: 'queued', title: 'D', blockedBy: ['B', 'C'] })
 27  
 28      const tasks: Record<string, BoardTask> = {
 29        A: taskA, B: taskB, C: taskC, D: taskD,
 30      }
 31  
 32      // B and C both unblocked (A completed). D still blocked by B and C.
 33      const queue1 = ['B', 'C', 'D']
 34      const first = dequeueNextRunnableTask(queue1, tasks)
 35      assert.equal(first, 'B', 'first dequeue should pick B (FIFO)')
 36      assert.deepStrictEqual(queue1, ['C', 'D'], 'B removed from queue')
 37  
 38      // Complete B, now C is unblocked, D still blocked by C.
 39      tasks.B.status = 'completed'
 40      const queue2 = ['C', 'D']
 41      const second = dequeueNextRunnableTask(queue2, tasks)
 42      assert.equal(second, 'C', 'second dequeue should pick C')
 43      assert.deepStrictEqual(queue2, ['D'], 'C removed from queue')
 44  
 45      // Complete C, now D is unblocked.
 46      tasks.C.status = 'completed'
 47      const queue3 = ['D']
 48      const third = dequeueNextRunnableTask(queue3, tasks)
 49      assert.equal(third, 'D', 'third dequeue should pick D (all blockers completed)')
 50      assert.deepStrictEqual(queue3, [], 'queue is now empty')
 51    })
 52  
 53    it('retry scheduling gate — skips tasks with future retryScheduledAt', () => {
 54      const futureMs = Date.now() + 60_000
 55      const taskFuture = makeTask({ id: 'future', retryScheduledAt: futureMs })
 56      const tasks: Record<string, BoardTask> = { future: taskFuture }
 57      const queue = ['future']
 58  
 59      const result = dequeueNextRunnableTask(queue, tasks)
 60      assert.equal(result, null, 'should not dequeue task scheduled in the future')
 61  
 62      // Now set retryScheduledAt to the past.
 63      taskFuture.retryScheduledAt = Date.now() - 1000
 64      const queue2 = ['future']
 65      const result2 = dequeueNextRunnableTask(queue2, tasks)
 66      assert.equal(result2, 'future', 'should dequeue task with past retryScheduledAt')
 67    })
 68  
 69    it('stale queue cleanup — skips missing/non-queued tasks without crashing', () => {
 70      const taskValid = makeTask({ id: 'valid', title: 'Valid' })
 71      const taskCompleted = makeTask({ id: 'done', status: 'completed', title: 'Done' })
 72      const tasks: Record<string, BoardTask> = { valid: taskValid, done: taskCompleted }
 73  
 74      // Queue has stale IDs: 'ghost' doesn't exist, 'done' is completed, then 'valid'.
 75      const queue = ['ghost', 'done', 'valid']
 76      const result = dequeueNextRunnableTask(queue, tasks)
 77      assert.equal(result, 'valid', 'should skip stale entries and dequeue valid task')
 78    })
 79  
 80    it('empty queue returns null', () => {
 81      const result = dequeueNextRunnableTask([], {})
 82      assert.equal(result, null)
 83    })
 84  
 85    it('all-blocked queue returns null', () => {
 86      const taskA = makeTask({ id: 'A', status: 'queued', blockedBy: ['X'] })
 87      const taskB = makeTask({ id: 'B', status: 'queued', blockedBy: ['Y'] })
 88      const taskX = makeTask({ id: 'X', status: 'running' })
 89      const taskY = makeTask({ id: 'Y', status: 'running' })
 90      const tasks: Record<string, BoardTask> = { A: taskA, B: taskB, X: taskX, Y: taskY }
 91  
 92      const queue = ['A', 'B']
 93      const result = dequeueNextRunnableTask(queue, tasks)
 94      assert.equal(result, null, 'should return null when all tasks are blocked')
 95    })
 96  
 97    it('priority ordering — FIFO among unblocked tasks', () => {
 98      const t1 = makeTask({ id: 't1', title: 'First' })
 99      const t2 = makeTask({ id: 't2', title: 'Second' })
100      const t3 = makeTask({ id: 't3', title: 'Third' })
101      const tasks: Record<string, BoardTask> = { t1, t2, t3 }
102  
103      const queue = ['t1', 't2', 't3']
104      const first = dequeueNextRunnableTask(queue, tasks)
105      assert.equal(first, 't1', 'first in queue gets dequeued first')
106      const second = dequeueNextRunnableTask(queue, tasks)
107      assert.equal(second, 't2')
108      const third = dequeueNextRunnableTask(queue, tasks)
109      assert.equal(third, 't3')
110      const fourth = dequeueNextRunnableTask(queue, tasks)
111      assert.equal(fourth, null)
112    })
113  })
114  
115  // ---------------------------------------------------------------------------
116  // resolveTaskResumeContext
117  // ---------------------------------------------------------------------------
118  
119  describe('resolveTaskResumeContext', () => {
120    it('self-resume from codexResumeId on the task itself', () => {
121      const task = makeTask({
122        id: 'self-task',
123        codexResumeId: 'codex-thread-abc',
124        sessionId: 'sess-1',
125      })
126      const tasksById: Record<string, BoardTask> = { 'self-task': task }
127      const result = resolveTaskResumeContext(task, tasksById)
128  
129      assert.ok(result, 'should find resume context')
130      assert.equal(result.source, 'self')
131      assert.equal(result.sourceTaskId, 'self-task')
132      assert.equal(result.resume.codexThreadId, 'codex-thread-abc')
133    })
134  
135    it('deep delegation chain resume — falls back to parent task', () => {
136      const grandparent = makeTask({
137        id: 'gp',
138        status: 'completed',
139        title: 'Grandparent',
140        claudeResumeId: 'claude-gp',
141      })
142      const parent = makeTask({
143        id: 'parent',
144        status: 'completed',
145        title: 'Parent',
146        delegatedFromTaskId: 'gp',
147        codexResumeId: 'codex-parent',
148      })
149      const child = makeTask({
150        id: 'child',
151        status: 'queued',
152        title: 'Child',
153        delegatedFromTaskId: 'parent',
154      })
155      const tasksById: Record<string, BoardTask> = { gp: grandparent, parent, child }
156  
157      const result = resolveTaskResumeContext(child, tasksById)
158      assert.ok(result, 'should resolve resume context')
159      // Child has no resume state itself, so it should fall through to delegatedFromTaskId (parent).
160      assert.equal(result.source, 'delegated_from_task')
161      assert.equal(result.sourceTaskId, 'parent')
162      assert.equal(result.resume.codexThreadId, 'codex-parent')
163    })
164  
165    it('blocked-by resume with multiple blockers — falls back to second', () => {
166      const blockerNoResume = makeTask({
167        id: 'blocker-1',
168        status: 'completed',
169        title: 'Blocker 1',
170        // No resume state at all.
171      })
172      const blockerWithResume = makeTask({
173        id: 'blocker-2',
174        status: 'completed',
175        title: 'Blocker 2',
176        claudeResumeId: 'claude-b2',
177      })
178      const task = makeTask({
179        id: 'blocked-task',
180        blockedBy: ['blocker-1', 'blocker-2'],
181      })
182      const tasksById: Record<string, BoardTask> = {
183        'blocker-1': blockerNoResume,
184        'blocker-2': blockerWithResume,
185        'blocked-task': task,
186      }
187  
188      const result = resolveTaskResumeContext(task, tasksById)
189      assert.ok(result, 'should find resume context from second blocker')
190      assert.equal(result.source, 'blocked_by')
191      assert.equal(result.sourceTaskId, 'blocker-2')
192      assert.equal(result.resume.claudeSessionId, 'claude-b2')
193    })
194  
195    it('no resume context available — returns null', () => {
196      const task = makeTask({ id: 'fresh' })
197      const tasksById: Record<string, BoardTask> = { fresh: task }
198  
199      const result = resolveTaskResumeContext(task, tasksById)
200      assert.equal(result, null)
201    })
202  })
203  
204  // ---------------------------------------------------------------------------
205  // resolveReusableTaskSessionId
206  // ---------------------------------------------------------------------------
207  
208  describe('resolveReusableTaskSessionId', () => {
209    it('reuse blocker session when task has no own session', () => {
210      const blocker = makeTask({
211        id: 'blocker',
212        status: 'completed',
213        sessionId: 'sess-blocker',
214      })
215      const task = makeTask({
216        id: 'followup',
217        blockedBy: ['blocker'],
218      })
219      const tasks: Record<string, BoardTask> = { blocker, followup: task }
220      const sessions: Record<string, Partial<Session>> = {
221        'sess-blocker': { id: 'sess-blocker', messages: [] } as Partial<Session>,
222      }
223  
224      const result = resolveReusableTaskSessionId(task, tasks, sessions as Record<string, Session>)
225      assert.equal(result, 'sess-blocker')
226    })
227  
228    it('prefer task own checkpoint.lastSessionId over blocker session', () => {
229      const blocker = makeTask({
230        id: 'blocker',
231        status: 'completed',
232        sessionId: 'sess-blocker',
233      })
234      const task = makeTask({
235        id: 'followup',
236        blockedBy: ['blocker'],
237        checkpoint: { lastSessionId: 'sess-own', updatedAt: Date.now() },
238      })
239      const tasks: Record<string, BoardTask> = { blocker, followup: task }
240      const sessions: Record<string, Partial<Session>> = {
241        'sess-own': { id: 'sess-own', messages: [] } as Partial<Session>,
242        'sess-blocker': { id: 'sess-blocker', messages: [] } as Partial<Session>,
243      }
244  
245      const result = resolveReusableTaskSessionId(task, tasks, sessions as Record<string, Session>)
246      assert.equal(result, 'sess-own', 'should prefer own checkpoint session')
247    })
248  
249    it('no session available — returns empty string', () => {
250      const task = makeTask({ id: 'orphan' })
251      const tasks: Record<string, BoardTask> = { orphan: task }
252  
253      const result = resolveReusableTaskSessionId(task, tasks, {})
254      assert.equal(result, '', 'should return empty string when no session is available')
255    })
256  })
257  
258  // ---------------------------------------------------------------------------
259  // resolveTaskOriginConnectorFollowupTarget
260  // ---------------------------------------------------------------------------
261  
262  describe('resolveTaskOriginConnectorFollowupTarget', () => {
263    it('multi-hop delegation followup via delegatedByAgentId chain', () => {
264      // agent-C's task was delegated by agent-B, which traces back to agent-A's connector.
265      const task = makeTask({
266        id: 'task-c',
267        agentId: 'agent-c',
268        delegatedByAgentId: 'agent-b',
269        createdInSessionId: 'sess-origin',
270      } as Partial<BoardTask> & { createdInSessionId: string })
271  
272      const sessions: Record<string, { messages: Array<{ role: string; text: string; time: number; source?: { connectorId?: string; channelId?: string } }> }> = {
273        'sess-origin': {
274          messages: [
275            {
276              role: 'user',
277              text: 'Do this task',
278              time: Date.now(),
279              source: { connectorId: 'conn-a', channelId: 'channel-1' },
280            },
281          ],
282        },
283      }
284  
285      const connectors: Record<string, { id: string; name: string; platform: string; agentId: string; config: Record<string, string>; isEnabled: boolean; status: string; createdAt: number; updatedAt: number }> = {
286        'conn-a': {
287          id: 'conn-a',
288          name: 'Agent A WhatsApp',
289          platform: 'discord',
290          agentId: 'agent-a',
291          config: {},
292          isEnabled: true,
293          status: 'running',
294          createdAt: Date.now(),
295          updatedAt: Date.now(),
296        },
297      }
298  
299      const running = [
300        {
301          id: 'conn-a',
302          platform: 'discord',
303          agentId: 'agent-a',
304          supportsSend: true,
305          configuredTargets: [],
306          recentChannelId: null,
307        },
308      ]
309  
310      // The connector belongs to agent-a. The task's agentId is agent-c and
311      // delegatedByAgentId is agent-b. Neither matches agent-a, so the connector
312      // should NOT match (owner filter excludes it).
313      const result = resolveTaskOriginConnectorFollowupTarget({
314        task: task as never,
315        sessions: sessions as never,
316        connectors: connectors as never,
317        running,
318      })
319      assert.equal(result, null, 'connector owned by agent-a should not be accessible via agent-c delegated by agent-b')
320  
321      // Now set delegatedByAgentId to agent-a — it should match.
322      const mutableTask = task as unknown as Record<string, unknown>
323      mutableTask.delegatedByAgentId = 'agent-a'
324      const result2 = resolveTaskOriginConnectorFollowupTarget({
325        task: task as never,
326        sessions: sessions as never,
327        connectors: connectors as never,
328        running,
329      })
330      assert.ok(result2, 'should find connector followup target when delegatedByAgentId matches connector owner')
331      assert.equal(result2.connectorId, 'conn-a')
332      assert.equal(result2.channelId, 'channel-1')
333    })
334  
335    it('WhatsApp connector normalizes channel to JID format', () => {
336      const task = makeTask({
337        id: 'wa-task',
338        agentId: 'agent-wa',
339        createdInSessionId: 'sess-wa',
340      } as Partial<BoardTask> & { createdInSessionId: string })
341  
342      const sessions: Record<string, { messages: Array<{ role: string; text: string; time: number; source?: { connectorId?: string; channelId?: string } }> }> = {
343        'sess-wa': {
344          messages: [
345            {
346              role: 'user',
347              text: 'Check status',
348              time: Date.now(),
349              source: { connectorId: 'conn-wa', channelId: '+1 555 000 0000' },
350            },
351          ],
352        },
353      }
354  
355      const connectors: Record<string, { id: string; name: string; platform: string; agentId: string; config: Record<string, string>; isEnabled: boolean; status: string; createdAt: number; updatedAt: number }> = {
356        'conn-wa': {
357          id: 'conn-wa',
358          name: 'WhatsApp',
359          platform: 'whatsapp',
360          agentId: 'agent-wa',
361          config: {},
362          isEnabled: true,
363          status: 'running',
364          createdAt: Date.now(),
365          updatedAt: Date.now(),
366        },
367      }
368  
369      const running = [
370        {
371          id: 'conn-wa',
372          platform: 'whatsapp',
373          agentId: 'agent-wa',
374          supportsSend: true,
375          configuredTargets: [],
376          recentChannelId: null,
377        },
378      ]
379  
380      const result = resolveTaskOriginConnectorFollowupTarget({
381        task: task as never,
382        sessions: sessions as never,
383        connectors: connectors as never,
384        running,
385      })
386  
387      assert.ok(result, 'should resolve WhatsApp followup target')
388      assert.equal(result.connectorId, 'conn-wa')
389      // +1 555 000 0000 → cleaned to 15550000000 → 15550000000@s.whatsapp.net
390      assert.equal(result.channelId, '15550000000@s.whatsapp.net')
391    })
392  
393    it('no user messages with connector source — returns null', () => {
394      const task = makeTask({
395        id: 'no-source',
396        agentId: 'agent-x',
397        createdInSessionId: 'sess-empty',
398      } as Partial<BoardTask> & { createdInSessionId: string })
399  
400      const sessions: Record<string, { messages: Array<{ role: string; text: string; time: number }> }> = {
401        'sess-empty': {
402          messages: [
403            { role: 'user', text: 'Hello', time: Date.now() },
404            { role: 'assistant', text: 'Hi there', time: Date.now() },
405          ],
406        },
407      }
408  
409      const result = resolveTaskOriginConnectorFollowupTarget({
410        task: task as never,
411        sessions: sessions as never,
412        connectors: {},
413        running: [],
414      })
415  
416      assert.equal(result, null, 'should return null when no messages have connector source')
417    })
418  })
419  
420  // ---------------------------------------------------------------------------
421  // applyTaskResumeStateToSession
422  // ---------------------------------------------------------------------------
423  
424  describe('applyTaskResumeStateToSession', () => {
425    function makeSession(partial?: Partial<Session>): Session {
426      return {
427        id: 'sess-1',
428        name: 'Test',
429        cwd: '/tmp',
430        user: 'test',
431        provider: 'anthropic',
432        model: 'claude-sonnet-4-20250514',
433        claudeSessionId: null,
434        messages: [],
435        createdAt: Date.now(),
436        lastActiveAt: Date.now(),
437        ...(partial || {}),
438      } as Session
439    }
440  
441    it('partial resume — only codexThreadId set, others null', () => {
442      const session = makeSession()
443      const resume = {
444        claudeSessionId: null,
445        codexThreadId: 'codex-123',
446        opencodeSessionId: null,
447        delegateResumeIds: {
448          claudeCode: null,
449          codex: 'codex-123',
450          opencode: null,
451          gemini: null,
452        },
453      }
454  
455      const changed = applyTaskResumeStateToSession(session, resume)
456      assert.equal(changed, true, 'should report change')
457      assert.equal(session.codexThreadId, 'codex-123')
458      assert.equal(session.claudeSessionId, null, 'claudeSessionId should remain null')
459    })
460  
461    it('no-op when session already has the same resume state', () => {
462      const session = makeSession({
463        claudeSessionId: 'claude-abc',
464        codexThreadId: 'codex-123',
465        opencodeSessionId: 'oc-456',
466        delegateResumeIds: {
467          claudeCode: 'claude-abc',
468          codex: 'codex-123',
469          opencode: 'oc-456',
470          gemini: 'gem-789',
471        },
472      })
473  
474      const resume = {
475        claudeSessionId: 'claude-abc',
476        codexThreadId: 'codex-123',
477        opencodeSessionId: 'oc-456',
478        delegateResumeIds: {
479          claudeCode: 'claude-abc',
480          codex: 'codex-123',
481          opencode: 'oc-456',
482          gemini: 'gem-789',
483        },
484      }
485  
486      const changed = applyTaskResumeStateToSession(session, resume)
487      assert.equal(changed, false, 'should return false when nothing changes')
488    })
489  
490    it('full resume state hydration — all 4 fields applied', () => {
491      const session = makeSession()
492      const resume = {
493        claudeSessionId: 'claude-new',
494        codexThreadId: 'codex-new',
495        opencodeSessionId: 'oc-new',
496        delegateResumeIds: {
497          claudeCode: 'claude-new',
498          codex: 'codex-new',
499          opencode: 'oc-new',
500          gemini: 'gem-new',
501        },
502      }
503  
504      const changed = applyTaskResumeStateToSession(session, resume)
505      assert.equal(changed, true, 'should report change')
506      assert.equal(session.claudeSessionId, 'claude-new')
507      assert.equal(session.codexThreadId, 'codex-new')
508      assert.equal(session.opencodeSessionId, 'oc-new')
509      assert.deepStrictEqual(session.delegateResumeIds, {
510        claudeCode: 'claude-new',
511        codex: 'codex-new',
512        opencode: 'oc-new',
513        gemini: 'gem-new',
514      })
515    })
516  
517    it('returns false for null resume', () => {
518      const session = makeSession()
519      const changed = applyTaskResumeStateToSession(session, null)
520      assert.equal(changed, false)
521    })
522  
523    it('returns false for undefined resume', () => {
524      const session = makeSession()
525      const changed = applyTaskResumeStateToSession(session, undefined)
526      assert.equal(changed, false)
527    })
528  })