/ src / lib / server / runtime / session-run-manager.test.ts
session-run-manager.test.ts
   1  import assert from 'node:assert/strict'
   2  import fs from 'node:fs'
   3  import os from 'node:os'
   4  import path from 'node:path'
   5  import { after, afterEach, before, describe, it } from 'node:test'
   6  import { runWithTempDataDir } from '@/lib/server/test-utils/run-with-temp-data-dir'
   7  
   8  // Suppress unhandled rejections from background drainExecution() calls
   9  // that fail because executeSessionChatTurn has no real LLM provider.
  10  const _suppressedErrors: unknown[] = []
  11  function suppressionHandler(err: unknown) { _suppressedErrors.push(err) }
  12  process.on('unhandledRejection', suppressionHandler)
  13  
  14  const originalEnv = {
  15    DATA_DIR: process.env.DATA_DIR,
  16    WORKSPACE_DIR: process.env.WORKSPACE_DIR,
  17    SWARMCLAW_BUILD_MODE: process.env.SWARMCLAW_BUILD_MODE,
  18  }
  19  
  20  let tempDir = ''
  21  let mgr: typeof import('@/lib/server/runtime/session-run-manager')
  22  let storage: typeof import('@/lib/server/storage')
  23  
  24  const globalKey = '__swarmclaw_session_run_manager__' as const
  25  type RuntimeState = {
  26    runningByExecution: Map<string, unknown>
  27    queueByExecution: Map<string, unknown[]>
  28    runs: Map<string, unknown>
  29    recentRunIds: string[]
  30    promises: Map<string, unknown>
  31    deferredDrainTimers?: Map<string, ReturnType<typeof setTimeout>>
  32    activityLeaseRenewTimers?: Map<string, ReturnType<typeof setInterval>>
  33  }
  34  
  35  type ManualQueueEntry = {
  36    executionKey: string
  37    run: {
  38      id: string
  39      sessionId: string
  40      source: string
  41      internal: boolean
  42      mode: 'followup' | 'steer' | 'collect'
  43      status: 'queued' | 'running' | 'completed' | 'failed' | 'cancelled'
  44      messagePreview: string
  45      queuedAt: number
  46      startedAt?: number
  47      endedAt?: number
  48      error?: string
  49    }
  50    message: string
  51    onEvents: Array<(event: unknown) => void>
  52    signalController: AbortController
  53    resolve: (value: unknown) => void
  54    reject: (error: Error) => void
  55    promise: Promise<unknown>
  56  }
  57  
  58  /** Pending promises from fire-and-forget drain calls. We suppress their
  59   *  rejections and await them in afterEach so node:test doesn't see
  60   *  "asynchronous activity after the test ended" warnings. */
  61  const pendingPromises: Promise<unknown>[] = []
  62  
  63  function resetState() {
  64    if (mgr && 'resetSessionRunManagerForTests' in mgr && typeof mgr.resetSessionRunManagerForTests === 'function') {
  65      mgr.resetSessionRunManagerForTests()
  66      return
  67    }
  68    const state = (globalThis as Record<string, unknown>)[globalKey] as RuntimeState | undefined
  69    if (state) {
  70      state.runningByExecution.clear()
  71      state.queueByExecution.clear()
  72      state.runs.clear()
  73      state.recentRunIds.length = 0
  74      state.promises.clear()
  75      state.deferredDrainTimers?.clear()
  76      state.activityLeaseRenewTimers?.clear()
  77    }
  78  }
  79  
  80  function getRuntimeState(): RuntimeState {
  81    return (globalThis as Record<string, unknown>)[globalKey] as RuntimeState
  82  }
  83  
  84  function makeManualQueuedEntry(input: {
  85    sessionId: string
  86    runId: string
  87    message: string
  88    source?: string
  89    internal?: boolean
  90    queuedAt?: number
  91  }): { entry: ManualQueueEntry; promise: Promise<unknown> } {
  92    let resolve!: (value: unknown) => void
  93    let reject!: (error: Error) => void
  94    const promise = new Promise<unknown>((res, rej) => {
  95      resolve = res
  96      reject = rej
  97    })
  98    const entry: ManualQueueEntry = {
  99      executionKey: `session:${input.sessionId}`,
 100      run: {
 101        id: input.runId,
 102        sessionId: input.sessionId,
 103        source: input.source || 'chat',
 104        internal: input.internal === true,
 105        mode: input.internal ? 'collect' : 'followup',
 106        status: 'queued',
 107        messagePreview: input.message,
 108        queuedAt: input.queuedAt ?? Date.now(),
 109      },
 110      message: input.message,
 111      onEvents: [],
 112      signalController: new AbortController(),
 113      resolve,
 114      reject,
 115      promise,
 116    }
 117    return { entry, promise }
 118  }
 119  
 120  function insertManualQueuedEntry(entry: ManualQueueEntry, promise: Promise<unknown>) {
 121    const state = getRuntimeState()
 122    state.queueByExecution.set(entry.executionKey, [entry as unknown])
 123    state.runs.set(entry.run.id, entry.run)
 124    state.recentRunIds.push(entry.run.id)
 125    state.promises.set(entry.run.id, promise)
 126  }
 127  
 128  /** Wrapper around enqueueSessionRun that captures the run promise to
 129   *  prevent async-after-test warnings from node:test. */
 130  function enqueue(input: Parameters<typeof mgr.enqueueSessionRun>[0]) {
 131    const result = mgr.enqueueSessionRun(input)
 132    const suppressed = result.promise.catch(() => {})
 133    pendingPromises.push(suppressed)
 134    return result
 135  }
 136  
 137  before(async () => {
 138    tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'swarmclaw-session-run-mgr-'))
 139    process.env.DATA_DIR = path.join(tempDir, 'data')
 140    process.env.WORKSPACE_DIR = path.join(tempDir, 'workspace')
 141    process.env.SWARMCLAW_BUILD_MODE = '1'
 142  
 143    storage = await import('@/lib/server/storage')
 144    mgr = await import('@/lib/server/runtime/session-run-manager')
 145  })
 146  
 147  function seedSession(id: string) {
 148    const sessions = storage.loadSessions()
 149    sessions[id] = {
 150      id,
 151      name: 'Test Session',
 152      cwd: '/tmp',
 153      user: 'tester',
 154      provider: 'anthropic',
 155      model: 'claude-sonnet-4-20250514',
 156      claudeSessionId: null,
 157      agentId: 'test-agent',
 158      messages: [],
 159      createdAt: Date.now(),
 160      lastActiveAt: Date.now(),
 161    }
 162    storage.saveSessions(sessions)
 163    const agents = storage.loadAgents()
 164    if (!agents['test-agent']) {
 165      agents['test-agent'] = {
 166        id: 'test-agent',
 167        name: 'Test Agent',
 168        description: 'Test agent for session run manager tests',
 169        provider: 'anthropic',
 170        model: 'claude-sonnet-4-20250514',
 171        systemPrompt: 'You are a test agent.',
 172        createdAt: Date.now(),
 173        updatedAt: Date.now(),
 174      }
 175      storage.saveAgents(agents)
 176    }
 177  }
 178  
 179  after(() => {
 180    if (originalEnv.DATA_DIR === undefined) delete process.env.DATA_DIR
 181    else process.env.DATA_DIR = originalEnv.DATA_DIR
 182    if (originalEnv.WORKSPACE_DIR === undefined) delete process.env.WORKSPACE_DIR
 183    else process.env.WORKSPACE_DIR = originalEnv.WORKSPACE_DIR
 184    if (originalEnv.SWARMCLAW_BUILD_MODE === undefined) delete process.env.SWARMCLAW_BUILD_MODE
 185    else process.env.SWARMCLAW_BUILD_MODE = originalEnv.SWARMCLAW_BUILD_MODE
 186    fs.rmSync(tempDir, { recursive: true, force: true })
 187    process.removeListener('unhandledRejection', suppressionHandler)
 188  })
 189  
 190  afterEach(async () => {
 191    // Wait for all background drain activity to settle before resetting state
 192    await Promise.allSettled(pendingPromises)
 193    pendingPromises.length = 0
 194    resetState()
 195  })
 196  
 197  describe('session-run-manager', () => {
 198    it('backfills missing timer maps when hot-reloading over an older singleton shape', () => {
 199      const output = runWithTempDataDir<{ ok: boolean }>(`
 200        globalThis.__swarmclaw_session_run_manager__ = {
 201          runningByExecution: new Map(),
 202          queueByExecution: new Map(),
 203          runs: new Map(),
 204          recentRunIds: [],
 205          promises: new Map(),
 206        }
 207  
 208        const mgrMod = await import('./src/lib/server/runtime/session-run-manager.ts')
 209        const mgr = mgrMod.default || mgrMod
 210        const result = mgr.enqueueSessionRun({
 211          sessionId: 'sess-hmr-backfill',
 212          message: 'hello',
 213        })
 214        await result.promise.catch(() => {})
 215  
 216        console.log(JSON.stringify({ ok: typeof result.runId === 'string' && result.runId.length > 0 }))
 217      `, { prefix: 'swarmclaw-session-run-hmr-' })
 218  
 219      assert.equal(output.ok, true)
 220    })
 221  
 222    describe('enqueueSessionRun', () => {
 223      it('returns a run ID and queued position', () => {
 224        const result = enqueue({
 225          sessionId: 'sess-1',
 226          message: 'Hello world',
 227        })
 228  
 229        assert.ok(result.runId, 'should have a run ID')
 230        assert.equal(typeof result.runId, 'string')
 231        assert.equal(typeof result.position, 'number')
 232        assert.ok(result.promise instanceof Promise, 'should return a promise')
 233        assert.equal(typeof result.abort, 'function')
 234        assert.equal(typeof result.unsubscribe, 'function')
 235      })
 236  
 237      it('registers the run record accessible via getRunById', () => {
 238        const result = enqueue({
 239          sessionId: 'sess-2',
 240          message: 'Test message',
 241          source: 'chat',
 242        })
 243  
 244        const run = mgr.getRunById(result.runId)
 245        assert.ok(run, 'run should exist')
 246        assert.equal(run.sessionId, 'sess-2')
 247        assert.equal(run.source, 'chat')
 248        assert.equal(run.messagePreview, 'Test message')
 249        assert.ok(run.queuedAt > 0)
 250      })
 251  
 252      it('persists run records and replay events in storage', () => {
 253        const result = enqueue({
 254          sessionId: 'sess-persisted',
 255          message: 'Persist me',
 256          source: 'chat',
 257        })
 258  
 259        const persisted = storage.loadRuntimeRuns()[result.runId]
 260        assert.ok(persisted)
 261        assert.equal(persisted.messagePreview, 'Persist me')
 262        assert.equal(mgr.listRunEvents(result.runId).length > 0, true)
 263      })
 264  
 265      it('truncates message preview to 140 chars', () => {
 266        const longMessage = 'A'.repeat(200)
 267        const result = enqueue({
 268          sessionId: 'sess-trunc',
 269          message: longMessage,
 270        })
 271  
 272        const run = mgr.getRunById(result.runId)
 273        assert.ok(run)
 274        assert.equal(run.messagePreview.length, 140)
 275      })
 276  
 277      it('defaults internal to false and source to chat', () => {
 278        const result = enqueue({
 279          sessionId: 'sess-defaults',
 280          message: 'test',
 281        })
 282  
 283        const run = mgr.getRunById(result.runId)
 284        assert.ok(run)
 285        assert.equal(run.internal, false)
 286        assert.equal(run.source, 'chat')
 287      })
 288  
 289      it('normalizes mode to followup for non-internal runs', () => {
 290        const result = enqueue({
 291          sessionId: 'sess-mode',
 292          message: 'test',
 293          internal: false,
 294        })
 295  
 296        const run = mgr.getRunById(result.runId)
 297        assert.ok(run)
 298        assert.equal(run.mode, 'followup')
 299      })
 300  
 301      it('normalizes mode to collect for internal runs without explicit mode', () => {
 302        const result = enqueue({
 303          sessionId: 'sess-mode-int',
 304          message: 'test',
 305          internal: true,
 306        })
 307  
 308        const run = mgr.getRunById(result.runId)
 309        assert.ok(run)
 310        assert.equal(run.mode, 'collect')
 311      })
 312  
 313      it('preserves explicit mode when provided', () => {
 314        const result = enqueue({
 315          sessionId: 'sess-explicit-mode',
 316          message: 'test',
 317          mode: 'steer',
 318        })
 319  
 320        const run = mgr.getRunById(result.runId)
 321        assert.ok(run)
 322        assert.equal(run.mode, 'steer')
 323      })
 324    })
 325  
 326    describe('deduplication', () => {
 327      it('deduplicates queued runs with the same dedupeKey', () => {
 328        const first = enqueue({
 329          sessionId: 'sess-dedup',
 330          message: 'first run',
 331        })
 332  
 333        const run1 = enqueue({
 334          sessionId: 'sess-dedup',
 335          message: 'deduped message',
 336          dedupeKey: 'key-1',
 337        })
 338  
 339        const run2 = enqueue({
 340          sessionId: 'sess-dedup',
 341          message: 'duplicate message',
 342          dedupeKey: 'key-1',
 343        })
 344  
 345        assert.equal(run2.deduped, true, 'second run should be deduped')
 346        assert.equal(run2.runId, run1.runId, 'deduped run should share the same run ID')
 347        assert.ok(first.runId !== run1.runId, 'first run should be different from deduped runs')
 348      })
 349  
 350      it('does not deduplicate runs without dedupeKey', () => {
 351        enqueue({ sessionId: 'sess-no-dedup', message: 'occupier' })
 352  
 353        const run1 = enqueue({ sessionId: 'sess-no-dedup', message: 'msg1' })
 354        const run2 = enqueue({ sessionId: 'sess-no-dedup', message: 'msg2' })
 355  
 356        assert.ok(run1.runId !== run2.runId, 'runs without dedupeKey should have different IDs')
 357        assert.equal(run2.deduped, undefined)
 358      })
 359  
 360      it('does not deduplicate runs with different dedupeKeys', () => {
 361        enqueue({ sessionId: 'sess-diff-keys', message: 'occupier' })
 362  
 363        const run1 = enqueue({
 364          sessionId: 'sess-diff-keys',
 365          message: 'msg1',
 366          dedupeKey: 'alpha',
 367        })
 368  
 369        const run2 = enqueue({
 370          sessionId: 'sess-diff-keys',
 371          message: 'msg2',
 372          dedupeKey: 'beta',
 373        })
 374  
 375        assert.ok(run1.runId !== run2.runId)
 376        assert.equal(run2.deduped, undefined)
 377      })
 378    })
 379  
 380    describe('collect mode coalescing', () => {
 381      it('coalesces internal collect-mode messages within the time window', () => {
 382        enqueue({ sessionId: 'sess-coalesce', message: 'occupier' })
 383  
 384        const run1 = enqueue({
 385          sessionId: 'sess-coalesce',
 386          message: 'first collect',
 387          internal: true,
 388          source: 'heartbeat',
 389          mode: 'collect',
 390        })
 391  
 392        const run2 = enqueue({
 393          sessionId: 'sess-coalesce',
 394          message: 'second collect',
 395          internal: true,
 396          source: 'heartbeat',
 397          mode: 'collect',
 398        })
 399  
 400        assert.equal(run2.coalesced, true, 'second collect should be coalesced')
 401        assert.equal(run2.runId, run1.runId, 'coalesced run should share the same run ID')
 402      })
 403  
 404      it('does not coalesce messages with different sources', () => {
 405        enqueue({ sessionId: 'sess-no-coalesce-src', message: 'occupier' })
 406  
 407        const run1 = enqueue({
 408          sessionId: 'sess-no-coalesce-src',
 409          message: 'first',
 410          internal: true,
 411          source: 'heartbeat',
 412          mode: 'collect',
 413        })
 414  
 415        const run2 = enqueue({
 416          sessionId: 'sess-no-coalesce-src',
 417          message: 'second',
 418          internal: true,
 419          source: 'other-source',
 420          mode: 'collect',
 421        })
 422  
 423        assert.ok(run1.runId !== run2.runId, 'different sources should not coalesce')
 424      })
 425  
 426      it('does not coalesce when there are image attachments', () => {
 427        enqueue({ sessionId: 'sess-no-coalesce-img', message: 'occupier' })
 428  
 429        const run1 = enqueue({
 430          sessionId: 'sess-no-coalesce-img',
 431          message: 'first',
 432          internal: true,
 433          source: 'heartbeat',
 434          mode: 'collect',
 435        })
 436  
 437        const run2 = enqueue({
 438          sessionId: 'sess-no-coalesce-img',
 439          message: 'second with image',
 440          internal: true,
 441          source: 'heartbeat',
 442          mode: 'collect',
 443          imagePath: '/path/to/image.png',
 444        })
 445  
 446        assert.ok(run1.runId !== run2.runId, 'image attachments should prevent coalescing')
 447      })
 448    })
 449  
 450    describe('getSessionRunState / getSessionExecutionState', () => {
 451      it('returns empty state for unknown session', () => {
 452        const state = mgr.getSessionRunState('unknown-session')
 453        assert.equal(state.runningRunId, undefined)
 454        assert.equal(state.queueLength, 0)
 455      })
 456  
 457      it('returns execution state with queue info', () => {
 458        enqueue({ sessionId: 'sess-state', message: 'running' })
 459        enqueue({ sessionId: 'sess-state', message: 'queued 1' })
 460  
 461        const state = mgr.getSessionExecutionState('sess-state')
 462        assert.equal(state.hasQueued, true)
 463        assert.ok(state.queueLength >= 1)
 464      })
 465  
 466      it('exposes queued follow-up metadata in the visible queue snapshot', () => {
 467        enqueue({ sessionId: 'sess-queue-meta', message: 'running' })
 468        const queued = enqueue({
 469          sessionId: 'sess-queue-meta',
 470          message: 'queued with metadata',
 471          imagePath: '/tmp/cover.png',
 472          imageUrl: '/api/uploads/cover.png',
 473          attachedFiles: ['/tmp/spec.md', '/tmp/notes.txt'],
 474          replyToId: 'msg-11',
 475          source: 'chat',
 476        })
 477  
 478        const snapshot = mgr.getSessionQueueSnapshot('sess-queue-meta')
 479        assert.equal(snapshot.activeRunId != null, true)
 480        assert.equal(snapshot.queueLength, 1)
 481        assert.deepEqual(snapshot.items[0], {
 482          runId: queued.runId,
 483          sessionId: 'sess-queue-meta',
 484          missionId: null,
 485          text: 'queued with metadata',
 486          queuedAt: snapshot.items[0]?.queuedAt,
 487          position: 1,
 488          imagePath: '/tmp/cover.png',
 489          imageUrl: '/api/uploads/cover.png',
 490          attachedFiles: ['/tmp/spec.md', '/tmp/notes.txt'],
 491          replyToId: 'msg-11',
 492          source: 'chat',
 493        })
 494      })
 495  
 496      it('exposes the active user-visible turn separately from queued follow-ups', () => {
 497        const running = enqueue({
 498          sessionId: 'sess-active-turn',
 499          message: 'running now',
 500          source: 'chat',
 501        })
 502        const queued = enqueue({
 503          sessionId: 'sess-active-turn',
 504          message: 'queued next',
 505          source: 'chat',
 506        })
 507  
 508        const snapshot = mgr.getSessionQueueSnapshot('sess-active-turn')
 509        assert.equal(snapshot.activeRunId, running.runId)
 510        assert.deepEqual(snapshot.activeTurn, {
 511          runId: running.runId,
 512          sessionId: 'sess-active-turn',
 513          missionId: null,
 514          text: 'running now',
 515          queuedAt: snapshot.activeTurn?.queuedAt,
 516          position: 0,
 517          imagePath: undefined,
 518          imageUrl: undefined,
 519          attachedFiles: undefined,
 520          replyToId: undefined,
 521          source: 'chat',
 522        })
 523        assert.deepEqual(snapshot.items.map((item) => item.runId), [queued.runId])
 524        assert.equal(snapshot.queueLength, 1)
 525      })
 526  
 527      it('hides internal queued runs from the user-visible queue snapshot', () => {
 528        enqueue({ sessionId: 'sess-visible-queue', message: 'running' })
 529        enqueue({
 530          sessionId: 'sess-visible-queue',
 531          message: 'heartbeat hidden',
 532          internal: true,
 533          source: 'heartbeat',
 534        })
 535        const visible = enqueue({
 536          sessionId: 'sess-visible-queue',
 537          message: 'user follow-up',
 538          internal: false,
 539          source: 'chat',
 540        })
 541  
 542        const snapshot = mgr.getSessionQueueSnapshot('sess-visible-queue')
 543        assert.equal(snapshot.queueLength, 1)
 544        assert.equal(snapshot.activeTurn?.runId, snapshot.activeRunId)
 545        assert.deepEqual(snapshot.items.map((item) => item.runId), [visible.runId])
 546      })
 547  
 548      it('hides internal active runs from the active-turn snapshot field', () => {
 549        const { entry, promise } = makeManualQueuedEntry({
 550          sessionId: 'sess-hidden-active',
 551          runId: 'run-hidden',
 552          message: 'heartbeat hidden',
 553          internal: true,
 554          source: 'heartbeat',
 555        })
 556        entry.run.status = 'running'
 557        entry.run.startedAt = Date.now()
 558        const state = getRuntimeState()
 559        state.runningByExecution.set(entry.executionKey, entry as unknown)
 560        state.runs.set(entry.run.id, entry.run)
 561        state.promises.set(entry.run.id, promise)
 562        pendingPromises.push(promise.catch(() => {}))
 563  
 564        try {
 565          const snapshot = mgr.getSessionQueueSnapshot('sess-hidden-active')
 566          assert.equal(snapshot.activeRunId, 'run-hidden')
 567          assert.equal(snapshot.activeTurn, null)
 568          assert.equal(snapshot.queueLength, 0)
 569        } finally {
 570          entry.resolve(undefined)
 571        }
 572      })
 573  
 574      it('reports heartbeat vs non-heartbeat queued runs', () => {
 575        enqueue({ sessionId: 'sess-hb-state', message: 'occupier' })
 576        enqueue({
 577          sessionId: 'sess-hb-state',
 578          message: 'hb',
 579          internal: true,
 580          source: 'heartbeat',
 581        })
 582        enqueue({
 583          sessionId: 'sess-hb-state',
 584          message: 'user',
 585          internal: false,
 586          source: 'chat',
 587        })
 588  
 589        const state = mgr.getSessionExecutionState('sess-hb-state')
 590        assert.equal(state.hasQueuedHeartbeat, true)
 591        assert.equal(state.hasQueuedNonHeartbeat, true)
 592      })
 593  
 594      it('publishes a shared non-heartbeat activity lease for user work', () => {
 595        enqueue({ sessionId: 'sess-lease', message: 'user work', source: 'chat' })
 596  
 597        assert.equal(mgr.hasActiveNonHeartbeatSessionLease('sess-lease'), true)
 598      })
 599  
 600      it('does not publish the shared activity lease for heartbeat-only work', () => {
 601        enqueue({
 602          sessionId: 'sess-heartbeat-only',
 603          message: 'hb',
 604          internal: true,
 605          source: 'heartbeat-wake',
 606        })
 607  
 608        assert.equal(mgr.hasActiveNonHeartbeatSessionLease('sess-heartbeat-only'), false)
 609      })
 610    })
 611  
 612    describe('listRuns', () => {
 613      it('lists all runs in reverse chronological order', () => {
 614        enqueue({ sessionId: 'sess-list-a', message: 'msg a' })
 615        enqueue({ sessionId: 'sess-list-b', message: 'msg b' })
 616  
 617        const runs = mgr.listRuns()
 618        assert.ok(runs.length >= 2)
 619        const idxA = runs.findIndex(r => r.sessionId === 'sess-list-a')
 620        const idxB = runs.findIndex(r => r.sessionId === 'sess-list-b')
 621        assert.ok(idxB < idxA, 'more recent run should be first')
 622      })
 623  
 624      it('filters by sessionId', () => {
 625        enqueue({ sessionId: 'sess-filter-a', message: 'a' })
 626        enqueue({ sessionId: 'sess-filter-b', message: 'b' })
 627  
 628        const runs = mgr.listRuns({ sessionId: 'sess-filter-a' })
 629        assert.ok(runs.length >= 1)
 630        for (const run of runs) {
 631          assert.equal(run.sessionId, 'sess-filter-a')
 632        }
 633      })
 634  
 635      it('filters by status', () => {
 636        enqueue({ sessionId: 'sess-status-a', message: 'a' })
 637        enqueue({ sessionId: 'sess-status-b', message: 'b' })
 638  
 639        // At least one should be queued synchronously
 640        const queued = mgr.listRuns({ status: 'queued' })
 641        // We just verify the filter doesn't crash and returns consistent data
 642        for (const run of queued) {
 643          assert.equal(run.status, 'queued')
 644        }
 645      })
 646  
 647      it('respects limit parameter', () => {
 648        for (let i = 0; i < 5; i++) {
 649          enqueue({ sessionId: `sess-limit-${i}`, message: `msg ${i}` })
 650        }
 651  
 652        const runs = mgr.listRuns({ limit: 2 })
 653        assert.equal(runs.length, 2)
 654      })
 655    })
 656  
 657    describe('cancelSessionRuns', () => {
 658      it('cancels queued runs for a session', () => {
 659        enqueue({ sessionId: 'sess-cancel', message: 'running' })
 660        enqueue({ sessionId: 'sess-cancel', message: 'queued 1' })
 661        enqueue({ sessionId: 'sess-cancel', message: 'queued 2' })
 662  
 663        const result = mgr.cancelSessionRuns('sess-cancel', 'User cancelled')
 664        assert.ok(result.cancelledQueued >= 2, `should cancel at least 2 queued runs, got ${result.cancelledQueued}`)
 665      })
 666  
 667      it('returns zero when no runs exist for session', () => {
 668        const result = mgr.cancelSessionRuns('nonexistent-session')
 669        assert.equal(result.cancelledQueued, 0)
 670        assert.equal(result.cancelledRunning, false)
 671      })
 672  
 673      it('does not cancel runs for other sessions', () => {
 674        enqueue({ sessionId: 'sess-keep', message: 'keep me' })
 675        enqueue({ sessionId: 'sess-cancel-other', message: 'cancel me' })
 676  
 677        mgr.cancelSessionRuns('sess-cancel-other', 'cancelled')
 678  
 679        const keepRuns = mgr.listRuns({ sessionId: 'sess-keep' })
 680        assert.ok(keepRuns.length >= 1, 'runs for other session should be preserved')
 681        const keptRun = keepRuns.find(r => r.status !== 'cancelled')
 682        assert.ok(keptRun, 'kept session run should not be cancelled')
 683      })
 684    })
 685  
 686    describe('steer mode', () => {
 687      it('cancels pending queued runs when steer mode is used', () => {
 688        enqueue({ sessionId: 'sess-steer', message: 'running' })
 689        enqueue({ sessionId: 'sess-steer', message: 'queued 1' })
 690        enqueue({ sessionId: 'sess-steer', message: 'queued 2' })
 691  
 692        const steer = enqueue({
 693          sessionId: 'sess-steer',
 694          message: 'steer message',
 695          mode: 'steer',
 696        })
 697  
 698        assert.ok(steer.runId)
 699        const steerRun = mgr.getRunById(steer.runId)
 700        assert.ok(steerRun)
 701        assert.notEqual(steerRun.status, 'cancelled')
 702      })
 703  
 704      it('steer marks previously queued runs as cancelled', () => {
 705        enqueue({ sessionId: 'sess-steer-verify', message: 'occupier' })
 706        const q1 = enqueue({ sessionId: 'sess-steer-verify', message: 'will be cancelled' })
 707        const q2 = enqueue({ sessionId: 'sess-steer-verify', message: 'also cancelled' })
 708  
 709        enqueue({
 710          sessionId: 'sess-steer-verify',
 711          message: 'steer',
 712          mode: 'steer',
 713        })
 714  
 715        const run1 = mgr.getRunById(q1.runId)
 716        const run2 = mgr.getRunById(q2.runId)
 717        assert.ok(run1)
 718        assert.ok(run2)
 719        assert.equal(run1.status, 'cancelled')
 720        assert.equal(run2.status, 'cancelled')
 721      })
 722    })
 723  
 724    describe('abort and unsubscribe', () => {
 725      it('abort function is callable without error', () => {
 726        const result = enqueue({
 727          sessionId: 'sess-abort',
 728          message: 'abort me',
 729        })
 730        assert.doesNotThrow(() => result.abort())
 731      })
 732  
 733      it('unsubscribe removes the event listener', () => {
 734        const events: unknown[] = []
 735        const result = enqueue({
 736          sessionId: 'sess-unsub',
 737          message: 'test',
 738          onEvent: (event) => events.push(event),
 739        })
 740        assert.doesNotThrow(() => result.unsubscribe())
 741      })
 742    })
 743  
 744    describe('callerSignal chaining', () => {
 745      it('propagates an already-aborted callerSignal', () => {
 746        const controller = new AbortController()
 747        controller.abort()
 748  
 749        const result = enqueue({
 750          sessionId: 'sess-pre-aborted',
 751          message: 'test',
 752          callerSignal: controller.signal,
 753        })
 754  
 755        assert.doesNotThrow(() => result.abort())
 756      })
 757  
 758      it('chains a live callerSignal to the run', () => {
 759        const controller = new AbortController()
 760  
 761        const result = enqueue({
 762          sessionId: 'sess-live-signal',
 763          message: 'test',
 764          callerSignal: controller.signal,
 765        })
 766  
 767        // Aborting the caller should work without throwing
 768        assert.doesNotThrow(() => controller.abort())
 769        assert.ok(result.runId)
 770      })
 771    })
 772  
 773    describe('run completion and drain', () => {
 774      it('run eventually transitions from queued to a terminal state', async () => {
 775        seedSession('sess-terminal')
 776        const result = enqueue({
 777          sessionId: 'sess-terminal',
 778          message: 'will fail in drain',
 779        })
 780  
 781        // Wait for drain to process
 782        await result.promise.catch(() => {})
 783  
 784        const run = mgr.getRunById(result.runId)
 785        assert.ok(run, 'run should still exist')
 786        const terminal = ['completed', 'failed', 'cancelled']
 787        assert.ok(
 788          terminal.includes(run.status),
 789          `run status should be terminal, got: ${run.status}`,
 790        )
 791      })
 792  
 793      it('drain processes next queued run after current completes', async () => {
 794        seedSession('sess-drain-chain')
 795        const result1 = enqueue({
 796          sessionId: 'sess-drain-chain',
 797          message: 'first',
 798        })
 799        const result2 = enqueue({
 800          sessionId: 'sess-drain-chain',
 801          message: 'second',
 802        })
 803  
 804        // Wait for both drains to process
 805        await Promise.allSettled([result1.promise, result2.promise])
 806  
 807        const run1 = mgr.getRunById(result1.runId)
 808        const run2 = mgr.getRunById(result2.runId)
 809        assert.ok(run1)
 810        assert.ok(run2)
 811        assert.notEqual(run1.status, 'queued', 'first run should not still be queued')
 812        assert.notEqual(run2.status, 'queued', 'second run should not still be queued')
 813      })
 814  
 815      it('failed run records error message', async () => {
 816        seedSession('sess-fail-error')
 817        const result = enqueue({
 818          sessionId: 'sess-fail-error',
 819          message: 'will error',
 820        })
 821  
 822        await result.promise.catch(() => {})
 823  
 824        const run = mgr.getRunById(result.runId)
 825        assert.ok(run)
 826        if (run.status === 'failed') {
 827          assert.ok(run.error, 'failed run should have an error message')
 828          assert.ok(run.endedAt, 'failed run should have endedAt timestamp')
 829        }
 830      })
 831  
 832      it('clears the shared activity lease after non-heartbeat work finishes', async () => {
 833        seedSession('sess-lease-clear')
 834        const result = enqueue({
 835          sessionId: 'sess-lease-clear',
 836          message: 'will fail in drain',
 837        })
 838  
 839        assert.equal(mgr.hasActiveNonHeartbeatSessionLease('sess-lease-clear'), true)
 840        await result.promise.catch(() => {})
 841        assert.equal(mgr.hasActiveNonHeartbeatSessionLease('sess-lease-clear'), false)
 842      })
 843  
 844      it('defers heartbeat runs while another worker advertises non-heartbeat activity', async () => {
 845        seedSession('sess-remote-busy')
 846        storage.tryAcquireRuntimeLock('session-non-heartbeat:sess-remote-busy', 'remote-worker', 60_000)
 847  
 848        const heartbeat = enqueue({
 849          sessionId: 'sess-remote-busy',
 850          message: 'hb',
 851          internal: true,
 852          source: 'heartbeat-wake',
 853        })
 854  
 855        await new Promise((resolve) => setTimeout(resolve, 50))
 856        const queued = mgr.getRunById(heartbeat.runId)
 857        assert.ok(queued)
 858        assert.equal(queued.status, 'queued')
 859  
 860        storage.releaseRuntimeLock('session-non-heartbeat:sess-remote-busy', 'remote-worker')
 861        await heartbeat.promise.catch(() => {})
 862  
 863        const finished = mgr.getRunById(heartbeat.runId)
 864        assert.ok(finished)
 865        assert.notEqual(finished.status, 'queued')
 866      })
 867  
 868      it('re-kicks a recent queued entry when the execution lane is idle', async () => {
 869        seedSession('sess-rekick')
 870        const runId = 'manual-rekick'
 871        const { entry, promise } = makeManualQueuedEntry({
 872          sessionId: 'sess-rekick',
 873          runId,
 874          message: 'recover me',
 875        })
 876        insertManualQueuedEntry(entry, promise)
 877  
 878        const repair = mgr.repairSessionRunQueue('sess-rekick')
 879        assert.equal(repair.recoveredQueuedRuns, 0)
 880        assert.equal(repair.kickedExecutionKeys, 1)
 881  
 882        await promise.catch(() => {})
 883  
 884        const run = mgr.getRunById(runId)
 885        assert.ok(run)
 886        assert.notEqual(run.status, 'queued')
 887      })
 888  
 889      it('recovers stale queued runs before a fresh enqueue can get wedged behind them', async () => {
 890        seedSession('sess-stale-recover')
 891        const staleRunId = 'manual-stale'
 892        const { entry, promise } = makeManualQueuedEntry({
 893          sessionId: 'sess-stale-recover',
 894          runId: staleRunId,
 895          message: 'ghost queued run',
 896          queuedAt: Date.now() - 60_000,
 897        })
 898        insertManualQueuedEntry(entry, promise)
 899  
 900        const fresh = enqueue({
 901          sessionId: 'sess-stale-recover',
 902          message: 'fresh message',
 903        })
 904  
 905        const staleResult = await promise
 906        assert.deepEqual(staleResult, {
 907          runId: staleRunId,
 908          sessionId: 'sess-stale-recover',
 909          text: '',
 910          persisted: false,
 911          toolEvents: [],
 912          error: 'Recovered stale queued run before enqueue',
 913        })
 914  
 915        const staleRun = mgr.getRunById(staleRunId)
 916        assert.ok(staleRun)
 917        assert.equal(staleRun.status, 'failed')
 918  
 919        const execution = mgr.getSessionExecutionState('sess-stale-recover')
 920        assert.ok(execution.queueLength <= 1, `expected stale run to be cleared, got queueLength=${execution.queueLength}`)
 921  
 922        await fresh.promise.catch(() => {})
 923        const freshRun = mgr.getRunById(fresh.runId)
 924        assert.ok(freshRun)
 925        assert.notEqual(freshRun.status, 'queued')
 926      })
 927    })
 928  
 929    describe('cancelAllHeartbeatRuns', () => {
 930      it('cancels queued heartbeat runs but keeps non-heartbeat runs', () => {
 931        enqueue({ sessionId: 'sess-hb-cancel', message: 'occupier' })
 932  
 933        enqueue({
 934          sessionId: 'sess-hb-cancel',
 935          message: 'heartbeat msg',
 936          internal: true,
 937          source: 'heartbeat',
 938        })
 939  
 940        enqueue({
 941          sessionId: 'sess-hb-cancel',
 942          message: 'user msg',
 943          internal: false,
 944          source: 'chat',
 945        })
 946  
 947        const result = mgr.cancelAllHeartbeatRuns('Test cancellation')
 948        assert.ok(result.cancelledQueued >= 1, 'should cancel at least 1 queued heartbeat')
 949      })
 950  
 951      it('returns zeros when no heartbeat runs exist', () => {
 952        const result = mgr.cancelAllHeartbeatRuns()
 953        assert.equal(result.cancelledQueued, 0)
 954        assert.equal(result.abortedRunning, 0)
 955      })
 956  
 957      it('preserves non-heartbeat queued runs', () => {
 958        enqueue({ sessionId: 'sess-hb-keep', message: 'occupier' })
 959  
 960        enqueue({
 961          sessionId: 'sess-hb-keep',
 962          message: 'heartbeat',
 963          internal: true,
 964          source: 'heartbeat',
 965        })
 966  
 967        const userRun = enqueue({
 968          sessionId: 'sess-hb-keep',
 969          message: 'user chat',
 970          internal: false,
 971          source: 'chat',
 972        })
 973  
 974        mgr.cancelAllHeartbeatRuns()
 975  
 976        const userRunRecord = mgr.getRunById(userRun.runId)
 977        assert.ok(userRunRecord)
 978        assert.notEqual(userRunRecord.status, 'cancelled', 'non-heartbeat run should not be cancelled')
 979      })
 980    })
 981  
 982    describe('restart recovery', () => {
 983      it('marks stale persisted runs interrupted and only requeues background sources once', () => {
 984        const output = runWithTempDataDir<{
 985          interruptedChat: boolean
 986          recoveredRunCount: number
 987          recoveredFromOriginal: boolean
 988        }>(`
 989          const storageMod = await import('./src/lib/server/storage.ts')
 990          const storage = storageMod.default || storageMod
 991          const now = Date.now()
 992          const agents = storage.loadAgents()
 993          agents.agent_test = {
 994            id: 'agent_test',
 995            name: 'Recovery Agent',
 996            provider: 'anthropic',
 997            model: 'claude-sonnet-4-20250514',
 998            systemPrompt: 'Test agent',
 999          }
1000          storage.saveAgents(agents)
1001          const sessions = storage.loadSessions()
1002          sessions.sess_bg = {
1003            id: 'sess_bg',
1004            agentId: 'agent_test',
1005            messages: [],
1006            createdAt: now,
1007            lastActiveAt: now,
1008          }
1009          sessions.sess_chat = {
1010            id: 'sess_chat',
1011            agentId: 'agent_test',
1012            messages: [],
1013            createdAt: now,
1014            lastActiveAt: now,
1015          }
1016          storage.saveSessions(sessions)
1017          storage.upsertRuntimeRun('run_bg', {
1018            id: 'run_bg',
1019            sessionId: 'sess_bg',
1020            source: 'heartbeat',
1021            internal: true,
1022            mode: 'collect',
1023            status: 'queued',
1024            messagePreview: 'heartbeat',
1025            queuedAt: now - 1000,
1026            recoveryPayload: {
1027              message: 'resume heartbeat',
1028              internal: true,
1029              source: 'heartbeat',
1030              mode: 'collect',
1031            },
1032          })
1033          storage.upsertRuntimeRun('run_chat', {
1034            id: 'run_chat',
1035            sessionId: 'sess_chat',
1036            source: 'chat',
1037            internal: false,
1038            mode: 'followup',
1039            status: 'running',
1040            messagePreview: 'chat',
1041            queuedAt: now - 1000,
1042            recoveryPayload: {
1043              message: 'user chat',
1044              internal: false,
1045              source: 'chat',
1046              mode: 'followup',
1047            },
1048          })
1049          const mgrMod = await import('./src/lib/server/runtime/session-run-manager.ts')
1050          const mgr = mgrMod.default || mgrMod
1051          mgr.listRuns()
1052          await new Promise((resolve) => setTimeout(resolve, 25))
1053          const runs = Object.values(storage.loadRuntimeRuns())
1054          const originalChat = runs.find((run) => run.id === 'run_chat')
1055          const recovered = runs.filter((run) => run.recoveredFromRestart === true)
1056          console.log(JSON.stringify({
1057            interruptedChat: !!originalChat?.interruptedAt && originalChat?.status === 'cancelled',
1058            recoveredRunCount: recovered.length,
1059            recoveredFromOriginal: recovered[0]?.recoveredFromRunId === 'run_bg',
1060          }))
1061        `, { prefix: 'swarmclaw-session-run-recovery-' })
1062  
1063        assert.equal(output.interruptedChat, true)
1064        assert.equal(output.recoveredRunCount, 1)
1065        assert.equal(output.recoveredFromOriginal, true)
1066      })
1067    })
1068  
1069    describe('getRunById', () => {
1070      it('returns null for non-existent run', () => {
1071        assert.equal(mgr.getRunById('nonexistent'), null)
1072      })
1073    })
1074  })