// session-run-manager.test.ts
import assert from 'node:assert/strict'
import fs from 'node:fs'
import os from 'node:os'
import path from 'node:path'
import { after, afterEach, before, describe, it } from 'node:test'
import { runWithTempDataDir } from '@/lib/server/test-utils/run-with-temp-data-dir'

// Suppress unhandled rejections from background drainExecution() calls
// that fail because executeSessionChatTurn has no real LLM provider.
const _suppressedErrors: unknown[] = []

/** Records an unhandled rejection instead of letting it reach the test runner. */
function suppressionHandler(err: unknown) {
  _suppressedErrors.push(err)
}
process.on('unhandledRejection', suppressionHandler)

// Snapshot of the environment variables this suite overrides, restored in after().
const originalEnv = {
  DATA_DIR: process.env.DATA_DIR,
  WORKSPACE_DIR: process.env.WORKSPACE_DIR,
  SWARMCLAW_BUILD_MODE: process.env.SWARMCLAW_BUILD_MODE,
}

let tempDir = ''
let mgr: typeof import('@/lib/server/runtime/session-run-manager')
let storage: typeof import('@/lib/server/storage')

// Key under which the run manager's singleton state is read from globalThis.
const globalKey = '__swarmclaw_session_run_manager__' as const

type RuntimeState = {
  runningByExecution: Map<string, unknown>
  queueByExecution: Map<string, unknown[]>
  runs: Map<string, unknown>
  recentRunIds: string[]
  promises: Map<string, unknown>
  deferredDrainTimers?: Map<string, ReturnType<typeof setTimeout>>
  activityLeaseRenewTimers?: Map<string, ReturnType<typeof setInterval>>
}

type ManualQueueEntry = {
  executionKey: string
  run: {
    id: string
    sessionId: string
    source: string
    internal: boolean
    mode: 'followup' | 'steer' | 'collect'
    status: 'queued' | 'running' | 'completed' | 'failed' | 'cancelled'
    messagePreview: string
    queuedAt: number
    startedAt?: number
    endedAt?: number
    error?: string
  }
  message: string
  onEvents: Array<(event: unknown) => void>
  signalController: AbortController
  resolve: (value: unknown) => void
  reject: (error: Error) => void
  promise: Promise<unknown>
}

/** Pending promises from fire-and-forget drain calls. We suppress their
 *  rejections and await them in afterEach so node:test doesn't see
 *  "asynchronous activity after the test ended" warnings. */
const pendingPromises: Promise<unknown>[] = []

/**
 * Resets the run manager between tests.
 *
 * Uses the module's own `resetSessionRunManagerForTests` when it is exported;
 * otherwise empties the global singleton by hand.
 */
function resetState() {
  if (mgr && 'resetSessionRunManagerForTests' in mgr && typeof mgr.resetSessionRunManagerForTests === 'function') {
    mgr.resetSessionRunManagerForTests()
    return
  }
  const state = (globalThis as Record<string, unknown>)[globalKey] as RuntimeState | undefined
  if (!state) return

  state.runningByExecution.clear()
  state.queueByExecution.clear()
  state.runs.clear()
  state.recentRunIds.length = 0
  state.promises.clear()
  // Cancel the timers before dropping their handles; clearing only the maps
  // would leave them scheduled and free to fire after the test has ended.
  state.deferredDrainTimers?.forEach((timer) => clearTimeout(timer))
  state.deferredDrainTimers?.clear()
  state.activityLeaseRenewTimers?.forEach((timer) => clearInterval(timer))
  state.activityLeaseRenewTimers?.clear()
}

/** Returns the run manager's singleton state object stored on globalThis. */
function getRuntimeState(): RuntimeState {
  return (globalThis as Record<string, unknown>)[globalKey] as RuntimeState
}

/**
 * Builds a queue entry by hand, together with the promise its
 * `resolve`/`reject` callbacks settle.
 *
 * The execution key is `session:<sessionId>`; the mode is `collect` for
 * internal entries and `followup` otherwise; `queuedAt` defaults to now.
 */
function makeManualQueuedEntry(input: {
  sessionId: string
  runId: string
  message: string
  source?: string
  internal?: boolean
  queuedAt?: number
}): { entry: ManualQueueEntry; promise: Promise<unknown> } {
  let resolve!: (value: unknown) => void
  let reject!: (error: Error) => void
  const promise = new Promise<unknown>((res, rej) => {
    resolve = res
    reject = rej
  })
  const entry: ManualQueueEntry = {
    executionKey: `session:${input.sessionId}`,
    run: {
      id: input.runId,
      sessionId: input.sessionId,
      source: input.source || 'chat',
      internal: input.internal === true,
      mode: input.internal ? 'collect' : 'followup',
      status: 'queued',
      messagePreview: input.message,
      queuedAt: input.queuedAt ?? Date.now(),
    },
    message: input.message,
    onEvents: [],
    signalController: new AbortController(),
    resolve,
    reject,
    promise,
  }
  return { entry, promise }
}

