// queue-advanced.test.ts
import assert from 'node:assert/strict'
import { describe, it } from 'node:test'
import {
  dequeueNextRunnableTask,
  resolveTaskOriginConnectorFollowupTarget,
  resolveTaskResumeContext,
  resolveReusableTaskSessionId,
  applyTaskResumeStateToSession,
} from '@/lib/server/runtime/queue'
import type { BoardTask, Session } from '@/types'

/** Field overrides accepted by `makeTask`: any `BoardTask` field plus an optional `createdInSessionId`. */
type TaskOverrides = Partial<BoardTask> & { createdInSessionId?: string | null }

/**
 * Builds a queued `BoardTask` fixture owned by `agent-a`.
 *
 * @param partial - Fields that replace the defaults below.
 * @returns The default task with `partial` spread over it.
 */
function makeTask(partial?: TaskOverrides): BoardTask {
  // One timestamp so createdAt and updatedAt agree unless overridden.
  const timestamp = Date.now()
  const task = {
    id: 'task-1',
    title: 'Test task',
    description: 'desc',
    status: 'queued',
    agentId: 'agent-a',
    createdAt: timestamp,
    updatedAt: timestamp,
    ...partial,
  } as BoardTask
  return task
}

/** Indexes the given tasks by their `id`, preserving argument order. */
function byId(...list: BoardTask[]): Record<string, BoardTask> {
  const index: Record<string, BoardTask> = {}
  for (const entry of list) index[entry.id] = entry
  return index
}

/** Builds a session map holding one minimal stub (`id` + empty `messages`) per id. */
function stubSessions(...ids: string[]): Record<string, Session> {
  const stubs: Record<string, Partial<Session>> = {}
  for (const id of ids) stubs[id] = { id, messages: [] } as Partial<Session>
  return stubs as Record<string, Session>
}

// ---------------------------------------------------------------------------
// dequeueNextRunnableTask
// ---------------------------------------------------------------------------

describe('dequeueNextRunnableTask', () => {
  it('diamond dependency graph — dequeues unblocked leaves in FIFO order', () => {
    // A is done; B and C wait on A; D waits on both B and C.
    const tasks = byId(
      makeTask({ id: 'A', status: 'completed', title: 'A' }),
      makeTask({ id: 'B', status: 'queued', title: 'B', blockedBy: ['A'] }),
      makeTask({ id: 'C', status: 'queued', title: 'C', blockedBy: ['A'] }),
      makeTask({ id: 'D', status: 'queued', title: 'D', blockedBy: ['B', 'C'] }),
    )

    // Round 1: B and C are both runnable, B is ahead in the queue.
    const roundOne = ['B', 'C', 'D']
    assert.equal(dequeueNextRunnableTask(roundOne, tasks), 'B', 'first dequeue should pick B (FIFO)')
    assert.deepStrictEqual(roundOne, ['C', 'D'], 'B removed from queue')

    // Round 2: B finished; C is runnable while D still waits on C.
    tasks.B.status = 'completed'
    const roundTwo = ['C', 'D']
    assert.equal(dequeueNextRunnableTask(roundTwo, tasks), 'C', 'second dequeue should pick C')
    assert.deepStrictEqual(roundTwo, ['D'], 'C removed from queue')

    // Round 3: C finished; nothing blocks D any more.
    tasks.C.status = 'completed'
    const roundThree = ['D']
    assert.equal(dequeueNextRunnableTask(roundThree, tasks), 'D', 'third dequeue should pick D (all blockers completed)')
    assert.deepStrictEqual(roundThree, [], 'queue is now empty')
  })

  it('retry scheduling gate — skips tasks with future retryScheduledAt', () => {
    const gated = makeTask({ id: 'future', retryScheduledAt: Date.now() + 60_000 })
    const tasks = byId(gated)

    const whileGated = dequeueNextRunnableTask(['future'], tasks)
    assert.equal(whileGated, null, 'should not dequeue task scheduled in the future')

    // Move the retry time into the past and try again with a fresh queue.
    gated.retryScheduledAt = Date.now() - 1000
    const afterGate = dequeueNextRunnableTask(['future'], tasks)
    assert.equal(afterGate, 'future', 'should dequeue task with past retryScheduledAt')
  })

  it('stale queue cleanup — skips missing/non-queued tasks without crashing', () => {
    const tasks = byId(
      makeTask({ id: 'valid', title: 'Valid' }),
      makeTask({ id: 'done', status: 'completed', title: 'Done' }),
    )

    // 'ghost' has no task record and 'done' is already completed; only 'valid' can run.
    const staleQueue = ['ghost', 'done', 'valid']
    const picked = dequeueNextRunnableTask(staleQueue, tasks)
    assert.equal(picked, 'valid', 'should skip stale entries and dequeue valid task')
  })

  it('empty queue returns null', () => {
    assert.equal(dequeueNextRunnableTask([], {}), null)
  })

  it('all-blocked queue returns null', () => {
    // Both queued tasks wait on blockers that are still running.
    const tasks = byId(
      makeTask({ id: 'A', status: 'queued', blockedBy: ['X'] }),
      makeTask({ id: 'B', status: 'queued', blockedBy: ['Y'] }),
      makeTask({ id: 'X', status: 'running' }),
      makeTask({ id: 'Y', status: 'running' }),
    )

    const picked = dequeueNextRunnableTask(['A', 'B'], tasks)
    assert.equal(picked, null, 'should return null when all tasks are blocked')
  })

  it('priority ordering — FIFO among unblocked tasks', () => {
    const tasks = byId(
      makeTask({ id: 't1', title: 'First' }),
      makeTask({ id: 't2', title: 'Second' }),
      makeTask({ id: 't3', title: 'Third' }),
    )

    // The same queue array is drained across calls; the final call sees it exhausted.
    const queue = ['t1', 't2', 't3']
    assert.equal(dequeueNextRunnableTask(queue, tasks), 't1', 'first in queue gets dequeued first')
    for (const expected of ['t2', 't3', null]) {
      assert.equal(dequeueNextRunnableTask(queue, tasks), expected)
    }
  })
})

