monitor-agent.test.js
/**
 * Monitor Agent Tests
 *
 * Tests the enhanced Monitor agent capabilities including:
 * - Throughput-based pipeline bottleneck detection
 * - Loop detection (site retry, agent bounce, log domain)
 * - Blocked task triage (replaces auto-cancel)
 * - Agent queue depth monitoring
 */

import { test, beforeEach, afterEach } from 'node:test';
import assert from 'node:assert/strict';
import Database from 'better-sqlite3';
import { createPgMock } from '../helpers/pg-mock.js'; // eslint-disable-line no-unused-vars

/**
 * Minutes elapsed between a stored timestamp and now.
 *
 * SQLite's datetime() produces 'YYYY-MM-DD HH:MM:SS' in UTC without a zone
 * designator, while `new Date()` interprets a zone-less date-time string as
 * local time. Zone-less values are therefore normalised to explicit UTC before
 * parsing so the result does not depend on the host timezone.
 *
 * @param {string} timestamp - ISO-8601 string or SQLite datetime() output.
 * @returns {number} Elapsed minutes (fractional).
 */
function minutesSince(timestamp) {
  const hasZone = /(?:Z|[+-]\d{2}:?\d{2})$/i.test(timestamp);
  const normalized = hasZone ? timestamp : `${timestamp.replace(' ', 'T')}Z`;
  return (Date.now() - new Date(normalized).getTime()) / 60000;
}

/**
 * Create a fresh in-memory SQLite database holding the tables these tests
 * query, seeded with an idle 'monitor' row in agent_state.
 *
 * @returns {Database} Open better-sqlite3 handle; the caller closes it.
 */
function setupTestDatabase() {
  const db = new Database(':memory:');

  db.exec(`
    CREATE TABLE IF NOT EXISTS sites (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      domain TEXT,
      landing_page_url TEXT,
      status TEXT DEFAULT 'found',
      error_message TEXT,
      score REAL,
      grade TEXT,
      score_json TEXT,
      contacts_json TEXT,
      country_code TEXT,
      recapture_count INTEGER DEFAULT 0,
      recapture_at TEXT,
      updated_at TEXT DEFAULT (datetime('now')),
      created_at TEXT DEFAULT (datetime('now')),
      rescored_at DATETIME
    );

    CREATE TABLE IF NOT EXISTS agent_tasks (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      task_type TEXT NOT NULL,
      assigned_to TEXT NOT NULL,
      created_by TEXT,
      priority INTEGER DEFAULT 5,
      status TEXT DEFAULT 'pending',
      context_json TEXT,
      parent_task_id INTEGER,
      result_json TEXT,
      error_message TEXT,
      retry_count INTEGER DEFAULT 0,
      created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
      started_at TIMESTAMP,
      completed_at TIMESTAMP,
      FOREIGN KEY (parent_task_id) REFERENCES agent_tasks(id)
    );

    CREATE INDEX IF NOT EXISTS idx_agent_tasks_assigned_status
      ON agent_tasks(assigned_to, status, priority DESC, created_at ASC);

    CREATE TABLE IF NOT EXISTS pipeline_metrics (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      stage_name TEXT NOT NULL,
      sites_processed INTEGER DEFAULT 0,
      duration_ms INTEGER DEFAULT 0,
      started_at TEXT DEFAULT (datetime('now')),
      finished_at TEXT DEFAULT (datetime('now'))
    );

    CREATE TABLE IF NOT EXISTS agent_state (
      agent_name TEXT PRIMARY KEY,
      status TEXT DEFAULT 'idle',
      metrics_json TEXT,
      last_active TEXT DEFAULT (datetime('now'))
    );

    CREATE TABLE IF NOT EXISTS agent_logs (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      task_id INTEGER,
      agent_name TEXT NOT NULL,
      log_level TEXT NOT NULL,
      message TEXT NOT NULL,
      context_json TEXT,
      created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );

    CREATE TABLE IF NOT EXISTS settings (
      key TEXT PRIMARY KEY,
      value TEXT,
      description TEXT,
      updated_at TEXT DEFAULT (datetime('now'))
    );

    CREATE TABLE IF NOT EXISTS human_review_queue (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      file TEXT, reason TEXT, type TEXT,
      priority TEXT DEFAULT 'medium',
      status TEXT DEFAULT 'pending',
      created_at TEXT DEFAULT (datetime('now'))
    );

    CREATE TABLE IF NOT EXISTS agent_messages (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      from_agent TEXT NOT NULL,
      to_agent TEXT NOT NULL,
      message_type TEXT NOT NULL,
      content TEXT,
      context_json TEXT,
      read_at TEXT,
      created_at TEXT DEFAULT (datetime('now'))
    );

    CREATE TABLE IF NOT EXISTS agent_outcomes (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      task_id INTEGER,
      agent_name TEXT NOT NULL,
      outcome TEXT NOT NULL,
      context_json TEXT,
      created_at TEXT DEFAULT (datetime('now'))
    );

    CREATE TABLE IF NOT EXISTS cron_locks (lock_key TEXT PRIMARY KEY, acquired_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, description TEXT);

    -- Insert monitor agent state
    INSERT OR IGNORE INTO agent_state (agent_name, status) VALUES ('monitor', 'idle');
  `);

  return db;
}