/**
 * Writes a hand-built entry into the singleton state as the only queued
 * entry for its execution key, and registers its run record, recent run ID
 * and promise.
 */
function insertManualQueuedEntry(entry: ManualQueueEntry, promise: Promise<unknown>) {
  const state = getRuntimeState()
  state.queueByExecution.set(entry.executionKey, [entry as unknown])
  state.runs.set(entry.run.id, entry.run)
  state.recentRunIds.push(entry.run.id)
  state.promises.set(entry.run.id, promise)
}

/** Wrapper around enqueueSessionRun that captures the run promise to
 *  prevent async-after-test warnings from node:test. */
function enqueue(input: Parameters<typeof mgr.enqueueSessionRun>[0]) {
  const result = mgr.enqueueSessionRun(input)
  const suppressed = result.promise.catch(() => {})
  pendingPromises.push(suppressed)
  return result
}

before(async () => {
  tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'swarmclaw-session-run-mgr-'))
  process.env.DATA_DIR = path.join(tempDir, 'data')
  process.env.WORKSPACE_DIR = path.join(tempDir, 'workspace')
  process.env.SWARMCLAW_BUILD_MODE = '1'

  // Import only after the environment is in place.
  storage = await import('@/lib/server/storage')
  mgr = await import('@/lib/server/runtime/session-run-manager')
})

/**
 * Saves a minimal session record under `id` and, if it is not stored yet,
 * the shared `test-agent` record the session points at.
 */
function seedSession(id: string) {
  const sessions = storage.loadSessions()
  sessions[id] = {
    id,
    name: 'Test Session',
    cwd: '/tmp',
    user: 'tester',
    provider: 'anthropic',
    model: 'claude-sonnet-4-20250514',
    claudeSessionId: null,
    agentId: 'test-agent',
    messages: [],
    createdAt: Date.now(),
    lastActiveAt: Date.now(),
  }
  storage.saveSessions(sessions)
  const agents = storage.loadAgents()
  if (!agents['test-agent']) {
    agents['test-agent'] = {
      id: 'test-agent',
      name: 'Test Agent',
      description: 'Test agent for session run manager tests',
      provider: 'anthropic',
      model: 'claude-sonnet-4-20250514',
      systemPrompt: 'You are a test agent.',
      createdAt: Date.now(),
      updatedAt: Date.now(),
    }
    storage.saveAgents(agents)
  }
}

after(() => {
  // Put every overridden variable back, deleting those that were unset before.
  for (const [key, value] of Object.entries(originalEnv)) {
    if (value === undefined) delete process.env[key]
    else process.env[key] = value
  }
  // tempDir is still '' when before() failed ahead of mkdtempSync.
  if (tempDir) fs.rmSync(tempDir, { recursive: true, force: true })
  process.removeListener('unhandledRejection', suppressionHandler)
})

afterEach(async () => {
  // Wait for all background drain activity to settle before resetting state
  await Promise.allSettled(pendingPromises)
  pendingPromises.length = 0
  resetState()
})

