watch-jobs.test.ts
1 import assert from 'node:assert/strict' 2 import fs from 'node:fs' 3 import os from 'node:os' 4 import path from 'node:path' 5 import { after, before, describe, it } from 'node:test' 6 7 const originalEnv = { 8 DATA_DIR: process.env.DATA_DIR, 9 WORKSPACE_DIR: process.env.WORKSPACE_DIR, 10 SWARMCLAW_BUILD_MODE: process.env.SWARMCLAW_BUILD_MODE, 11 } 12 13 let tempDir = '' 14 let watchJobs: typeof import('@/lib/server/runtime/watch-jobs') 15 let storage: typeof import('@/lib/server/storage') 16 17 before(async () => { 18 tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'swarmclaw-watch-jobs-')) 19 process.env.DATA_DIR = path.join(tempDir, 'data') 20 process.env.WORKSPACE_DIR = path.join(tempDir, 'workspace') 21 process.env.SWARMCLAW_BUILD_MODE = '1' 22 watchJobs = await import('@/lib/server/runtime/watch-jobs') 23 storage = await import('@/lib/server/storage') 24 }) 25 26 after(() => { 27 if (originalEnv.DATA_DIR === undefined) delete process.env.DATA_DIR 28 else process.env.DATA_DIR = originalEnv.DATA_DIR 29 if (originalEnv.WORKSPACE_DIR === undefined) delete process.env.WORKSPACE_DIR 30 else process.env.WORKSPACE_DIR = originalEnv.WORKSPACE_DIR 31 if (originalEnv.SWARMCLAW_BUILD_MODE === undefined) delete process.env.SWARMCLAW_BUILD_MODE 32 else process.env.SWARMCLAW_BUILD_MODE = originalEnv.SWARMCLAW_BUILD_MODE 33 fs.rmSync(tempDir, { recursive: true, force: true }) 34 }) 35 36 describe('watch-jobs', () => { 37 it('validates required targets for durable watches', async () => { 38 await assert.rejects( 39 watchJobs.createWatchJob({ 40 type: 'http', 41 resumeMessage: 'resume', 42 target: {}, 43 condition: {}, 44 }), 45 /url target/, 46 ) 47 48 await assert.rejects( 49 watchJobs.createWatchJob({ 50 type: 'time', 51 resumeMessage: 'resume', 52 target: { source: 'test' }, 53 condition: {}, 54 }), 55 /runAt or delayMinutes/, 56 ) 57 }) 58 59 it('triggers time and task watches durably', async () => { 60 const tasks = storage.loadTasks() 61 tasks.task_done = { 62 id: 'task_done', 63 title: 'done', 64 status: 'completed', 65 result: 'ok', 66 createdAt: Date.now(), 67 updatedAt: Date.now(), 68 } 69 storage.saveTasks(tasks) 70 71 const timeJob = await watchJobs.createWatchJob({ 72 type: 'time', 73 resumeMessage: 'wake up', 74 target: { source: 'schedule_wake' }, 75 condition: {}, 76 runAt: Date.now() - 1000, 77 }) 78 const taskJob = await watchJobs.createWatchJob({ 79 type: 'task', 80 resumeMessage: 'task finished', 81 target: { taskId: 'task_done' }, 82 condition: { statusIn: ['completed'] }, 83 }) 84 85 const outcome = await watchJobs.processDueWatchJobs(Date.now()) 86 87 assert.equal(outcome.triggered >= 2, true) 88 assert.equal(watchJobs.getWatchJob(timeJob.id)?.status, 'triggered') 89 assert.equal(watchJobs.getWatchJob(taskJob.id)?.status, 'triggered') 90 assert.equal(watchJobs.getWatchJob(taskJob.id)?.result?.status, 'completed') 91 }) 92 93 it('captures file changes and webhook triggers', async () => { 94 const watchedFile = path.join(tempDir, 'watch.txt') 95 fs.writeFileSync(watchedFile, 'alpha') 96 97 const fileJob = await watchJobs.createWatchJob({ 98 type: 'file', 99 resumeMessage: 'file changed', 100 target: { path: watchedFile }, 101 condition: { changed: true }, 102 }) 103 const webhookJob = await watchJobs.createWatchJob({ 104 type: 'webhook', 105 resumeMessage: 'webhook arrived', 106 target: { webhookId: 'wh_1' }, 107 condition: { event: 'build.finished' }, 108 }) 109 110 fs.writeFileSync(watchedFile, 'beta') 111 await watchJobs.processDueWatchJobs(Date.now()) 112 const webhookMatches = watchJobs.triggerWebhookWatchJobs({ 113 webhookId: 'wh_1', 114 event: 'build.finished', 115 payloadPreview: '{"ok":true}', 116 }) 117 118 assert.equal(watchJobs.getWatchJob(fileJob.id)?.status, 'triggered') 119 assert.match(String(watchJobs.getWatchJob(fileJob.id)?.result?.preview || ''), /beta/) 120 assert.equal(webhookMatches.length, 1) 121 assert.equal(watchJobs.getWatchJob(webhookJob.id)?.status, 'triggered') 122 }) 123 124 it('wakes mailbox and approval watches from event triggers', async () => { 125 storage.upsertApproval('approval_1', { 126 id: 'approval_1', 127 category: 'human_loop', 128 title: 'Need approval', 129 description: 'Approve the action', 130 data: {}, 131 createdAt: Date.now(), 132 updatedAt: Date.now(), 133 status: 'pending', 134 }) 135 136 const mailboxJob = await watchJobs.createWatchJob({ 137 type: 'mailbox', 138 resumeMessage: 'human replied', 139 target: { sessionId: 'session_1' }, 140 condition: { type: 'human_reply', correlationId: 'corr_1' }, 141 }) 142 const approvalJob = await watchJobs.createWatchJob({ 143 type: 'approval', 144 resumeMessage: 'approval updated', 145 target: { approvalId: 'approval_1' }, 146 condition: { statusIn: ['approved'] }, 147 }) 148 149 const mailboxMatches = watchJobs.triggerMailboxWatchJobs({ 150 sessionId: 'session_1', 151 envelope: { 152 id: 'env_1', 153 type: 'human_reply', 154 payload: 'approved', 155 toSessionId: 'session_1', 156 correlationId: 'corr_1', 157 status: 'new', 158 createdAt: Date.now(), 159 }, 160 }) 161 const approvalMatches = watchJobs.triggerApprovalWatchJobs({ 162 approvalId: 'approval_1', 163 status: 'approved', 164 }) 165 166 assert.equal(mailboxMatches.length, 1) 167 assert.equal(approvalMatches.length, 1) 168 assert.equal(watchJobs.getWatchJob(mailboxJob.id)?.status, 'triggered') 169 assert.equal(watchJobs.getWatchJob(approvalJob.id)?.status, 'triggered') 170 assert.equal(watchJobs.getWatchJob(mailboxJob.id)?.result?.correlationId, 'corr_1') 171 assert.equal(watchJobs.getWatchJob(approvalJob.id)?.result?.status, 'approved') 172 }) 173 174 it('reuses an existing active mailbox watch for the same wait condition', async () => { 175 const first = await watchJobs.createWatchJob({ 176 type: 'mailbox', 177 sessionId: 'session_dup', 178 agentId: 'agent_dup', 179 createdByAgentId: 'agent_dup', 180 resumeMessage: 'human replied', 181 target: { sessionId: 'session_dup' }, 182 condition: { type: 'human_reply', correlationId: 'corr_dup' }, 183 }) 184 const second = await watchJobs.createWatchJob({ 185 type: 'mailbox', 186 sessionId: 'session_dup', 187 agentId: 'agent_dup', 188 createdByAgentId: 'agent_dup', 189 resumeMessage: 'human replied again', 190 target: { sessionId: 'session_dup' }, 191 condition: { type: 'human_reply', correlationId: 'corr_dup' }, 192 }) 193 194 assert.equal(second.id, first.id) 195 assert.equal(watchJobs.listWatchJobs({ sessionId: 'session_dup', status: 'active' }).length, 1) 196 }) 197 })