// src/lib/server/runtime/watch-jobs.test.ts
  1  import assert from 'node:assert/strict'
  2  import fs from 'node:fs'
  3  import os from 'node:os'
  4  import path from 'node:path'
  5  import { after, before, describe, it } from 'node:test'
  6  
  7  const originalEnv = {
  8    DATA_DIR: process.env.DATA_DIR,
  9    WORKSPACE_DIR: process.env.WORKSPACE_DIR,
 10    SWARMCLAW_BUILD_MODE: process.env.SWARMCLAW_BUILD_MODE,
 11  }
 12  
 13  let tempDir = ''
 14  let watchJobs: typeof import('@/lib/server/runtime/watch-jobs')
 15  let storage: typeof import('@/lib/server/storage')
 16  
 17  before(async () => {
 18    tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'swarmclaw-watch-jobs-'))
 19    process.env.DATA_DIR = path.join(tempDir, 'data')
 20    process.env.WORKSPACE_DIR = path.join(tempDir, 'workspace')
 21    process.env.SWARMCLAW_BUILD_MODE = '1'
 22    watchJobs = await import('@/lib/server/runtime/watch-jobs')
 23    storage = await import('@/lib/server/storage')
 24  })
 25  
 26  after(() => {
 27    if (originalEnv.DATA_DIR === undefined) delete process.env.DATA_DIR
 28    else process.env.DATA_DIR = originalEnv.DATA_DIR
 29    if (originalEnv.WORKSPACE_DIR === undefined) delete process.env.WORKSPACE_DIR
 30    else process.env.WORKSPACE_DIR = originalEnv.WORKSPACE_DIR
 31    if (originalEnv.SWARMCLAW_BUILD_MODE === undefined) delete process.env.SWARMCLAW_BUILD_MODE
 32    else process.env.SWARMCLAW_BUILD_MODE = originalEnv.SWARMCLAW_BUILD_MODE
 33    fs.rmSync(tempDir, { recursive: true, force: true })
 34  })
 35  
 36  describe('watch-jobs', () => {
 37    it('validates required targets for durable watches', async () => {
 38      await assert.rejects(
 39        watchJobs.createWatchJob({
 40          type: 'http',
 41          resumeMessage: 'resume',
 42          target: {},
 43          condition: {},
 44        }),
 45        /url target/,
 46      )
 47  
 48      await assert.rejects(
 49        watchJobs.createWatchJob({
 50          type: 'time',
 51          resumeMessage: 'resume',
 52          target: { source: 'test' },
 53          condition: {},
 54        }),
 55        /runAt or delayMinutes/,
 56      )
 57    })
 58  
 59    it('triggers time and task watches durably', async () => {
 60      const tasks = storage.loadTasks()
 61      tasks.task_done = {
 62        id: 'task_done',
 63        title: 'done',
 64        status: 'completed',
 65        result: 'ok',
 66        createdAt: Date.now(),
 67        updatedAt: Date.now(),
 68      }
 69      storage.saveTasks(tasks)
 70  
 71      const timeJob = await watchJobs.createWatchJob({
 72        type: 'time',
 73        resumeMessage: 'wake up',
 74        target: { source: 'schedule_wake' },
 75        condition: {},
 76        runAt: Date.now() - 1000,
 77      })
 78      const taskJob = await watchJobs.createWatchJob({
 79        type: 'task',
 80        resumeMessage: 'task finished',
 81        target: { taskId: 'task_done' },
 82        condition: { statusIn: ['completed'] },
 83      })
 84  
 85      const outcome = await watchJobs.processDueWatchJobs(Date.now())
 86  
 87      assert.equal(outcome.triggered >= 2, true)
 88      assert.equal(watchJobs.getWatchJob(timeJob.id)?.status, 'triggered')
 89      assert.equal(watchJobs.getWatchJob(taskJob.id)?.status, 'triggered')
 90      assert.equal(watchJobs.getWatchJob(taskJob.id)?.result?.status, 'completed')
 91    })
 92  
 93    it('captures file changes and webhook triggers', async () => {
 94      const watchedFile = path.join(tempDir, 'watch.txt')
 95      fs.writeFileSync(watchedFile, 'alpha')
 96  
 97      const fileJob = await watchJobs.createWatchJob({
 98        type: 'file',
 99        resumeMessage: 'file changed',
100        target: { path: watchedFile },
101        condition: { changed: true },
102      })
103      const webhookJob = await watchJobs.createWatchJob({
104        type: 'webhook',
105        resumeMessage: 'webhook arrived',
106        target: { webhookId: 'wh_1' },
107        condition: { event: 'build.finished' },
108      })
109  
110      fs.writeFileSync(watchedFile, 'beta')
111      await watchJobs.processDueWatchJobs(Date.now())
112      const webhookMatches = watchJobs.triggerWebhookWatchJobs({
113        webhookId: 'wh_1',
114        event: 'build.finished',
115        payloadPreview: '{"ok":true}',
116      })
117  
118      assert.equal(watchJobs.getWatchJob(fileJob.id)?.status, 'triggered')
119      assert.match(String(watchJobs.getWatchJob(fileJob.id)?.result?.preview || ''), /beta/)
120      assert.equal(webhookMatches.length, 1)
121      assert.equal(watchJobs.getWatchJob(webhookJob.id)?.status, 'triggered')
122    })
123  
124    it('wakes mailbox and approval watches from event triggers', async () => {
125      storage.upsertApproval('approval_1', {
126        id: 'approval_1',
127        category: 'human_loop',
128        title: 'Need approval',
129        description: 'Approve the action',
130        data: {},
131        createdAt: Date.now(),
132        updatedAt: Date.now(),
133        status: 'pending',
134      })
135  
136      const mailboxJob = await watchJobs.createWatchJob({
137        type: 'mailbox',
138        resumeMessage: 'human replied',
139        target: { sessionId: 'session_1' },
140        condition: { type: 'human_reply', correlationId: 'corr_1' },
141      })
142      const approvalJob = await watchJobs.createWatchJob({
143        type: 'approval',
144        resumeMessage: 'approval updated',
145        target: { approvalId: 'approval_1' },
146        condition: { statusIn: ['approved'] },
147      })
148  
149      const mailboxMatches = watchJobs.triggerMailboxWatchJobs({
150        sessionId: 'session_1',
151        envelope: {
152          id: 'env_1',
153          type: 'human_reply',
154          payload: 'approved',
155          toSessionId: 'session_1',
156          correlationId: 'corr_1',
157          status: 'new',
158          createdAt: Date.now(),
159        },
160      })
161      const approvalMatches = watchJobs.triggerApprovalWatchJobs({
162        approvalId: 'approval_1',
163        status: 'approved',
164      })
165  
166      assert.equal(mailboxMatches.length, 1)
167      assert.equal(approvalMatches.length, 1)
168      assert.equal(watchJobs.getWatchJob(mailboxJob.id)?.status, 'triggered')
169      assert.equal(watchJobs.getWatchJob(approvalJob.id)?.status, 'triggered')
170      assert.equal(watchJobs.getWatchJob(mailboxJob.id)?.result?.correlationId, 'corr_1')
171      assert.equal(watchJobs.getWatchJob(approvalJob.id)?.result?.status, 'approved')
172    })
173  
174    it('reuses an existing active mailbox watch for the same wait condition', async () => {
175      const first = await watchJobs.createWatchJob({
176        type: 'mailbox',
177        sessionId: 'session_dup',
178        agentId: 'agent_dup',
179        createdByAgentId: 'agent_dup',
180        resumeMessage: 'human replied',
181        target: { sessionId: 'session_dup' },
182        condition: { type: 'human_reply', correlationId: 'corr_dup' },
183      })
184      const second = await watchJobs.createWatchJob({
185        type: 'mailbox',
186        sessionId: 'session_dup',
187        agentId: 'agent_dup',
188        createdByAgentId: 'agent_dup',
189        resumeMessage: 'human replied again',
190        target: { sessionId: 'session_dup' },
191        condition: { type: 'human_reply', correlationId: 'corr_dup' },
192      })
193  
194      assert.equal(second.id, first.id)
195      assert.equal(watchJobs.listWatchJobs({ sessionId: 'session_dup', status: 'active' }).length, 1)
196    })
197  })