// ---------------------------------------------------------------------------
// resolveTaskResumeContext
// ---------------------------------------------------------------------------

describe('resolveTaskResumeContext', () => {
  it('self-resume from codexResumeId on the task itself', () => {
    const own = makeTask({
      id: 'self-task',
      codexResumeId: 'codex-thread-abc',
      sessionId: 'sess-1',
    })

    const context = resolveTaskResumeContext(own, byId(own))

    assert.ok(context, 'should find resume context')
    assert.equal(context.source, 'self')
    assert.equal(context.sourceTaskId, 'self-task')
    assert.equal(context.resume.codexThreadId, 'codex-thread-abc')
  })

  it('deep delegation chain resume — falls back to parent task', () => {
    const gp = makeTask({
      id: 'gp',
      status: 'completed',
      title: 'Grandparent',
      claudeResumeId: 'claude-gp',
    })
    const parent = makeTask({
      id: 'parent',
      status: 'completed',
      title: 'Parent',
      delegatedFromTaskId: 'gp',
      codexResumeId: 'codex-parent',
    })
    const child = makeTask({
      id: 'child',
      status: 'queued',
      title: 'Child',
      delegatedFromTaskId: 'parent',
    })

    const context = resolveTaskResumeContext(child, byId(gp, parent, child))

    assert.ok(context, 'should resolve resume context')
    // The child carries no resume ids, so the expected source is the task it was delegated from.
    assert.equal(context.source, 'delegated_from_task')
    assert.equal(context.sourceTaskId, 'parent')
    assert.equal(context.resume.codexThreadId, 'codex-parent')
  })

  it('blocked-by resume with multiple blockers — falls back to second', () => {
    // The first blocker deliberately has no resume ids.
    const emptyBlocker = makeTask({
      id: 'blocker-1',
      status: 'completed',
      title: 'Blocker 1',
    })
    const resumableBlocker = makeTask({
      id: 'blocker-2',
      status: 'completed',
      title: 'Blocker 2',
      claudeResumeId: 'claude-b2',
    })
    const waiting = makeTask({
      id: 'blocked-task',
      blockedBy: ['blocker-1', 'blocker-2'],
    })

    const context = resolveTaskResumeContext(waiting, byId(emptyBlocker, resumableBlocker, waiting))

    assert.ok(context, 'should find resume context from second blocker')
    assert.equal(context.source, 'blocked_by')
    assert.equal(context.sourceTaskId, 'blocker-2')
    assert.equal(context.resume.claudeSessionId, 'claude-b2')
  })

  it('no resume context available — returns null', () => {
    const fresh = makeTask({ id: 'fresh' })

    assert.equal(resolveTaskResumeContext(fresh, byId(fresh)), null)
  })
})

// ---------------------------------------------------------------------------
// resolveReusableTaskSessionId
// ---------------------------------------------------------------------------

