/ tests / agents / monitor-agent.test.js
monitor-agent.test.js
  1  /**
  2   * Monitor Agent Tests
  3   *
  4   * Tests the enhanced Monitor agent capabilities including:
  5   * - Throughput-based pipeline bottleneck detection
  6   * - Loop detection (site retry, agent bounce, log domain)
  7   * - Blocked task triage (replaces auto-cancel)
  8   * - Agent queue depth monitoring
  9   */
 10  
 11  import { test, beforeEach, afterEach } from 'node:test';
 12  import assert from 'node:assert/strict';
 13  import Database from 'better-sqlite3';
 14  import { createPgMock } from '../helpers/pg-mock.js'; // eslint-disable-line no-unused-vars
 15  
 16  function setupTestDatabase() {
 17    const db = new Database(':memory:');
 18  
 19    db.exec(`
 20      CREATE TABLE IF NOT EXISTS sites (
 21        id INTEGER PRIMARY KEY AUTOINCREMENT,
 22        domain TEXT,
 23        landing_page_url TEXT,
 24        status TEXT DEFAULT 'found',
 25        error_message TEXT,
 26        score REAL,
 27        grade TEXT,
 28        score_json TEXT,
 29        contacts_json TEXT,
 30        country_code TEXT,
 31        recapture_count INTEGER DEFAULT 0,
 32        recapture_at TEXT,
 33        updated_at TEXT DEFAULT (datetime('now')),
 34        created_at TEXT DEFAULT (datetime('now')),
 35        rescored_at DATETIME
 36      );
 37  
 38      CREATE TABLE IF NOT EXISTS agent_tasks (
 39        id INTEGER PRIMARY KEY AUTOINCREMENT,
 40        task_type TEXT NOT NULL,
 41        assigned_to TEXT NOT NULL,
 42        created_by TEXT,
 43        priority INTEGER DEFAULT 5,
 44        status TEXT DEFAULT 'pending',
 45        context_json TEXT,
 46        parent_task_id INTEGER,
 47        result_json TEXT,
 48        error_message TEXT,
 49        retry_count INTEGER DEFAULT 0,
 50        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
 51        started_at TIMESTAMP,
 52        completed_at TIMESTAMP,
 53        FOREIGN KEY (parent_task_id) REFERENCES agent_tasks(id)
 54      );
 55  
 56      CREATE INDEX IF NOT EXISTS idx_agent_tasks_assigned_status
 57        ON agent_tasks(assigned_to, status, priority DESC, created_at ASC);
 58  
 59      CREATE TABLE IF NOT EXISTS pipeline_metrics (
 60        id INTEGER PRIMARY KEY AUTOINCREMENT,
 61        stage_name TEXT NOT NULL,
 62        sites_processed INTEGER DEFAULT 0,
 63        duration_ms INTEGER DEFAULT 0,
 64        started_at TEXT DEFAULT (datetime('now')),
 65        finished_at TEXT DEFAULT (datetime('now'))
 66      );
 67  
 68      CREATE TABLE IF NOT EXISTS agent_state (
 69        agent_name TEXT PRIMARY KEY,
 70        status TEXT DEFAULT 'idle',
 71        metrics_json TEXT,
 72        last_active TEXT DEFAULT (datetime('now'))
 73      );
 74  
 75      CREATE TABLE IF NOT EXISTS agent_logs (
 76        id INTEGER PRIMARY KEY AUTOINCREMENT,
 77        task_id INTEGER,
 78        agent_name TEXT NOT NULL,
 79        log_level TEXT NOT NULL,
 80        message TEXT NOT NULL,
 81        context_json TEXT,
 82        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
 83      );
 84  
 85      CREATE TABLE IF NOT EXISTS settings (
 86        key TEXT PRIMARY KEY,
 87        value TEXT,
 88        description TEXT,
 89        updated_at TEXT DEFAULT (datetime('now'))
 90      );
 91  
 92      CREATE TABLE IF NOT EXISTS human_review_queue (
 93        id INTEGER PRIMARY KEY AUTOINCREMENT,
 94        file TEXT, reason TEXT, type TEXT,
 95        priority TEXT DEFAULT 'medium',
 96        status TEXT DEFAULT 'pending',
 97        created_at TEXT DEFAULT (datetime('now'))
 98      );
 99  
100      CREATE TABLE IF NOT EXISTS agent_messages (
101        id INTEGER PRIMARY KEY AUTOINCREMENT,
102        from_agent TEXT NOT NULL,
103        to_agent TEXT NOT NULL,
104        message_type TEXT NOT NULL,
105        content TEXT,
106        context_json TEXT,
107        read_at TEXT,
108        created_at TEXT DEFAULT (datetime('now'))
109      );
110  
111      CREATE TABLE IF NOT EXISTS agent_outcomes (
112        id INTEGER PRIMARY KEY AUTOINCREMENT,
113        task_id INTEGER,
114        agent_name TEXT NOT NULL,
115        outcome TEXT NOT NULL,
116        context_json TEXT,
117        created_at TEXT DEFAULT (datetime('now'))
118      );
119  
120      CREATE TABLE IF NOT EXISTS cron_locks (lock_key TEXT PRIMARY KEY, acquired_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, description TEXT);
121  
122      -- Insert monitor agent state
123      INSERT OR IGNORE INTO agent_state (agent_name, status) VALUES ('monitor', 'idle');
124    `);
125  
126    return db;
127  }
128  
test('Monitor Agent - Pipeline Health (throughput-based)', async t => {
  let db;

  /**
   * Minutes elapsed between a timestamp read back from SQLite and now.
   *
   * SQLite's datetime() returns UTC as 'YYYY-MM-DD HH:MM:SS' with no zone
   * designator. V8 parses that space-separated, zone-less form as *local*
   * time, so feeding it straight into `new Date()` skews the age by the
   * host's UTC offset. West of UTC, a row written 40 minutes ago appears to
   * be in the future.
   *
   * Zone-less values are therefore normalised to ISO-8601 UTC first. Values
   * that already carry a zone (e.g. Date#toISOString() output) are parsed
   * unchanged.
   *
   * @param {string} timestamp - a pipeline_metrics timestamp column value
   * @returns {number} elapsed minutes (fractional; negative if in the future)
   */
  const minutesSince = timestamp => {
    const hasZone = /(?:Z|[+-]\d{2}:?\d{2})$/i.test(timestamp);
    const isoUtc = hasZone ? timestamp : `${timestamp.replace(' ', 'T')}Z`;
    return (Date.now() - new Date(isoUtc).getTime()) / 60000;
  };

  beforeEach(() => {
    db = setupTestDatabase();
  });

  afterEach(() => {
    if (db?.open) db.close();
  });

  await t.test('detects stalled stage with queue and no recent pipeline_metrics', () => {
    // Create sites waiting at assets_captured (input to scoring)
    const insertSite = db.prepare('INSERT INTO sites (domain, status) VALUES (?, ?)');
    for (let i = 0; i < 50; i++) {
      insertSite.run(`site${i}.com`, 'assets_captured');
    }

    // Add a stale pipeline_metrics entry (finished 40 minutes ago, i.e. >30 min)
    db.prepare(
      `INSERT INTO pipeline_metrics (stage_name, sites_processed, duration_ms, started_at, finished_at)
       VALUES (?, ?, ?, datetime('now', '-45 minutes'), datetime('now', '-40 minutes'))`
    ).run('scoring', 10, 5000);

    // Query same logic as monitor's checkPipelineHealth.
    // NOTE(review): if checkPipelineHealth also passes finished_at straight to
    // new Date(), it has the same local-time skew — confirm in the agent code.
    const queueCount = db
      .prepare('SELECT COUNT(*) as count FROM sites WHERE status = ?')
      .get('assets_captured').count;

    const lastRun = db
      .prepare(
        `SELECT sites_processed, duration_ms, finished_at
         FROM pipeline_metrics WHERE stage_name = ?
         ORDER BY started_at DESC LIMIT 1`
      )
      .get('scoring');

    assert.equal(queueCount, 50);
    assert.ok(lastRun, 'Should have pipeline metrics');

    const minutesSinceRun = minutesSince(lastRun.finished_at);
    assert.ok(
      minutesSinceRun > 30,
      `Should detect stage as stalled (>30min), got ${minutesSinceRun.toFixed(1)}min`
    );
    // Upper bound: the row was written ~40 minutes ago. This catches a timezone
    // skew that would otherwise pass vacuously on hosts east of UTC.
    assert.ok(
      minutesSinceRun < 45,
      `finished_at should parse as UTC (~40min ago), got ${minutesSinceRun.toFixed(1)}min`
    );
  });

  await t.test('does not alert for stages with empty queues', () => {
    // No sites at assets_captured
    const queueCount = db
      .prepare('SELECT COUNT(*) as count FROM sites WHERE status = ?')
      .get('assets_captured').count;

    assert.equal(queueCount, 0, 'Empty queue should not trigger alert');
  });

  await t.test('detects slow stage (15-30 min)', () => {
    // Create waiting sites
    db.prepare('INSERT INTO sites (domain, status) VALUES (?, ?)').run('test.com', 'found');

    // Explicit ISO-8601 UTC timestamps; minutesSince() accepts these as well as
    // SQLite's zone-less datetime() output.
    const twentyMinAgo = new Date(Date.now() - 20 * 60 * 1000).toISOString();
    const twentyFiveMinAgo = new Date(Date.now() - 25 * 60 * 1000).toISOString();

    db.prepare(
      `INSERT INTO pipeline_metrics (stage_name, sites_processed, duration_ms, started_at, finished_at)
       VALUES (?, ?, ?, ?, ?)`
    ).run('assets', 5, 3000, twentyFiveMinAgo, twentyMinAgo);

    const lastRun = db
      .prepare(
        `SELECT finished_at FROM pipeline_metrics WHERE stage_name = ?
         ORDER BY started_at DESC LIMIT 1`
      )
      .get('assets');

    const minutesSinceRun = minutesSince(lastRun.finished_at);
    assert.ok(
      minutesSinceRun > 15,
      `Should detect as slow (>15min), got ${minutesSinceRun.toFixed(1)}min`
    );
    assert.ok(
      minutesSinceRun < 30,
      `Should not be stalled yet (<30min), got ${minutesSinceRun.toFixed(1)}min`
    );
  });
});
215  
test('Monitor Agent - Loop Detection', async t => {
  let db;

  beforeEach(() => {
    db = setupTestDatabase();
  });

  afterEach(() => {
    if (db?.open) db.close();
  });

  await t.test('detects site retry loops (recapture_count > 3)', () => {
    const insertSite = db.prepare(
      `INSERT INTO sites (domain, status, recapture_count) VALUES (?, ?, ?)`
    );
    insertSite.run('looping-site.com', 'failing', 5);
    insertSite.run('normal-site.com', 'prog_scored', 1);

    const loops = db
      .prepare(
        `SELECT id, domain, status, recapture_count
         FROM sites WHERE recapture_count > 3
         ORDER BY recapture_count DESC`
      )
      .all();

    assert.equal(loops.length, 1);
    assert.equal(loops[0].domain, 'looping-site.com');
    assert.equal(loops[0].recapture_count, 5);
  });

  await t.test('detects agent task bounce loops (>3 children in 24hr)', () => {
    // Create a parent task
    const parentId = db
      .prepare(
        `INSERT INTO agent_tasks (task_type, assigned_to, status, created_at)
         VALUES (?, ?, ?, datetime('now', '-2 hours'))`
      )
      .run('classify_error', 'triage', 'completed').lastInsertRowid;

    // Create 4 child tasks (bounce loop), alternating developer/qa, created
    // 0..3 hours ago. The datetime() modifier is bound as a parameter instead
    // of being interpolated into the SQL text, so one statement serves all rows.
    const insertChild = db.prepare(
      `INSERT INTO agent_tasks (task_type, assigned_to, parent_task_id, created_at)
       VALUES (?, ?, ?, datetime('now', ?))`
    );
    for (let i = 0; i < 4; i++) {
      insertChild.run('fix_bug', i % 2 === 0 ? 'developer' : 'qa', parentId, `-${i} hours`);
    }

    const bounceLoops = db
      .prepare(
        `SELECT parent_task_id, COUNT(*) as bounce_count,
                GROUP_CONCAT(DISTINCT assigned_to) as agents
         FROM agent_tasks
         WHERE parent_task_id IS NOT NULL
           AND created_at > datetime('now', '-24 hours')
         GROUP BY parent_task_id
         HAVING bounce_count > 3`
      )
      .all();

    assert.equal(bounceLoops.length, 1);
    assert.equal(bounceLoops[0].parent_task_id, parentId);
    assert.equal(bounceLoops[0].bounce_count, 4);

    // GROUP_CONCAT joins with ',' in unspecified order. Split and sort so the
    // check is an exact set comparison rather than a substring match.
    const { agents } = bounceLoops[0];
    assert.deepEqual(
      agents.split(',').sort(),
      ['developer', 'qa'],
      `Expected exactly developer and qa in the loop, got "${agents}"`
    );
  });
});
289  
test('Monitor Agent - Blocked Task Triage', async t => {
  let db;

  beforeEach(() => {
    db = setupTestDatabase();
  });

  afterEach(() => {
    if (db?.open) db.close();
  });

  await t.test('finds blocked tasks older than 2 hours', () => {
    // Create a task blocked for 3 hours
    db.prepare(
      `INSERT INTO agent_tasks (task_type, assigned_to, status, error_message, created_at)
       VALUES (?, ?, ?, ?, datetime('now', '-3 hours'))`
    ).run('fix_bug', 'developer', 'blocked', 'Cannot reproduce');

    // Create a recently blocked task (should be ignored)
    db.prepare(
      `INSERT INTO agent_tasks (task_type, assigned_to, status, error_message, created_at)
       VALUES (?, ?, ?, ?, datetime('now', '-30 minutes'))`
    ).run('fix_bug', 'developer', 'blocked', 'Working on it');

    const blockedTasks = db
      .prepare(
        `SELECT id, task_type, assigned_to, error_message,
                CAST((julianday('now') - julianday(created_at)) * 24 AS REAL) as hours_blocked
         FROM agent_tasks
         WHERE status = 'blocked'
           AND created_at < datetime('now', '-2 hours')
         ORDER BY created_at ASC`
      )
      .all();

    assert.equal(blockedTasks.length, 1, 'Should only find the 3hr blocked task');
    assert.ok(blockedTasks[0].hours_blocked > 2, 'Should be blocked >2 hours');
  });

  await t.test('does not create duplicate triage tasks', () => {
    const insertBlocked = db.prepare(
      `INSERT INTO agent_tasks (task_type, assigned_to, status, created_at)
       VALUES (?, ?, ?, datetime('now', '-3 hours'))`
    );
    const insertTriage = db.prepare(
      `INSERT INTO agent_tasks (task_type, assigned_to, parent_task_id, status)
       VALUES (?, ?, ?, ?)`
    );
    // Dedup guard under test: only an *open* classify_error child (status not
    // completed/failed) should suppress creating another triage task.
    const findOpenTriage = db.prepare(
      `SELECT id FROM agent_tasks
       WHERE parent_task_id = ? AND task_type = 'classify_error'
         AND status NOT IN ('completed', 'failed')
       LIMIT 1`
    );

    // Blocked task that already has a pending triage task: guard must find it
    const blockedId = insertBlocked.run('fix_bug', 'developer', 'blocked').lastInsertRowid;
    insertTriage.run('classify_error', 'triage', blockedId, 'pending');

    const existingTriage = findOpenTriage.get(blockedId);
    assert.ok(existingTriage, 'Should find existing triage task');
    assert.ok(existingTriage.id > 0, 'Should have valid ID');

    // Blocked task whose only triage task has completed: guard must NOT match,
    // so the monitor is free to create a fresh triage task for it.
    const retriageId = insertBlocked.run('fix_bug', 'developer', 'blocked').lastInsertRowid;
    insertTriage.run('classify_error', 'triage', retriageId, 'completed');

    assert.equal(
      findOpenTriage.get(retriageId),
      undefined,
      'Completed triage task should not block re-triage'
    );
  });
});
357  
test('Monitor Agent - Agent Queue Depth', async t => {
  let db;

  beforeEach(() => {
    db = setupTestDatabase();
  });

  afterEach(() => {
    if (db?.open) db.close();
  });

  await t.test('counts pending tasks per agent', () => {
    // Seed pending work: a deep developer backlog followed by a shallow qa one.
    const backlog = [
      { agent: 'developer', taskType: 'fix_bug', howMany: 25 },
      { agent: 'qa', taskType: 'run_tests', howMany: 5 },
    ];
    const enqueuePending = db.prepare(
      `INSERT INTO agent_tasks (task_type, assigned_to, status)
         VALUES (?, ?, ?)`
    );
    for (const { agent, taskType, howMany } of backlog) {
      for (let n = 0; n < howMany; n++) {
        enqueuePending.run(taskType, agent, 'pending');
      }
    }

    const depthQuery = db.prepare(
      `SELECT assigned_to, COUNT(*) as pending_count
         FROM agent_tasks WHERE status = 'pending'
         GROUP BY assigned_to`
    );
    const queueDepths = depthQuery.all();

    // Per-agent lookup over the grouped rows
    const queueFor = agentName => queueDepths.find(row => row.assigned_to === agentName);
    const devQueue = queueFor('developer');
    const qaQueue = queueFor('qa');

    assert.equal(devQueue.pending_count, 25, 'Developer should have 25 pending');
    assert.equal(qaQueue.pending_count, 5, 'QA should have 5 pending');
    // 20 is the warning threshold these assertions exercise
    assert.ok(devQueue.pending_count > 20, 'Developer queue should trigger warning');
    assert.ok(qaQueue.pending_count <= 20, 'QA queue should be ok');
  });
});
403  
test('Monitor Agent - Tiered Scheduling via Cron', async t => {
  let db;

  // Scheduler table DDL. Only this suite needs cron_jobs, so it is layered on
  // top of the shared schema for each subtest.
  const cronJobsDdl = `
      CREATE TABLE IF NOT EXISTS cron_jobs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        task_key TEXT NOT NULL UNIQUE,
        description TEXT,
        handler_type TEXT NOT NULL,
        handler_value TEXT NOT NULL,
        interval_value INTEGER NOT NULL,
        interval_unit TEXT NOT NULL,
        enabled INTEGER DEFAULT 1,
        last_run_at TEXT,
        created_at TEXT DEFAULT (datetime('now'))
      );
    `;

  beforeEach(() => {
    db = setupTestDatabase();
    db.exec(cronJobsDdl);
  });

  afterEach(() => {
    if (db?.open) db.close();
  });

  await t.test('migration creates 3 tiered cron jobs', () => {
    // Simulated migration: one row per monitoring tier (1, 5 and 30 minutes)
    const migrationSql = `
      INSERT OR REPLACE INTO cron_jobs
        (name, task_key, description, handler_type, handler_value,
         interval_value, interval_unit, enabled)
      VALUES
        ('Process Guardian', 'processGuardian',
         'Service restarts, circuit breaker, clearance cycle detection',
         'function', 'processGuardian', 1, 'minutes', 1);

      INSERT OR REPLACE INTO cron_jobs
        (name, task_key, description, handler_type, handler_value,
         interval_value, interval_unit, enabled)
      VALUES
        ('Pipeline Monitor', 'monitorPipeline',
         'Bottlenecks, blocked tasks, loops, log scanning, compliance',
         'function', 'monitorPipeline', 5, 'minutes', 1);

      INSERT OR REPLACE INTO cron_jobs
        (name, task_key, description, handler_type, handler_value,
         interval_value, interval_unit, enabled)
      VALUES
        ('System Health', 'monitorSystem',
         'Agent health, queue depth, throughput, anomalies, SLO compliance',
         'function', 'monitorSystem', 30, 'minutes', 1);
    `;
    db.exec(migrationSql);

    const jobs = db
      .prepare(
        'SELECT name, task_key, interval_value, interval_unit FROM cron_jobs ORDER BY interval_value'
      )
      .all();

    assert.equal(jobs.length, 3, 'Should have 3 tiered cron jobs');

    // Expected tiers, fastest first (matches ORDER BY interval_value)
    const expectedTiers = [
      ['processGuardian', 1],
      ['monitorPipeline', 5],
      ['monitorSystem', 30],
    ];
    expectedTiers.forEach(([taskKey, everyMinutes], position) => {
      const job = jobs[position];
      assert.equal(job.task_key, taskKey);
      assert.equal(job.interval_value, everyMinutes);
      assert.equal(job.interval_unit, 'minutes');
    });
  });
});