test('Monitor Agent - Pipeline Health (throughput-based)', async t => {
  let db;

  beforeEach(() => {
    db = setupTestDatabase();
  });

  afterEach(() => {
    if (db?.open) db.close();
  });

  await t.test('detects stalled stage with queue and no recent pipeline_metrics', () => {
    // Create sites waiting at assets_captured (input to scoring).
    // The statement is prepared once and reused for every row.
    const insertSite = db.prepare('INSERT INTO sites (domain, status) VALUES (?, ?)');
    for (let i = 0; i < 50; i++) {
      insertSite.run(`site${i}.com`, 'assets_captured');
    }

    // Add a stale pipeline_metrics entry (>30 min ago)
    db.prepare(
      `INSERT INTO pipeline_metrics (stage_name, sites_processed, duration_ms, started_at, finished_at)
       VALUES (?, ?, ?, datetime('now', '-45 minutes'), datetime('now', '-40 minutes'))`
    ).run('scoring', 10, 5000);

    // Query same logic as monitor's checkPipelineHealth
    const queueCount = db
      .prepare('SELECT COUNT(*) as count FROM sites WHERE status = ?')
      .get('assets_captured').count;

    const lastRun = db
      .prepare(
        `SELECT sites_processed, duration_ms, finished_at
         FROM pipeline_metrics WHERE stage_name = ?
         ORDER BY started_at DESC LIMIT 1`
      )
      .get('scoring');

    assert.equal(queueCount, 50);
    assert.ok(lastRun, 'Should have pipeline metrics');

    // finished_at came from SQLite datetime() (UTC, no zone marker), so it
    // must be read as UTC; a bare `new Date(...)` would read it as local time.
    const minutesSinceRun = minutesSince(lastRun.finished_at);
    assert.ok(
      minutesSinceRun > 30,
      `Should detect stage as stalled (>30min), got ${minutesSinceRun.toFixed(1)}min`
    );
  });

  await t.test('does not alert for stages with empty queues', () => {
    // No sites at assets_captured
    const queueCount = db
      .prepare('SELECT COUNT(*) as count FROM sites WHERE status = ?')
      .get('assets_captured').count;

    assert.equal(queueCount, 0, 'Empty queue should not trigger alert');
  });

  await t.test('detects slow stage (15-30 min)', () => {
    // Create waiting sites
    db.prepare('INSERT INTO sites (domain, status) VALUES (?, ?)').run('test.com', 'found');

    // Use explicit ISO timestamp 20 minutes ago (avoids SQLite UTC vs JS Date issues)
    const twentyMinAgo = new Date(Date.now() - 20 * 60 * 1000).toISOString();
    const twentyFiveMinAgo = new Date(Date.now() - 25 * 60 * 1000).toISOString();

    db.prepare(
      `INSERT INTO pipeline_metrics (stage_name, sites_processed, duration_ms, started_at, finished_at)
       VALUES (?, ?, ?, ?, ?)`
    ).run('assets', 5, 3000, twentyFiveMinAgo, twentyMinAgo);

    const lastRun = db
      .prepare(
        `SELECT finished_at FROM pipeline_metrics WHERE stage_name = ?
         ORDER BY started_at DESC LIMIT 1`
      )
      .get('assets');

    const minutesSinceRun = minutesSince(lastRun.finished_at);
    assert.ok(
      minutesSinceRun > 15,
      `Should detect as slow (>15min), got ${minutesSinceRun.toFixed(1)}min`
    );
    assert.ok(
      minutesSinceRun < 30,
      `Should not be stalled yet (<30min), got ${minutesSinceRun.toFixed(1)}min`
    );
  });
});