describe('resolveReusableTaskSessionId', () => {
  it('reuse blocker session when task has no own session', () => {
    const blocker = makeTask({
      id: 'blocker',
      status: 'completed',
      sessionId: 'sess-blocker',
    })
    const followup = makeTask({
      id: 'followup',
      blockedBy: ['blocker'],
    })
    const tasks = byId(blocker, followup)

    const sessionId = resolveReusableTaskSessionId(followup, tasks, stubSessions('sess-blocker'))
    assert.equal(sessionId, 'sess-blocker')
  })

  it('prefer task own checkpoint.lastSessionId over blocker session', () => {
    const blocker = makeTask({
      id: 'blocker',
      status: 'completed',
      sessionId: 'sess-blocker',
    })
    const followup = makeTask({
      id: 'followup',
      blockedBy: ['blocker'],
      checkpoint: { lastSessionId: 'sess-own', updatedAt: Date.now() },
    })
    const tasks = byId(blocker, followup)

    // Both sessions exist, so the choice between them is what is being asserted.
    const sessionId = resolveReusableTaskSessionId(followup, tasks, stubSessions('sess-own', 'sess-blocker'))
    assert.equal(sessionId, 'sess-own', 'should prefer own checkpoint session')
  })

  it('no session available — returns empty string', () => {
    const orphan = makeTask({ id: 'orphan' })

    const sessionId = resolveReusableTaskSessionId(orphan, byId(orphan), {})
    assert.equal(sessionId, '', 'should return empty string when no session is available')
  })
})

// ---------------------------------------------------------------------------
// resolveTaskOriginConnectorFollowupTarget
// ---------------------------------------------------------------------------

describe('resolveTaskOriginConnectorFollowupTarget', () => {
  /** Minimal message shape used by these fixtures; `source` marks connector-originated messages. */
  type MessageStub = {
    role: string
    text: string
    time: number
    source?: { connectorId?: string; channelId?: string }
  }
  type SessionStubMap = Record<string, { messages: MessageStub[] }>

  /** Minimal stored-connector shape used by these fixtures. */
  type ConnectorStub = {
    id: string
    name: string
    platform: string
    agentId: string
    config: Record<string, string>
    isEnabled: boolean
    status: string
    createdAt: number
    updatedAt: number
  }

  /** One session containing a single user message that arrived through a connector. */
  function sessionWithConnectorMessage(
    sessionId: string,
    text: string,
    source: { connectorId: string; channelId: string },
  ): SessionStubMap {
    return {
      [sessionId]: {
        messages: [{ role: 'user', text, time: Date.now(), source }],
      },
    }
  }

  /** A connector map with one enabled connector whose status is 'running'. */
  function connectorRecord(id: string, name: string, platform: string, agentId: string): Record<string, ConnectorStub> {
    return {
      [id]: {
        id,
        name,
        platform,
        agentId,
        config: {},
        isEnabled: true,
        status: 'running',
        createdAt: Date.now(),
        updatedAt: Date.now(),
      },
    }
  }

  /** The `running` list with one send-capable entry and no configured or recent targets. */
  function runningList(id: string, platform: string, agentId: string) {
    return [
      {
        id,
        platform,
        agentId,
        supportsSend: true,
        configuredTargets: [],
        recentChannelId: null,
      },
    ]
  }

  it('multi-hop delegation followup via delegatedByAgentId chain', () => {
    // The task belongs to agent-c and was delegated by agent-b; the originating
    // message came in through conn-a, which is owned by agent-a.
    const task = makeTask({
      id: 'task-c',
      agentId: 'agent-c',
      delegatedByAgentId: 'agent-b',
      createdInSessionId: 'sess-origin',
    } as TaskOverrides)

    const sessions = sessionWithConnectorMessage('sess-origin', 'Do this task', {
      connectorId: 'conn-a',
      channelId: 'channel-1',
    })
    // NOTE(review): the name says WhatsApp while the platform is discord — the name
    // presumably plays no part in the resolution; confirm before renaming the fixture.
    const connectors = connectorRecord('conn-a', 'Agent A WhatsApp', 'discord', 'agent-a')
    const running = runningList('conn-a', 'discord', 'agent-a')

    // Neither the task's agentId nor its delegatedByAgentId equals the connector
    // owner, and the expectation is that no follow-up target is produced.
    const unrelated = resolveTaskOriginConnectorFollowupTarget({
      task: task as never,
      sessions: sessions as never,
      connectors: connectors as never,
      running,
    })
    assert.equal(unrelated, null, 'connector owned by agent-a should not be accessible via agent-c delegated by agent-b')

    // Re-point the delegating agent at the connector owner on the same task object.
    Object.assign(task, { delegatedByAgentId: 'agent-a' })
    const matched = resolveTaskOriginConnectorFollowupTarget({
      task: task as never,
      sessions: sessions as never,
      connectors: connectors as never,
      running,
    })
    assert.ok(matched, 'should find connector followup target when delegatedByAgentId matches connector owner')
    assert.equal(matched.connectorId, 'conn-a')
    assert.equal(matched.channelId, 'channel-1')
  })

  it('WhatsApp connector normalizes channel to JID format', () => {
    const task = makeTask({
      id: 'wa-task',
      agentId: 'agent-wa',
      createdInSessionId: 'sess-wa',
    } as TaskOverrides)

    const sessions = sessionWithConnectorMessage('sess-wa', 'Check status', {
      connectorId: 'conn-wa',
      channelId: '+1 555 000 0000',
    })
    const connectors = connectorRecord('conn-wa', 'WhatsApp', 'whatsapp', 'agent-wa')
    const running = runningList('conn-wa', 'whatsapp', 'agent-wa')

    const target = resolveTaskOriginConnectorFollowupTarget({
      task: task as never,
      sessions: sessions as never,
      connectors: connectors as never,
      running,
    })

    assert.ok(target, 'should resolve WhatsApp followup target')
    assert.equal(target.connectorId, 'conn-wa')
    // Expected: '+1 555 000 0000' comes back as digits only with the WhatsApp JID suffix.
    assert.equal(target.channelId, '15550000000@s.whatsapp.net')
  })

  it('no user messages with connector source — returns null', () => {
    const task = makeTask({
      id: 'no-source',
      agentId: 'agent-x',
      createdInSessionId: 'sess-empty',
    } as TaskOverrides)

    // Neither message carries a `source`, so there is no connector to trace back to.
    const sessions: SessionStubMap = {
      'sess-empty': {
        messages: [
          { role: 'user', text: 'Hello', time: Date.now() },
          { role: 'assistant', text: 'Hi there', time: Date.now() },
        ],
      },
    }

    const target = resolveTaskOriginConnectorFollowupTarget({
      task: task as never,
      sessions: sessions as never,
      connectors: {},
      running: [],
    })

    assert.equal(target, null, 'should return null when no messages have connector source')
  })
})