describe('session-run-manager', () => {
  it('backfills missing timer maps when hot-reloading over an older singleton shape', () => {
    const output = runWithTempDataDir<{ ok: boolean }>(`
      globalThis.__swarmclaw_session_run_manager__ = {
        runningByExecution: new Map(),
        queueByExecution: new Map(),
        runs: new Map(),
        recentRunIds: [],
        promises: new Map(),
      }

      const mgrMod = await import('./src/lib/server/runtime/session-run-manager.ts')
      const mgr = mgrMod.default || mgrMod
      const result = mgr.enqueueSessionRun({
        sessionId: 'sess-hmr-backfill',
        message: 'hello',
      })
      await result.promise.catch(() => {})

      console.log(JSON.stringify({ ok: typeof result.runId === 'string' && result.runId.length > 0 }))
    `, { prefix: 'swarmclaw-session-run-hmr-' })

    assert.equal(output.ok, true)
  })

  describe('enqueueSessionRun', () => {
    it('returns a run ID and queued position', () => {
      const result = enqueue({
        sessionId: 'sess-1',
        message: 'Hello world',
      })

      assert.ok(result.runId, 'should have a run ID')
      assert.equal(typeof result.runId, 'string')
      assert.equal(typeof result.position, 'number')
      assert.ok(result.promise instanceof Promise, 'should return a promise')
      assert.equal(typeof result.abort, 'function')
      assert.equal(typeof result.unsubscribe, 'function')
    })

    it('registers the run record accessible via getRunById', () => {
      const result = enqueue({
        sessionId: 'sess-2',
        message: 'Test message',
        source: 'chat',
      })

      const run = mgr.getRunById(result.runId)
      assert.ok(run, 'run should exist')
      assert.equal(run.sessionId, 'sess-2')
      assert.equal(run.source, 'chat')
      assert.equal(run.messagePreview, 'Test message')
      assert.ok(run.queuedAt > 0)
    })

    it('persists run records and replay events in storage', () => {
      const result = enqueue({
        sessionId: 'sess-persisted',
        message: 'Persist me',
        source: 'chat',
      })

      const persisted = storage.loadRuntimeRuns()[result.runId]
      assert.ok(persisted)
      assert.equal(persisted.messagePreview, 'Persist me')
      assert.equal(mgr.listRunEvents(result.runId).length > 0, true)
    })

    it('truncates message preview to 140 chars', () => {
      const longMessage = 'A'.repeat(200)
      const result = enqueue({
        sessionId: 'sess-trunc',
        message: longMessage,
      })

      const run = mgr.getRunById(result.runId)
      assert.ok(run)
      assert.equal(run.messagePreview.length, 140)
    })

    it('defaults internal to false and source to chat', () => {
      const result = enqueue({
        sessionId: 'sess-defaults',
        message: 'test',
      })

      const run = mgr.getRunById(result.runId)
      assert.ok(run)
      assert.equal(run.internal, false)
      assert.equal(run.source, 'chat')
    })

    it('normalizes mode to followup for non-internal runs', () => {
      const result = enqueue({
        sessionId: 'sess-mode',
        message: 'test',
        internal: false,
      })

      const run = mgr.getRunById(result.runId)
      assert.ok(run)
      assert.equal(run.mode, 'followup')
    })

    it('normalizes mode to collect for internal runs without explicit mode', () => {
      const result = enqueue({
        sessionId: 'sess-mode-int',
        message: 'test',
        internal: true,
      })

      const run = mgr.getRunById(result.runId)
      assert.ok(run)
      assert.equal(run.mode, 'collect')
    })

    it('preserves explicit mode when provided', () => {
      const result = enqueue({
        sessionId: 'sess-explicit-mode',
        message: 'test',
        mode: 'steer',
      })

      const run = mgr.getRunById(result.runId)
      assert.ok(run)
      assert.equal(run.mode, 'steer')
    })
  })

  describe('deduplication', () => {
    it('deduplicates queued runs with the same dedupeKey', () => {
      const first = enqueue({
        sessionId: 'sess-dedup',
        message: 'first run',
      })

      const run1 = enqueue({
        sessionId: 'sess-dedup',
        message: 'deduped message',
        dedupeKey: 'key-1',
      })

      const run2 = enqueue({
        sessionId: 'sess-dedup',
        message: 'duplicate message',
        dedupeKey: 'key-1',
      })

      assert.equal(run2.deduped, true, 'second run should be deduped')
      assert.equal(run2.runId, run1.runId, 'deduped run should share the same run ID')
      assert.ok(first.runId !== run1.runId, 'first run should be different from deduped runs')
    })

    it('does not deduplicate runs without dedupeKey', () => {
      enqueue({ sessionId: 'sess-no-dedup', message: 'occupier' })

      const run1 = enqueue({ sessionId: 'sess-no-dedup', message: 'msg1' })
      const run2 = enqueue({ sessionId: 'sess-no-dedup', message: 'msg2' })

      assert.ok(run1.runId !== run2.runId, 'runs without dedupeKey should have different IDs')
      assert.equal(run2.deduped, undefined)
    })

    it('does not deduplicate runs with different dedupeKeys', () => {
      enqueue({ sessionId: 'sess-diff-keys', message: 'occupier' })

      const run1 = enqueue({
        sessionId: 'sess-diff-keys',
        message: 'msg1',
        dedupeKey: 'alpha',
      })

      const run2 = enqueue({
        sessionId: 'sess-diff-keys',
        message: 'msg2',
        dedupeKey: 'beta',
      })

      assert.ok(run1.runId !== run2.runId)
      assert.equal(run2.deduped, undefined)
    })
  })

  describe('collect mode coalescing', () => {
    it('coalesces internal collect-mode messages within the time window', () => {
      enqueue({ sessionId: 'sess-coalesce', message: 'occupier' })

      const run1 = enqueue({
        sessionId: 'sess-coalesce',
        message: 'first collect',
        internal: true,
        source: 'heartbeat',
        mode: 'collect',
      })

      const run2 = enqueue({
        sessionId: 'sess-coalesce',
        message: 'second collect',
        internal: true,
        source: 'heartbeat',
        mode: 'collect',
      })

      assert.equal(run2.coalesced, true, 'second collect should be coalesced')
      assert.equal(run2.runId, run1.runId, 'coalesced run should share the same run ID')
    })

    it('does not coalesce messages with different sources', () => {
      enqueue({ sessionId: 'sess-no-coalesce-src', message: 'occupier' })

      const run1 = enqueue({
        sessionId: 'sess-no-coalesce-src',
        message: 'first',
        internal: true,
        source: 'heartbeat',
        mode: 'collect',
      })

      const run2 = enqueue({
        sessionId: 'sess-no-coalesce-src',
        message: 'second',
        internal: true,
        source: 'other-source',
        mode: 'collect',
      })

      assert.ok(run1.runId !== run2.runId, 'different sources should not coalesce')
    })

    it('does not coalesce when there are image attachments', () => {
      enqueue({ sessionId: 'sess-no-coalesce-img', message: 'occupier' })

      const run1 = enqueue({
        sessionId: 'sess-no-coalesce-img',
        message: 'first',
        internal: true,
        source: 'heartbeat',
        mode: 'collect',
      })

      const run2 = enqueue({
        sessionId: 'sess-no-coalesce-img',
        message: 'second with image',
        internal: true,
        source: 'heartbeat',
        mode: 'collect',
        imagePath: '/path/to/image.png',
      })

      assert.ok(run1.runId !== run2.runId, 'image attachments should prevent coalescing')
    })
  })

  describe('getSessionRunState / getSessionExecutionState', () => {
    it('returns empty state for unknown session', () => {
      const state = mgr.getSessionRunState('unknown-session')
      assert.equal(state.runningRunId, undefined)
      assert.equal(state.queueLength, 0)
    })

    it('returns execution state with queue info', () => {
      enqueue({ sessionId: 'sess-state', message: 'running' })
      enqueue({ sessionId: 'sess-state', message: 'queued 1' })

      const state = mgr.getSessionExecutionState('sess-state')
      assert.equal(state.hasQueued, true)
      assert.ok(state.queueLength >= 1)
    })

    it('exposes queued follow-up metadata in the visible queue snapshot', () => {
      enqueue({ sessionId: 'sess-queue-meta', message: 'running' })
      const queued = enqueue({
        sessionId: 'sess-queue-meta',
        message: 'queued with metadata',
        imagePath: '/tmp/cover.png',
        imageUrl: '/api/uploads/cover.png',
        attachedFiles: ['/tmp/spec.md', '/tmp/notes.txt'],
        replyToId: 'msg-11',
        source: 'chat',
      })

      const snapshot = mgr.getSessionQueueSnapshot('sess-queue-meta')
      assert.equal(snapshot.activeRunId != null, true)
      assert.equal(snapshot.queueLength, 1)
      assert.deepEqual(snapshot.items[0], {
        runId: queued.runId,
        sessionId: 'sess-queue-meta',
        missionId: null,
        text: 'queued with metadata',
        // queuedAt is assigned by the manager, so compare it against itself.
        queuedAt: snapshot.items[0]?.queuedAt,
        position: 1,
        imagePath: '/tmp/cover.png',
        imageUrl: '/api/uploads/cover.png',
        attachedFiles: ['/tmp/spec.md', '/tmp/notes.txt'],
        replyToId: 'msg-11',
        source: 'chat',
      })
    })

    it('exposes the active user-visible turn separately from queued follow-ups', () => {
      const running = enqueue({
        sessionId: 'sess-active-turn',
        message: 'running now',
        source: 'chat',
      })
      const queued = enqueue({
        sessionId: 'sess-active-turn',
        message: 'queued next',
        source: 'chat',
      })

      const snapshot = mgr.getSessionQueueSnapshot('sess-active-turn')
      assert.equal(snapshot.activeRunId, running.runId)
      assert.deepEqual(snapshot.activeTurn, {
        runId: running.runId,
        sessionId: 'sess-active-turn',
        missionId: null,
        text: 'running now',
        queuedAt: snapshot.activeTurn?.queuedAt,
        position: 0,
        imagePath: undefined,
        imageUrl: undefined,
        attachedFiles: undefined,
        replyToId: undefined,
        source: 'chat',
      })
      assert.deepEqual(snapshot.items.map((item) => item.runId), [queued.runId])
      assert.equal(snapshot.queueLength, 1)
    })

    it('hides internal queued runs from the user-visible queue snapshot', () => {
      enqueue({ sessionId: 'sess-visible-queue', message: 'running' })
      enqueue({
        sessionId: 'sess-visible-queue',
        message: 'heartbeat hidden',
        internal: true,
        source: 'heartbeat',
      })
      const visible = enqueue({
        sessionId: 'sess-visible-queue',
        message: 'user follow-up',
        internal: false,
        source: 'chat',
      })

      const snapshot = mgr.getSessionQueueSnapshot('sess-visible-queue')
      assert.equal(snapshot.queueLength, 1)
      assert.equal(snapshot.activeTurn?.runId, snapshot.activeRunId)
      assert.deepEqual(snapshot.items.map((item) => item.runId), [visible.runId])
    })

    it('hides internal active runs from the active-turn snapshot field', () => {
      const { entry, promise } = makeManualQueuedEntry({
        sessionId: 'sess-hidden-active',
        runId: 'run-hidden',
        message: 'heartbeat hidden',
        internal: true,
        source: 'heartbeat',
      })
      entry.run.status = 'running'
      entry.run.startedAt = Date.now()
      const state = getRuntimeState()
      state.runningByExecution.set(entry.executionKey, entry as unknown)
      state.runs.set(entry.run.id, entry.run)
      state.promises.set(entry.run.id, promise)
      pendingPromises.push(promise.catch(() => {}))

      try {
        const snapshot = mgr.getSessionQueueSnapshot('sess-hidden-active')
        assert.equal(snapshot.activeRunId, 'run-hidden')
        assert.equal(snapshot.activeTurn, null)
        assert.equal(snapshot.queueLength, 0)
      } finally {
        // Settle the hand-built promise so afterEach does not wait on it forever.
        entry.resolve(undefined)
      }
    })

    it('reports heartbeat vs non-heartbeat queued runs', () => {
      enqueue({ sessionId: 'sess-hb-state', message: 'occupier' })
      enqueue({
        sessionId: 'sess-hb-state',
        message: 'hb',
        internal: true,
        source: 'heartbeat',
      })
      enqueue({
        sessionId: 'sess-hb-state',
        message: 'user',
        internal: false,
        source: 'chat',
      })

      const state = mgr.getSessionExecutionState('sess-hb-state')
      assert.equal(state.hasQueuedHeartbeat, true)
      assert.equal(state.hasQueuedNonHeartbeat, true)
    })

    it('publishes a shared non-heartbeat activity lease for user work', () => {
      enqueue({ sessionId: 'sess-lease', message: 'user work', source: 'chat' })

      assert.equal(mgr.hasActiveNonHeartbeatSessionLease('sess-lease'), true)
    })

    it('does not publish the shared activity lease for heartbeat-only work', () => {
      enqueue({
        sessionId: 'sess-heartbeat-only',
        message: 'hb',
        internal: true,
        source: 'heartbeat-wake',
      })

      assert.equal(mgr.hasActiveNonHeartbeatSessionLease('sess-heartbeat-only'), false)
    })
  })

  describe('listRuns', () => {
    it('lists all runs in reverse chronological order', () => {
      enqueue({ sessionId: 'sess-list-a', message: 'msg a' })
      enqueue({ sessionId: 'sess-list-b', message: 'msg b' })

      const runs = mgr.listRuns()
      assert.ok(runs.length >= 2)
      const idxA = runs.findIndex(r => r.sessionId === 'sess-list-a')
      const idxB = runs.findIndex(r => r.sessionId === 'sess-list-b')
      assert.ok(idxB < idxA, 'more recent run should be first')
    })

    it('filters by sessionId', () => {
      enqueue({ sessionId: 'sess-filter-a', message: 'a' })
      enqueue({ sessionId: 'sess-filter-b', message: 'b' })

      const runs = mgr.listRuns({ sessionId: 'sess-filter-a' })
      assert.ok(runs.length >= 1)
      for (const run of runs) {
        assert.equal(run.sessionId, 'sess-filter-a')
      }
    })

    it('filters by status', () => {
      enqueue({ sessionId: 'sess-status-a', message: 'a' })
      enqueue({ sessionId: 'sess-status-b', message: 'b' })

      // At least one should be queued synchronously
      const queued = mgr.listRuns({ status: 'queued' })
      // We just verify the filter doesn't crash and returns consistent data
      for (const run of queued) {
        assert.equal(run.status, 'queued')
      }
    })

    it('respects limit parameter', () => {
      for (let i = 0; i < 5; i++) {
        enqueue({ sessionId: `sess-limit-${i}`, message: `msg ${i}` })
      }

      const runs = mgr.listRuns({ limit: 2 })
      assert.equal(runs.length, 2)
    })
  })

  describe('cancelSessionRuns', () => {
    it('cancels queued runs for a session', () => {
      enqueue({ sessionId: 'sess-cancel', message: 'running' })
      enqueue({ sessionId: 'sess-cancel', message: 'queued 1' })
      enqueue({ sessionId: 'sess-cancel', message: 'queued 2' })

      const result = mgr.cancelSessionRuns('sess-cancel', 'User cancelled')
      assert.ok(result.cancelledQueued >= 2, `should cancel at least 2 queued runs, got ${result.cancelledQueued}`)
    })

    it('returns zero when no runs exist for session', () => {
      const result = mgr.cancelSessionRuns('nonexistent-session')
      assert.equal(result.cancelledQueued, 0)
      assert.equal(result.cancelledRunning, false)
    })

    it('does not cancel runs for other sessions', () => {
      enqueue({ sessionId: 'sess-keep', message: 'keep me' })
      enqueue({ sessionId: 'sess-cancel-other', message: 'cancel me' })

      mgr.cancelSessionRuns('sess-cancel-other', 'cancelled')

      const keepRuns = mgr.listRuns({ sessionId: 'sess-keep' })
      assert.ok(keepRuns.length >= 1, 'runs for other session should be preserved')
      const keptRun = keepRuns.find(r => r.status !== 'cancelled')
      assert.ok(keptRun, 'kept session run should not be cancelled')
    })
  })

  describe('steer mode', () => {
    it('cancels pending queued runs when steer mode is used', () => {
      enqueue({ sessionId: 'sess-steer', message: 'running' })
      enqueue({ sessionId: 'sess-steer', message: 'queued 1' })
      enqueue({ sessionId: 'sess-steer', message: 'queued 2' })

      const steer = enqueue({
        sessionId: 'sess-steer',
        message: 'steer message',
        mode: 'steer',
      })

      assert.ok(steer.runId)
      const steerRun = mgr.getRunById(steer.runId)
      assert.ok(steerRun)
      assert.notEqual(steerRun.status, 'cancelled')
    })

    it('steer marks previously queued runs as cancelled', () => {
      enqueue({ sessionId: 'sess-steer-verify', message: 'occupier' })
      const q1 = enqueue({ sessionId: 'sess-steer-verify', message: 'will be cancelled' })
      const q2 = enqueue({ sessionId: 'sess-steer-verify', message: 'also cancelled' })

      enqueue({
        sessionId: 'sess-steer-verify',
        message: 'steer',
        mode: 'steer',
      })

      const run1 = mgr.getRunById(q1.runId)
      const run2 = mgr.getRunById(q2.runId)
      assert.ok(run1)
      assert.ok(run2)
      assert.equal(run1.status, 'cancelled')
      assert.equal(run2.status, 'cancelled')
    })
  })

  describe('abort and unsubscribe', () => {
    it('abort function is callable without error', () => {
      const result = enqueue({
        sessionId: 'sess-abort',
        message: 'abort me',
      })
      assert.doesNotThrow(() => result.abort())
    })

    it('unsubscribe removes the event listener', () => {
      const events: unknown[] = []
      const result = enqueue({
        sessionId: 'sess-unsub',
        message: 'test',
        onEvent: (event) => events.push(event),
      })
      assert.doesNotThrow(() => result.unsubscribe())
    })
  })

  describe('callerSignal chaining', () => {
    it('propagates an already-aborted callerSignal', () => {
      const controller = new AbortController()
      controller.abort()

      const result = enqueue({
        sessionId: 'sess-pre-aborted',
        message: 'test',
        callerSignal: controller.signal,
      })

      assert.doesNotThrow(() => result.abort())
    })

    it('chains a live callerSignal to the run', () => {
      const controller = new AbortController()

      const result = enqueue({
        sessionId: 'sess-live-signal',
        message: 'test',
        callerSignal: controller.signal,
      })

      // Aborting the caller should work without throwing
      assert.doesNotThrow(() => controller.abort())
      assert.ok(result.runId)
    })
  })

  describe('run completion and drain', () => {
    it('run eventually transitions from queued to a terminal state', async () => {
      seedSession('sess-terminal')
      const result = enqueue({
        sessionId: 'sess-terminal',
        message: 'will fail in drain',
      })

      // Wait for drain to process
      await result.promise.catch(() => {})

      const run = mgr.getRunById(result.runId)
      assert.ok(run, 'run should still exist')
      const terminal = ['completed', 'failed', 'cancelled']
      assert.ok(
        terminal.includes(run.status),
        `run status should be terminal, got: ${run.status}`,
      )
    })

    it('drain processes next queued run after current completes', async () => {
      seedSession('sess-drain-chain')
      const result1 = enqueue({
        sessionId: 'sess-drain-chain',
        message: 'first',
      })
      const result2 = enqueue({
        sessionId: 'sess-drain-chain',
        message: 'second',
      })

      // Wait for both drains to process
      await Promise.allSettled([result1.promise, result2.promise])

      const run1 = mgr.getRunById(result1.runId)
      const run2 = mgr.getRunById(result2.runId)
      assert.ok(run1)
      assert.ok(run2)
      assert.notEqual(run1.status, 'queued', 'first run should not still be queued')
      assert.notEqual(run2.status, 'queued', 'second run should not still be queued')
    })

    it('failed run records error message', async () => {
      seedSession('sess-fail-error')
      const result = enqueue({
        sessionId: 'sess-fail-error',
        message: 'will error',
      })

      await result.promise.catch(() => {})

      const run = mgr.getRunById(result.runId)
      assert.ok(run)
      if (run.status === 'failed') {
        assert.ok(run.error, 'failed run should have an error message')
        assert.ok(run.endedAt, 'failed run should have endedAt timestamp')
      }
    })

    it('clears the shared activity lease after non-heartbeat work finishes', async () => {
      seedSession('sess-lease-clear')
      const result = enqueue({
        sessionId: 'sess-lease-clear',
        message: 'will fail in drain',
      })

      assert.equal(mgr.hasActiveNonHeartbeatSessionLease('sess-lease-clear'), true)
      await result.promise.catch(() => {})
      assert.equal(mgr.hasActiveNonHeartbeatSessionLease('sess-lease-clear'), false)
    })

    it('defers heartbeat runs while another worker advertises non-heartbeat activity', async () => {
      seedSession('sess-remote-busy')
      const lockName = 'session-non-heartbeat:sess-remote-busy'
      storage.tryAcquireRuntimeLock(lockName, 'remote-worker', 60_000)

      // Release exactly once, whether the test passes or an assertion throws.
      // A lock left behind would keep the heartbeat queued and make afterEach
      // wait on its promise.
      let lockReleased = false
      const releaseLock = () => {
        if (lockReleased) return
        lockReleased = true
        storage.releaseRuntimeLock(lockName, 'remote-worker')
      }

      try {
        const heartbeat = enqueue({
          sessionId: 'sess-remote-busy',
          message: 'hb',
          internal: true,
          source: 'heartbeat-wake',
        })

        await new Promise((resolve) => setTimeout(resolve, 50))
        const queued = mgr.getRunById(heartbeat.runId)
        assert.ok(queued)
        assert.equal(queued.status, 'queued')

        releaseLock()
        await heartbeat.promise.catch(() => {})

        const finished = mgr.getRunById(heartbeat.runId)
        assert.ok(finished)
        assert.notEqual(finished.status, 'queued')
      } finally {
        releaseLock()
      }
    })

    it('re-kicks a recent queued entry when the execution lane is idle', async () => {
      seedSession('sess-rekick')
      const runId = 'manual-rekick'
      const { entry, promise } = makeManualQueuedEntry({
        sessionId: 'sess-rekick',
        runId,
        message: 'recover me',
      })
      insertManualQueuedEntry(entry, promise)

      const repair = mgr.repairSessionRunQueue('sess-rekick')
      assert.equal(repair.recoveredQueuedRuns, 0)
      assert.equal(repair.kickedExecutionKeys, 1)

      await promise.catch(() => {})

      const run = mgr.getRunById(runId)
      assert.ok(run)
      assert.notEqual(run.status, 'queued')
    })

    it('recovers stale queued runs before a fresh enqueue can get wedged behind them', async () => {
      seedSession('sess-stale-recover')
      const staleRunId = 'manual-stale'
      const { entry, promise } = makeManualQueuedEntry({
        sessionId: 'sess-stale-recover',
        runId: staleRunId,
        message: 'ghost queued run',
        queuedAt: Date.now() - 60_000,
      })
      insertManualQueuedEntry(entry, promise)

      const fresh = enqueue({
        sessionId: 'sess-stale-recover',
        message: 'fresh message',
      })

      const staleResult = await promise
      assert.deepEqual(staleResult, {
        runId: staleRunId,
        sessionId: 'sess-stale-recover',
        text: '',
        persisted: false,
        toolEvents: [],
        error: 'Recovered stale queued run before enqueue',
      })

      const staleRun = mgr.getRunById(staleRunId)
      assert.ok(staleRun)
      assert.equal(staleRun.status, 'failed')

      const execution = mgr.getSessionExecutionState('sess-stale-recover')
      assert.ok(execution.queueLength <= 1, `expected stale run to be cleared, got queueLength=${execution.queueLength}`)

      await fresh.promise.catch(() => {})
      const freshRun = mgr.getRunById(fresh.runId)
      assert.ok(freshRun)
      assert.notEqual(freshRun.status, 'queued')
    })
  })

  describe('cancelAllHeartbeatRuns', () => {
    it('cancels queued heartbeat runs but keeps non-heartbeat runs', () => {
      enqueue({ sessionId: 'sess-hb-cancel', message: 'occupier' })

      enqueue({
        sessionId: 'sess-hb-cancel',
        message: 'heartbeat msg',
        internal: true,
        source: 'heartbeat',
      })

      enqueue({
        sessionId: 'sess-hb-cancel',
        message: 'user msg',
        internal: false,
        source: 'chat',
      })

      const result = mgr.cancelAllHeartbeatRuns('Test cancellation')
      assert.ok(result.cancelledQueued >= 1, 'should cancel at least 1 queued heartbeat')
    })

    it('returns zeros when no heartbeat runs exist', () => {
      const result = mgr.cancelAllHeartbeatRuns()
      assert.equal(result.cancelledQueued, 0)
      assert.equal(result.abortedRunning, 0)
    })

    it('preserves non-heartbeat queued runs', () => {
      enqueue({ sessionId: 'sess-hb-keep', message: 'occupier' })

      enqueue({
        sessionId: 'sess-hb-keep',
        message: 'heartbeat',
        internal: true,
        source: 'heartbeat',
      })

      const userRun = enqueue({
        sessionId: 'sess-hb-keep',
        message: 'user chat',
        internal: false,
        source: 'chat',
      })

      mgr.cancelAllHeartbeatRuns()

      const userRunRecord = mgr.getRunById(userRun.runId)
      assert.ok(userRunRecord)
      assert.notEqual(userRunRecord.status, 'cancelled', 'non-heartbeat run should not be cancelled')
    })
  })

  describe('restart recovery', () => {
    it('marks stale persisted runs interrupted and only requeues background sources once', () => {
      const output = runWithTempDataDir<{
        interruptedChat: boolean
        recoveredRunCount: number
        recoveredFromOriginal: boolean
      }>(`
        const storageMod = await import('./src/lib/server/storage.ts')
        const storage = storageMod.default || storageMod
        const now = Date.now()
        const agents = storage.loadAgents()
        agents.agent_test = {
          id: 'agent_test',
          name: 'Recovery Agent',
          provider: 'anthropic',
          model: 'claude-sonnet-4-20250514',
          systemPrompt: 'Test agent',
        }
        storage.saveAgents(agents)
        const sessions = storage.loadSessions()
        sessions.sess_bg = {
          id: 'sess_bg',
          agentId: 'agent_test',
          messages: [],
          createdAt: now,
          lastActiveAt: now,
        }
        sessions.sess_chat = {
          id: 'sess_chat',
          agentId: 'agent_test',
          messages: [],
          createdAt: now,
          lastActiveAt: now,
        }
        storage.saveSessions(sessions)
        storage.upsertRuntimeRun('run_bg', {
          id: 'run_bg',
          sessionId: 'sess_bg',
          source: 'heartbeat',
          internal: true,
          mode: 'collect',
          status: 'queued',
          messagePreview: 'heartbeat',
          queuedAt: now - 1000,
          recoveryPayload: {
            message: 'resume heartbeat',
            internal: true,
            source: 'heartbeat',
            mode: 'collect',
          },
        })
        storage.upsertRuntimeRun('run_chat', {
          id: 'run_chat',
          sessionId: 'sess_chat',
          source: 'chat',
          internal: false,
          mode: 'followup',
          status: 'running',
          messagePreview: 'chat',
          queuedAt: now - 1000,
          recoveryPayload: {
            message: 'user chat',
            internal: false,
            source: 'chat',
            mode: 'followup',
          },
        })
        const mgrMod = await import('./src/lib/server/runtime/session-run-manager.ts')
        const mgr = mgrMod.default || mgrMod
        mgr.listRuns()
        await new Promise((resolve) => setTimeout(resolve, 25))
        const runs = Object.values(storage.loadRuntimeRuns())
        const originalChat = runs.find((run) => run.id === 'run_chat')
        const recovered = runs.filter((run) => run.recoveredFromRestart === true)
        console.log(JSON.stringify({
          interruptedChat: !!originalChat?.interruptedAt && originalChat?.status === 'cancelled',
          recoveredRunCount: recovered.length,
          recoveredFromOriginal: recovered[0]?.recoveredFromRunId === 'run_bg',
        }))
      `, { prefix: 'swarmclaw-session-run-recovery-' })

      assert.equal(output.interruptedChat, true)
      assert.equal(output.recoveredRunCount, 1)
      assert.equal(output.recoveredFromOriginal, true)
    })
  })

  describe('getRunById', () => {
    it('returns null for non-existent run', () => {
      assert.equal(mgr.getRunById('nonexistent'), null)
    })
  })
})