test('Monitor Agent - Loop Detection', async t => {
  let db;

  beforeEach(() => {
    db = setupTestDatabase();
  });

  afterEach(() => {
    if (db?.open) db.close();
  });

  await t.test('detects site retry loops (recapture_count > 3)', () => {
    const insertSite = db.prepare(
      'INSERT INTO sites (domain, status, recapture_count) VALUES (?, ?, ?)'
    );
    insertSite.run('looping-site.com', 'failing', 5);
    insertSite.run('normal-site.com', 'prog_scored', 1);

    const loops = db
      .prepare(
        `SELECT id, domain, status, recapture_count
         FROM sites WHERE recapture_count > 3
         ORDER BY recapture_count DESC`
      )
      .all();

    assert.equal(loops.length, 1);
    assert.equal(loops[0].domain, 'looping-site.com');
    assert.equal(loops[0].recapture_count, 5);
  });

  await t.test('detects agent task bounce loops (>3 children in 24hr)', () => {
    // Create a parent task
    const parentId = db
      .prepare(
        `INSERT INTO agent_tasks (task_type, assigned_to, status, created_at)
         VALUES (?, ?, ?, datetime('now', '-2 hours'))`
      )
      .run('classify_error', 'triage', 'completed').lastInsertRowid;

    // Create 4 child tasks (bounce loop). The datetime() modifier is bound as
    // a parameter rather than interpolated into the SQL text.
    const insertChild = db.prepare(
      `INSERT INTO agent_tasks (task_type, assigned_to, parent_task_id, created_at)
       VALUES (?, ?, ?, datetime('now', ?))`
    );
    for (let i = 0; i < 4; i++) {
      insertChild.run('fix_bug', i % 2 === 0 ? 'developer' : 'qa', parentId, `-${i} hours`);
    }

    const bounceLoops = db
      .prepare(
        `SELECT parent_task_id, COUNT(*) as bounce_count,
                GROUP_CONCAT(DISTINCT assigned_to) as agents
         FROM agent_tasks
         WHERE parent_task_id IS NOT NULL
           AND created_at > datetime('now', '-24 hours')
         GROUP BY parent_task_id
         HAVING bounce_count > 3`
      )
      .all();

    assert.equal(bounceLoops.length, 1);
    assert.equal(bounceLoops[0].parent_task_id, parentId);
    assert.equal(bounceLoops[0].bounce_count, 4);
    assert.ok(bounceLoops[0].agents.includes('developer'));
    assert.ok(bounceLoops[0].agents.includes('qa'));
  });
});

test('Monitor Agent - Blocked Task Triage', async t => {
  let db;

  beforeEach(() => {
    db = setupTestDatabase();
  });

  afterEach(() => {
    if (db?.open) db.close();
  });

  await t.test('finds blocked tasks older than 2 hours', () => {
    // Create a task blocked for 3 hours
    db.prepare(
      `INSERT INTO agent_tasks (task_type, assigned_to, status, error_message, created_at)
       VALUES (?, ?, ?, ?, datetime('now', '-3 hours'))`
    ).run('fix_bug', 'developer', 'blocked', 'Cannot reproduce');

    // Create a recently blocked task (should be ignored)
    db.prepare(
      `INSERT INTO agent_tasks (task_type, assigned_to, status, error_message, created_at)
       VALUES (?, ?, ?, ?, datetime('now', '-30 minutes'))`
    ).run('fix_bug', 'developer', 'blocked', 'Working on it');

    // Both sides of the age comparison are evaluated inside SQLite, so no
    // JS-side timestamp parsing is involved here.
    const blockedTasks = db
      .prepare(
        `SELECT id, task_type, assigned_to, error_message,
                CAST((julianday('now') - julianday(created_at)) * 24 AS REAL) as hours_blocked
         FROM agent_tasks
         WHERE status = 'blocked'
           AND created_at < datetime('now', '-2 hours')
         ORDER BY created_at ASC`
      )
      .all();

    assert.equal(blockedTasks.length, 1, 'Should only find the 3hr blocked task');
    assert.ok(blockedTasks[0].hours_blocked > 2, 'Should be blocked >2 hours');
  });

  await t.test('does not create duplicate triage tasks', () => {
    // Create a blocked task
    const blockedId = db
      .prepare(
        `INSERT INTO agent_tasks (task_type, assigned_to, status, created_at)
         VALUES (?, ?, ?, datetime('now', '-3 hours'))`
      )
      .run('fix_bug', 'developer', 'blocked').lastInsertRowid;

    // Create an existing triage task for it
    db.prepare(
      `INSERT INTO agent_tasks (task_type, assigned_to, parent_task_id, status)
       VALUES (?, ?, ?, ?)`
    ).run('classify_error', 'triage', blockedId, 'pending');

    const existingTriage = db
      .prepare(
        `SELECT id FROM agent_tasks
         WHERE parent_task_id = ?
           AND task_type = 'classify_error'
           AND status NOT IN ('completed', 'failed')
         LIMIT 1`
      )
      .get(blockedId);

    assert.ok(existingTriage, 'Should find existing triage task');
    assert.ok(existingTriage.id > 0, 'Should have valid ID');
  });
});