// ---------------------------------------------------------------------------
// applyTaskResumeStateToSession
// ---------------------------------------------------------------------------

describe('applyTaskResumeStateToSession', () => {
  /** Builds a `Session` fixture with no resume ids set except `claudeSessionId: null`. */
  function makeSession(overrides?: Partial<Session>): Session {
    const session = {
      id: 'sess-1',
      name: 'Test',
      cwd: '/tmp',
      user: 'test',
      provider: 'anthropic',
      model: 'claude-sonnet-4-20250514',
      claudeSessionId: null,
      messages: [],
      createdAt: Date.now(),
      lastActiveAt: Date.now(),
      ...overrides,
    } as Session
    return session
  }

  /**
   * Builds a resume-state object whose `delegateResumeIds` mirror the top-level ids.
   * Every call returns a fresh object.
   */
  function resumeFor(claude: string | null, codex: string | null, opencode: string | null, gemini: string | null) {
    return {
      claudeSessionId: claude,
      codexThreadId: codex,
      opencodeSessionId: opencode,
      delegateResumeIds: {
        claudeCode: claude,
        codex,
        opencode,
        gemini,
      },
    }
  }

  it('partial resume — only codexThreadId set, others null', () => {
    const session = makeSession()

    const changed = applyTaskResumeStateToSession(session, resumeFor(null, 'codex-123', null, null))

    assert.equal(changed, true, 'should report change')
    assert.equal(session.codexThreadId, 'codex-123')
    assert.equal(session.claudeSessionId, null, 'claudeSessionId should remain null')
  })

  it('no-op when session already has the same resume state', () => {
    // The session starts out holding exactly the ids the resume state carries.
    const session = makeSession({
      claudeSessionId: 'claude-abc',
      codexThreadId: 'codex-123',
      opencodeSessionId: 'oc-456',
      delegateResumeIds: {
        claudeCode: 'claude-abc',
        codex: 'codex-123',
        opencode: 'oc-456',
        gemini: 'gem-789',
      },
    })

    const changed = applyTaskResumeStateToSession(session, resumeFor('claude-abc', 'codex-123', 'oc-456', 'gem-789'))

    assert.equal(changed, false, 'should return false when nothing changes')
  })

  it('full resume state hydration — all 4 fields applied', () => {
    const session = makeSession()

    const changed = applyTaskResumeStateToSession(session, resumeFor('claude-new', 'codex-new', 'oc-new', 'gem-new'))

    assert.equal(changed, true, 'should report change')
    assert.equal(session.claudeSessionId, 'claude-new')
    assert.equal(session.codexThreadId, 'codex-new')
    assert.equal(session.opencodeSessionId, 'oc-new')
    assert.deepStrictEqual(session.delegateResumeIds, {
      claudeCode: 'claude-new',
      codex: 'codex-new',
      opencode: 'oc-new',
      gemini: 'gem-new',
    })
  })

  // Absent resume state: one test per case, registered in the order null, undefined.
  const absentResumes = [
    ['null', null],
    ['undefined', undefined],
  ] as const
  for (const [label, absent] of absentResumes) {
    it(`returns false for ${label} resume`, () => {
      const session = makeSession()
      const changed = applyTaskResumeStateToSession(session, absent)
      assert.equal(changed, false)
    })
  }
})