test('Monitor Agent - Agent Queue Depth', async t => {
  let db;

  beforeEach(() => {
    db = setupTestDatabase();
  });

  afterEach(() => {
    if (db?.open) db.close();
  });

  await t.test('counts pending tasks per agent', () => {
    const insertTask = db.prepare(
      `INSERT INTO agent_tasks (task_type, assigned_to, status)
       VALUES (?, ?, ?)`
    );

    // Add pending tasks for developer
    for (let i = 0; i < 25; i++) {
      insertTask.run('fix_bug', 'developer', 'pending');
    }

    // Add pending tasks for qa
    for (let i = 0; i < 5; i++) {
      insertTask.run('run_tests', 'qa', 'pending');
    }

    const queueDepths = db
      .prepare(
        `SELECT assigned_to, COUNT(*) as pending_count
         FROM agent_tasks WHERE status = 'pending'
         GROUP BY assigned_to`
      )
      .all();

    const devQueue = queueDepths.find(q => q.assigned_to === 'developer');
    const qaQueue = queueDepths.find(q => q.assigned_to === 'qa');

    // Fail with a readable message instead of a TypeError if a row is missing.
    assert.ok(devQueue, 'Should have a queue row for developer');
    assert.ok(qaQueue, 'Should have a queue row for qa');

    assert.equal(devQueue.pending_count, 25, 'Developer should have 25 pending');
    assert.equal(qaQueue.pending_count, 5, 'QA should have 5 pending');
    assert.ok(devQueue.pending_count > 20, 'Developer queue should trigger warning');
    assert.ok(qaQueue.pending_count <= 20, 'QA queue should be ok');
  });
});

test('Monitor Agent - Tiered Scheduling via Cron', async t => {
  let db;

  beforeEach(() => {
    db = setupTestDatabase();

    // Create cron_jobs table
    db.exec(`
      CREATE TABLE IF NOT EXISTS cron_jobs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        task_key TEXT NOT NULL UNIQUE,
        description TEXT,
        handler_type TEXT NOT NULL,
        handler_value TEXT NOT NULL,
        interval_value INTEGER NOT NULL,
        interval_unit TEXT NOT NULL,
        enabled INTEGER DEFAULT 1,
        last_run_at TEXT,
        created_at TEXT DEFAULT (datetime('now'))
      );
    `);
  });

  afterEach(() => {
    if (db?.open) db.close();
  });

  await t.test('migration creates 3 tiered cron jobs', () => {
    // Simulate the migration
    db.exec(`
      INSERT OR REPLACE INTO cron_jobs
        (name, task_key, description, handler_type, handler_value,
         interval_value, interval_unit, enabled)
      VALUES
        ('Process Guardian', 'processGuardian',
         'Service restarts, circuit breaker, clearance cycle detection',
         'function', 'processGuardian', 1, 'minutes', 1);

      INSERT OR REPLACE INTO cron_jobs
        (name, task_key, description, handler_type, handler_value,
         interval_value, interval_unit, enabled)
      VALUES
        ('Pipeline Monitor', 'monitorPipeline',
         'Bottlenecks, blocked tasks, loops, log scanning, compliance',
         'function', 'monitorPipeline', 5, 'minutes', 1);

      INSERT OR REPLACE INTO cron_jobs
        (name, task_key, description, handler_type, handler_value,
         interval_value, interval_unit, enabled)
      VALUES
        ('System Health', 'monitorSystem',
         'Agent health, queue depth, throughput, anomalies, SLO compliance',
         'function', 'monitorSystem', 30, 'minutes', 1);
    `);

    const jobs = db
      .prepare(
        'SELECT name, task_key, interval_value, interval_unit FROM cron_jobs ORDER BY interval_value'
      )
      .all();

    assert.equal(jobs.length, 3, 'Should have 3 tiered cron jobs');

    assert.equal(jobs[0].task_key, 'processGuardian');
    assert.equal(jobs[0].interval_value, 1);
    assert.equal(jobs[0].interval_unit, 'minutes');

    assert.equal(jobs[1].task_key, 'monitorPipeline');
    assert.equal(jobs[1].interval_value, 5);
    assert.equal(jobs[1].interval_unit, 'minutes');

    assert.equal(jobs[2].task_key, 'monitorSystem');
    assert.equal(jobs[2].interval_value, 30);
    assert.equal(jobs[2].interval_unit, 'minutes');
  });
});