monitor.js
/**
 * Monitor Agent
 *
 * The system's "immune system" - proactively scans logs, detects anomalies,
 * monitors process compliance, and tracks agent health.
 */

import { BaseAgent } from './base-agent.js';
import { execSync } from 'child_process';
import fs from 'fs/promises';
import { join } from 'path';
import { run, getOne, getAll } from '../utils/db.js';
import { addReviewItem } from '../utils/human-review-queue.js';
import { checkSLOCompliance, getSLOSummary } from './utils/slo-tracker.js';

export class MonitorAgent extends BaseAgent {
  constructor() {
    super('monitor', ['base.md', 'monitor.md']);
    this.lastReadPositions = {}; // Track incremental log reads
    this.loadFilePositions(); // Load persisted positions from database (fire-and-forget)
  }

  /**
   * Load file read positions from settings table
   *
   * @returns {Promise<void>}
   */
  async loadFilePositions() {
    try {
      const row = await getOne(
        'SELECT value FROM ops.settings WHERE key = $1',
        ['monitor_file_positions']
      );

      if (row) {
        this.lastReadPositions = JSON.parse(row.value);
      }
    } catch (error) {
      // If settings don't exist or JSON parse fails, start fresh
      this.lastReadPositions = {};
    }
  }

  /**
   * Save file read positions to settings table
   *
   * @returns {Promise<void>}
   */
  async saveFilePositions() {
    try {
      await run(
        `INSERT INTO ops.settings (key, value, description, updated_at)
         VALUES ($1, $2, $3, NOW())
         ON CONFLICT(key) DO UPDATE SET
           value = EXCLUDED.value,
           updated_at = EXCLUDED.updated_at`,
        [
          'monitor_file_positions',
          JSON.stringify(this.lastReadPositions),
          'Monitor agent file read positions for incremental log scanning',
        ]
      );
    } catch (error) {
      // Log error but don't fail - worst case we re-read logs
      console.error('Failed to save file positions:', error.message);
    }
  }

  /**
   * Process a monitor task
   *
   * @param {Object} task - Task object
   * @returns {Promise<void>}
   */
  async processTask(task) {
    // Self-heal: reset orphaned 'running' tasks of monitor types that got stuck after pipeline restart.
    // Any monitor task stuck in 'running' for >10 min was orphaned (agent was killed mid-execution).
    // Reset to 'pending' so the dedup check doesn't block future runs of the same task type.
    try {
      const monitorTaskTypes = [
        'check_pipeline_health',
        'check_loops',
        'check_agent_health',
        'scan_logs',
        'check_slo_compliance',
        'check_blocked_tasks',
        'check_process_compliance',
        'check_rate_limits',
        'detect_anomaly',
      ];
      const placeholders = monitorTaskTypes.map((_, i) => `$${i + 1}`).join(',');
      await run(
        `UPDATE tel.agent_tasks SET status='pending'
         WHERE task_type IN (${placeholders})
           AND status='running'
           AND id != $${monitorTaskTypes.length + 1}
           AND created_at < NOW() - INTERVAL '10 minutes'`,
        [...monitorTaskTypes, task.id]
      );
    } catch {
      // Non-fatal — cleanup best effort
    }

    try {
      // Parse context_json if needed
      const context =
        task.context_json && typeof task.context_json === 'string'
          ? JSON.parse(task.context_json)
          : task.context_json || {};

      // Ensure context is attached to task for handlers
      task.context_json = context;

      switch (task.task_type) {
        case 'scan_logs':
          await this.scanLogs(task);
          break;

        case 'check_agent_health':
          await this.checkAgentHealth(task);
          break;

        case 'check_process_compliance':
          await this.checkProcessCompliance(task);
          break;

        case 'detect_anomaly':
          await this.detectAnomaly(task);
          break;

        case 'check_pipeline_health':
          await this.checkPipelineHealth(task);
          break;

        case 'check_slo_compliance':
          await this.checkSLOCompliance(task);
          break;

        case 'check_loops':
          await this.checkLoops(task);
          break;

        case 'check_blocked_tasks':
          await this.checkBlockedTasks(task);
          break;

        case 'check_rate_limits':
          await this.checkRateLimitPatterns(task);
          break;

        // Task types assigned to wrong agent - delegate
        case 'implement_feature':
        case 'fix_bug':
        case 'bootstrap_monitor':
          await this.delegateToCorrectAgent(task);
          break;

        default:
          // Unknown task types - delegate to correct agent via task routing
          await this.log('warn', 'Unknown task type received, delegating', {
            task_id: task.id,
            task_type: task.task_type,
          });
          await this.delegateToCorrectAgent(task);
      }
    } catch (error) {
      await this.log(
        'error',
        `${this.agentName.charAt(0).toUpperCase() + this.agentName.slice(1)} task ${task.id} failed: ${error.message}`,
        {
          task_id: task.id,
          task_type: task.task_type,
          error: error.message,
          stack: error.stack,
        }
      );
      throw error; // Re-throw so task manager can handle
    }
  }

  /**
   * Scan log files for errors and anomalies
   *
   * @param {Object} task - Task object
   * @returns {Promise<void>}
   */
  async scanLogs(task) {
    const { days = 1, skip_retried = false } = task.context_json || {};

    await this.log('info', 'Starting log scan', {
      task_id: task.id,
      days,
    });

    const logDir = process.env.LOGS_DIR || './logs/';
    const today = new Date().toISOString().slice(0, 10);
    const logFiles = ['pipeline', 'outreach', 'inbound', 'cron', 'utils', 'agents'];

    let totalErrors = 0;
    let loopsDetected = 0;

    for (const domain of logFiles) {
      const filePath = `${logDir}${domain}-${today}.log`;

      try {
        // Read incrementally to avoid re-processing
        const errors = await this.readIncrementally(filePath, /\[ERROR\]|\[FATAL\]/);

        if (errors.length > 0) {
          totalErrors += errors.length;

          // Group by error message
          const grouped = this.groupByMessage(errors);

          // Detect looping (same error >3x in 1 hour)
          for (const [message, occurrences] of Object.entries(grouped)) {
            if (occurrences.length > 3 && this.withinOneHour(occurrences)) {
              loopsDetected++;

              await this.log('warn', 'Error loop detected', {
                task_id: task.id,
                message: message.substring(0, 200),
                count: occurrences.length,
                log_file: filePath,
              });

              // Create triage task
              await this.createTask({
                task_type: 'classify_error',
                assigned_to: 'triage',
                priority: 8,
                context: {
                  error_type: 'looping_error',
                  error_message: message,
                  count: occurrences.length,
                  log_file: filePath,
                  frequency: occurrences.length,
                },
              });
            }
          }
        }
      } catch (error) {
        if (error.code !== 'ENOENT') {
          await this.log('error', 'Log scan failed', {
            task_id: task.id,
            file: filePath,
            error: error.message,
          });
        }
      }
    }

    // Scan for API-specific error rates across all log files
    const apiErrorResults = await this.scanApiErrorRates(task);

    // Ensure recurring tasks are scheduled
    await this.ensureRecurringTasks();

    await this.log('info', 'Log scan complete', {
      task_id: task.id,
      total_errors: totalErrors,
      loops_detected: loopsDetected,
      api_error_alerts: apiErrorResults.alertsCreated,
    });

    await this.completeTask(task.id, {
      total_errors: totalErrors,
      loops_detected: loopsDetected,
      api_error_rates: apiErrorResults.rates,
      api_error_alerts: apiErrorResults.alertsCreated,
    });
  }

  /**
   * Scan log files for API-specific error rates.
   * Classifies API from log content (HTTP status codes, API names).
   * Creates classify_error tasks when any API exceeds 10% error rate.
   *
   * @param {Object} task - Parent task for logging
   * @returns {Promise<{rates: Object, alertsCreated: number}>}
   */
  async scanApiErrorRates(task) {
    const logDir = process.env.LOGS_DIR || './logs/';
    const today = new Date().toISOString().slice(0, 10);

    // API classification patterns: map log content patterns to API names
    const apiPatterns = [
      {
        api: 'openrouter',
        patterns: [/openrouter/i, /callLLM/i, /LLM.*error/i, /model.*response/i],
      },
      { api: 'zenrows', patterns: [/zenrows/i, /SERP/i, /scrape/i] },
      { api: 'twilio', patterns: [/twilio/i, /SMS.*send/i, /sms.*deliver/i] },
      { api: 'resend', patterns: [/resend/i, /email.*send/i, /email.*deliver/i] },
      {
        api: 'playwright',
        patterns: [/playwright/i, /browser.*error/i, /chromium/i, /page.*crash/i],
      },
    ];

    // HTTP error patterns
    const httpErrorPattern = /\b(4\d{2}|5\d{2})\b/;
    const errorIndicators = /\[ERROR\]|\[FATAL\]|error|failed|timeout|ECONNRESET|ECONNREFUSED/i;
    const successIndicators =
      /\[INFO\].*(?:success|completed|processed|sent|delivered|scored|enriched)/i;

    // Count successes and errors per API
    const apiCounts = {};
    const apiSampleErrors = {};

    const logFiles = ['pipeline', 'outreach', 'inbound', 'cron', 'utils', 'agents'];

    for (const domain of logFiles) {
      const filePath = `${logDir}${domain}-${today}.log`;

      try {
        const lines = await this.readIncrementalForApiScan(filePath);
        for (const line of lines) {
          // Determine which API this line relates to
          let matchedApi = null;
          for (const { api, patterns } of apiPatterns) {
            if (patterns.some(p => p.test(line))) {
              matchedApi = api;
              break;
            }
          }
          if (!matchedApi) continue;

          if (!apiCounts[matchedApi]) apiCounts[matchedApi] = { success: 0, error: 0 };

          const isError = errorIndicators.test(line) || httpErrorPattern.test(line);
          const isSuccess = successIndicators.test(line);

          if (isError && !isSuccess) {
            apiCounts[matchedApi].error++;
            // Keep up to 5 sample errors per API
            if (!apiSampleErrors[matchedApi]) apiSampleErrors[matchedApi] = [];
            if (apiSampleErrors[matchedApi].length < 5) {
              apiSampleErrors[matchedApi].push(line.substring(0, 200));
            }
          } else if (isSuccess) {
            apiCounts[matchedApi].success++;
          }
        }
      } catch (error) {
        if (error.code !== 'ENOENT') {
          await this.log('error', 'API error rate scan failed for file', {
            task_id: task.id,
            file: filePath,
            error: error.message,
          });
        }
      }
    }

    // Calculate rates and alert on >10%
    const rates = {};
    let alertsCreated = 0;
    const ERROR_RATE_THRESHOLD = 0.1;

    for (const [api, counts] of Object.entries(apiCounts)) {
      const total = counts.success + counts.error;
      if (total === 0) continue;
      const errorRate = counts.error / total;
      rates[api] = {
        total,
        errors: counts.error,
        successes: counts.success,
        errorRate: `${Math.round(errorRate * 100)}%`,
      };

      if (errorRate > ERROR_RATE_THRESHOLD && counts.error >= 3) {
        // Check if we already have an active alert for this API
        const existingAlert = await getOne(
          `SELECT id FROM tel.agent_tasks
           WHERE task_type = 'classify_error'
             AND context_json::text LIKE $1
             AND status NOT IN ('completed', 'failed')
           LIMIT 1`,
          [`%api_error_rate%${api}%`]
        );

        if (!existingAlert) {
          await this.createTask({
            task_type: 'classify_error',
            assigned_to: 'triage',
            priority: 9,
            context: {
              error_type: 'api_error_rate',
              api,
              error_rate: rates[api].errorRate,
              total_requests: total,
              error_count: counts.error,
              sample_errors: apiSampleErrors[api] || [],
            },
          });
          alertsCreated++;

          await this.log('warn', 'High API error rate detected', {
            task_id: task.id,
            api,
            error_rate: rates[api].errorRate,
            total,
            errors: counts.error,
          });
        }
      }
    }

    return { rates, alertsCreated };
  }

  /**
   * Read log file for API error rate scanning.
   * Uses the same incremental position tracking as readIncrementally,
   * but with a separate position namespace to avoid conflicts.
   *
   * @param {string} filePath - Log file path
   * @returns {Promise<string[]>} All lines from new content
   */
  async readIncrementalForApiScan(filePath) {
    const posKey = `api_scan:${filePath}`;
    try {
      const stats = await fs.stat(filePath);
      let lastPos = this.lastReadPositions[posKey] || 0;

      if (stats.size < lastPos) {
        lastPos = 0;
        this.lastReadPositions[posKey] = 0;
      }

      if (stats.size === lastPos) return [];

      const fd = await fs.open(filePath, 'r');
      const buffer = Buffer.alloc(stats.size - lastPos);
      await fd.read(buffer, 0, buffer.length, lastPos);
      await fd.close();

      this.lastReadPositions[posKey] = stats.size;
      await this.saveFilePositions();

      return buffer.toString('utf8').split('\n').filter(Boolean);
    } catch (error) {
      if (error.code === 'ENOENT') return [];
      throw error;
    }
  }

  /**
   * Read log file incrementally
   *
   * @param {string} filePath - Log file path
   * @param {RegExp} pattern - Pattern to match
   * @returns {Promise<string[]>} - Matching lines
   */
  async readIncrementally(filePath, pattern) {
    try {
      const stats = await fs.stat(filePath);
      let lastPos = this.lastReadPositions[filePath] || 0;

      // Detect log cycling: if current file size is smaller than last position,
      // the log file was rotated and we should start from the beginning
      if (stats.size < lastPos) {
        await this.log('info', 'Log file cycling detected', {
          file: filePath,
          old_size: lastPos,
          new_size: stats.size,
          action: 'reset_position',
        });
        lastPos = 0;
        this.lastReadPositions[filePath] = 0;
      }

      // Only read new content
      const fd = await fs.open(filePath, 'r');
      const buffer = Buffer.alloc(stats.size - lastPos);
      await fd.read(buffer, 0, buffer.length, lastPos);
      await fd.close();

      const newContent = buffer.toString('utf8');
      this.lastReadPositions[filePath] = stats.size;

      // Persist positions to database after each read
      await this.saveFilePositions();

      // Find matches
      const matches = [];
      const lines = newContent.split('\n');

      for (const line of lines) {
        if (pattern.test(line) && !this.isTestError(line)) {
          matches.push(line);
        }
      }

      return matches;
    } catch (error) {
      if (error.code === 'ENOENT') {
        // File doesn't exist yet
        return [];
      }
      throw error;
    }
  }

  /**
   * Check if a log line originated from test code (suppress from prod monitoring)
   * Filters: test file paths, test fixture domains, placeholder values
   */
  isTestError(line) {
    return (
      /\.test\.js|tests\/|test-fixtures|__mocks__|node_modules\/.*mocha|node_modules\/.*jest/i.test(
        line
      ) ||
      /example\.com|failing\.com|test\.com|site 99999|outreach #\d+|\+1234567890|\+15005550/i.test(
        line
      )
    );
  }

  /**
   * Group error lines by message
   *
   * @param {string[]} errors - Error lines
   * @returns {Object} - Grouped by message
   */
  groupByMessage(errors) {
    const groups = {};

    for (const line of errors) {
      // Extract error message (after timestamp and level).
      // Match both [ERROR] and [FATAL] so FATAL lines collected by scanLogs aren't dropped here.
      const match = line.match(/\[(?:ERROR|FATAL)\]\s+(.+)/);
      if (match) {
        const message = match[1];
        if (!groups[message]) groups[message] = [];
        groups[message].push(line);
      }
    }

    return groups;
  }

  /**
   * Check if occurrences are within one hour
   *
   * @param {string[]} occurrences - Error lines with timestamps
   * @returns {boolean} - True if within 1 hour
   */
  withinOneHour(occurrences) {
    if (occurrences.length < 2) return false;

    // Parse timestamps
    const timestamps = occurrences
      .map(line => {
        const match = line.match(/\[([^\]]+)\]/);
        return match ? new Date(match[1]) : null;
      })
      .filter(Boolean);

    if (timestamps.length < 2) return false;

    const oldest = Math.min(...timestamps.map(d => d.getTime()));
    const newest = Math.max(...timestamps.map(d => d.getTime()));

    return newest - oldest <= 60 * 60 * 1000; // 1 hour
  }

  /**
   * Check agent health and success/failure rates
   *
   * @param {Object} task - Task object
   * @returns {Promise<void>}
   */
  async checkAgentHealth(task) {
    await this.log('info', 'Checking agent health', {
      task_id: task.id,
    });

    const stats = await getAll(
      `SELECT assigned_to,
              COUNT(*) as total,
              SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
              SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
       FROM tel.agent_tasks
       WHERE created_at > NOW() - INTERVAL '24 hours'
       GROUP BY assigned_to`
    );

    const unhealthyAgents = [];

    for (const agent of stats) {
      const failureRate = agent.total > 0 ? agent.failed / agent.total : 0;
      const successRate = agent.total > 0 ? agent.completed / agent.total : 0;

      // Circuit breaker: >30% failure rate
      if (failureRate > 0.3) {
        await this.log('error', 'Agent failure rate exceeded threshold', {
          task_id: task.id,
          agent: agent.assigned_to,
          failure_rate: failureRate,
          total: agent.total,
          failed: agent.failed,
        });

        // Disable agent
        await run(
          `UPDATE tel.agent_state
           SET status = 'blocked', metrics_json = $1
           WHERE agent_name = $2`,
          [
            JSON.stringify({
              failure_rate: failureRate,
              threshold: 0.3,
              disabled_at: new Date().toISOString(),
            }),
            agent.assigned_to,
          ]
        );

        // Escalate to human
        await addReviewItem({
          file: 'Agent System',
          reason: `Agent ${agent.assigned_to} failure rate ${(failureRate * 100).toFixed(0)}% exceeds 30% threshold`,
          type: 'critical',
          priority: 'critical',
        });

        unhealthyAgents.push(agent.assigned_to);
      }
      // Warning threshold: 15-30% failure rate
      else if (failureRate > 0.15) {
        await this.log('warn', 'Agent failure rate elevated', {
          task_id: task.id,
          agent: agent.assigned_to,
          failure_rate: failureRate,
        });
      }

      // Update metrics
      await run(
        `UPDATE tel.agent_state
         SET metrics_json = $1, last_active = NOW()
         WHERE agent_name = $2`,
        [
          JSON.stringify({
            success_rate: successRate,
            failure_rate: failureRate,
            total_tasks_24h: agent.total,
            last_health_check: new Date().toISOString(),
          }),
          agent.assigned_to,
        ]
      );
    }

    // Check agent queue depth (pending task count per agent)
    const queueDepths = await getAll(
      `SELECT assigned_to, COUNT(*) as pending_count
       FROM tel.agent_tasks
       WHERE status = 'pending'
       GROUP BY assigned_to`
    );

    const overloadedAgents = [];
    for (const queue of queueDepths) {
      if (queue.pending_count > 50) {
        await this.log('warn', 'Agent queue critical', {
          task_id: task.id,
          agent: queue.assigned_to,
          pending_count: queue.pending_count,
        });
        overloadedAgents.push({
          agent: queue.assigned_to,
          pending: queue.pending_count,
          severity: 'critical',
        });
      } else if (queue.pending_count > 20) {
        await this.log('warn', 'Agent queue elevated', {
          task_id: task.id,
          agent: queue.assigned_to,
          pending_count: queue.pending_count,
        });
        overloadedAgents.push({
          agent: queue.assigned_to,
          pending: queue.pending_count,
          severity: 'warning',
        });
      }
    }

    await this.completeTask(task.id, {
      agents_checked: stats.length,
      unhealthy_agents: unhealthyAgents,
      queue_depths: queueDepths,
      overloaded_agents: overloadedAgents,
    });
  }

  /**
   * Check process compliance (stage transitions)
   *
   * @param {Object} task - Task object
   * @returns {Promise<void>}
   */
  async checkProcessCompliance(task) {
    await this.log('info', 'Checking process compliance', {
      task_id: task.id,
    });

    const EXPECTED_FLOW = [
      'found',
      'assets_captured',
      'prog_scored',
      'semantic_scored',
      'vision_scored',
      'enriched',
      'proposals_drafted',
      'outreach_partial',
      'outreach_sent',
    ];

    // Find sites updated in last hour with invalid status
    const recentSites = await getAll(
      `SELECT id, domain, status, error_message, updated_at
       FROM sites
       WHERE updated_at > NOW() - INTERVAL '1 hour'
         AND status NOT IN ('ignored', 'failing', 'high_score', 'found', 'assets_captured', 'prog_scored', 'semantic_scored', 'vision_scored', 'enriched', 'proposals_drafted', 'outreach_partial', 'outreach_sent')
       LIMIT 10`
    );

    for (const site of recentSites) {
      await this.log('warn', 'Invalid site status detected', {
        task_id: task.id,
        site_id: site.id,
        domain: site.domain,
        status: site.status,
      });

      // Create triage task
      await this.createTask({
        task_type: 'classify_error',
        assigned_to: 'triage',
        priority: 7,
        context: {
          error_type: 'invalid_status',
          error_message: `Site ${site.id} has invalid status: ${site.status}`,
          site_id: site.id,
          domain: site.domain,
        },
      });
    }

    // Check for stuck sites (no progress >24 hours)
    const stuckSites = await getAll(
      `SELECT id, domain, status, updated_at
       FROM sites
       WHERE updated_at < NOW() - INTERVAL '24 hours'
         AND status NOT IN ('ignored', 'failing', 'high_score', 'outreach_sent')
       LIMIT 10`
    );

    if (stuckSites.length > 0) {
      await this.log('warn', 'Found stuck sites', {
        task_id: task.id,
        count: stuckSites.length,
      });

      // Add to human review queue
      for (const site of stuckSites.slice(0, 5)) {
        await addReviewItem({
          file: 'Database',
          reason: `Site ${site.id} (${site.domain}) stuck in ${site.status} for >24 hours`,
          type: 'process',
          priority: 'medium',
        });
      }
    }

    await this.completeTask(task.id, {
      invalid_statuses: recentSites.length,
      stuck_sites: stuckSites.length,
    });
  }

  /**
   * Detect resource anomalies
   *
   * @param {Object} task - Task object
   * @returns {Promise<void>}
   */
  async detectAnomaly(task) {
    await this.log('info', 'Detecting anomalies', {
      task_id: task.id,
    });

    const anomalies = [];

    // Check log file sizes
    const logDir = process.env.LOGS_DIR || './logs/';
    try {
      const logFiles = await fs.readdir(logDir);

      for (const file of logFiles) {
        if (!file.endsWith('.log')) continue;

        const stats = await fs.stat(`${logDir}${file}`);
        const sizeMB = stats.size / 1024 / 1024;

        if (sizeMB > 100) {
          anomalies.push({
            type: 'large_log_file',
            file,
            size_mb: sizeMB,
            severity: sizeMB > 500 ? 'critical' : 'warning',
          });

          if (sizeMB > 500) {
            await addReviewItem({
              file: `logs/${file}`,
              reason: `Log file exceeds 500MB (${sizeMB.toFixed(0)}MB)`,
              type: 'critical',
              priority: 'high',
            });
          }
        }
      }
    } catch (error) {
      await this.log('warn', 'Failed to check log files', {
        error: error.message,
      });
    }

    // Check database size (PostgreSQL — check pg_database_size instead of file stat)
    try {
      const dbSizeResult = await getOne(
        `SELECT pg_database_size(current_database()) as size_bytes`
      );
      const dbSizeMB = dbSizeResult ? dbSizeResult.size_bytes / 1024 / 1024 : 0;

      if (dbSizeMB > 1000) {
        anomalies.push({
          type: 'large_database',
          size_mb: dbSizeMB,
          severity: dbSizeMB > 5000 ? 'critical' : 'warning',
        });

        if (dbSizeMB > 5000) {
          await addReviewItem({
            file: 'Database',
            reason: `Database exceeds 5GB (${(dbSizeMB / 1024).toFixed(1)}GB) - consider VACUUM + archival`,
            type: 'maintenance',
            priority: 'high',
          });
        }
      }
    } catch (error) {
      await this.log('warn', 'Failed to check database size', {
        error: error.message,
      });
    }

    // Check disk usage
    try {
      const df = execSync('df -h .', { encoding: 'utf8' });
      const match = df.match(/(\d+)%/);

      if (match) {
        const usage = parseInt(match[1]);

        if (usage > 80) {
          anomalies.push({
            type: 'disk_usage',
            usage_percent: usage,
            severity: usage > 90 ? 'critical' : 'warning',
          });

          if (usage > 90) {
            await addReviewItem({
              file: 'System',
              reason: `Disk usage ${usage}% - cleanup required`,
              type: 'critical',
              priority: 'critical',
            });
          }
        }
      }
    } catch (error) {
      await this.log('warn', 'Failed to check disk usage', {
        error: error.message,
      });
    }

    await this.completeTask(task.id, {
      anomalies,
      anomaly_count: anomalies.length,
    });
  }

  /**
   * Check pipeline health - throughput-based bottleneck detection
   * Uses pipeline_metrics table for time-since-last-activity alerting.
   *
   * @param {Object} task - Task object
   * @returns {Promise<void>}
   */
  async checkPipelineHealth(task) {
    const { check_error_rates = true } = task.context_json || {};

    await this.log('info', 'Checking pipeline health', {
      task_id: task.id,
    });

    const issues = [];

    // Throughput-based bottleneck detection using pipeline_metrics
    const stages = [
      { name: 'assets', inputStatus: 'found', outputStatus: 'assets_captured' },
      { name: 'scoring', inputStatus: 'assets_captured', outputStatus: 'prog_scored' },
      { name: 'rescoring', inputStatus: 'prog_scored', outputStatus: 'semantic_scored' },
      { name: 'enrich', inputStatus: 'semantic_scored', outputStatus: 'enriched' },
      { name: 'proposals', inputStatus: 'enriched', outputStatus: 'proposals_drafted' },
      { name: 'outreach', inputStatus: 'proposals_drafted', outputStatus: 'outreach_partial' },
    ];

    const skipStages = (process.env.SKIP_STAGES || '')
      .split(',')
      .map(s => s.trim().toLowerCase())
      .filter(Boolean);

    // Read currently rate-limited stages so we don't fire false-positive stall alerts
    const rateLimitedStages = new Set();
    try {
      const rateLimitsFile = join(process.cwd(), 'logs/rate-limits.json');
      const raw = JSON.parse(await fs.readFile(rateLimitsFile, 'utf8'));
      for (const entry of Object.values(raw)) {
        for (const s of entry.stages || []) {
          rateLimitedStages.add(s.toLowerCase());
        }
      }
    } catch {
      // File missing or empty — no active rate limits
    }

    for (const stage of stages) {
      // Skip intentionally disabled stages — stall alerts would be false positives
      if (skipStages.includes(stage.name)) continue;

      // Skip rate-limited stages — stall is expected while waiting for quota reset
      if (rateLimitedStages.has(stage.name)) continue;

      const queueRow = await getOne(
        'SELECT COUNT(*) as count FROM sites WHERE status = $1',
        [stage.inputStatus]
      );
      const queueCount = queueRow ? Number(queueRow.count) : 0;

      // Skip stages with empty queues
      if (queueCount === 0) continue;

      // Check last run from pipeline_metrics
      const lastRun = await getOne(
        `SELECT sites_processed, duration_ms, finished_at
         FROM tel.pipeline_metrics WHERE stage_name = $1
         ORDER BY started_at DESC LIMIT 1`,
        [stage.name]
      );

      // Check for crash loop: stage runs but consistently processes 0 sites
      // This catches crashes that complete instantly (last_run is recent, stall detector misses them)
      const recentRuns = await getAll(
        `SELECT sites_processed, finished_at FROM tel.pipeline_metrics
         WHERE stage_name = $1 ORDER BY started_at DESC LIMIT 5`,
        [stage.name]
      );

      // Crash loop = rapid consecutive runs with 0 sites. Check that the 5 runs span <2 minutes.
      // Legitimate idle polling (outreach at 60s intervals) spans ~4 min for 5 runs — safely above.
      // Genuine crash loops cycle in seconds and span <30s for 5 runs.
      const crashSpanMs =
        recentRuns.length >= 2
          ? new Date(recentRuns[0].finished_at).getTime() -
            new Date(recentRuns[recentRuns.length - 1].finished_at).getTime()
          : Infinity;

      const crashLoopDetected =
        recentRuns.length >= 3 &&
        recentRuns.every(r => r.sites_processed === 0) &&
        recentRuns[0]?.finished_at &&
        (Date.now() - new Date(recentRuns[0].finished_at).getTime()) / 60000 < 120 && // ran recently
        crashSpanMs < 2 * 60 * 1000; // 5 runs within 2 minutes = genuine rapid loop

      if (crashLoopDetected) {
        // Scan stage-specific log for the error
        let crashError = null;
        try {
          const today = new Date().toISOString().slice(0, 10);
          const stageLogPath = join(process.cwd(), `logs/${stage.name}-${today}.log`);
          const stageLogContent = await fs.readFile(stageLogPath, 'utf8');
          const errorPattern =
            /\[ERROR\].*stage failed[^\n]*\n(?:[\s\S]{0,200}"message":\s*"([^"]{0,300})")?/g;
          const matches = [...stageLogContent.matchAll(errorPattern)];
          if (matches.length >= 2) {
            crashError = matches[matches.length - 1][1] || 'Unknown crash error';
          }
        } catch {
          // Log not readable
        }

        await this.createTask({
          task_type: 'classify_error',
          assigned_to: 'triage',
          priority: 9,
          context: {
            error_type: 'stage_crash_loop',
            error_message: `${stage.name} stage crash loop: ran ${recentRuns.length} times processing 0 sites with ${queueCount} queued. Last error: ${crashError || 'check logs'}`,
            stage: stage.name,
            queue_count: queueCount,
            consecutive_zero_runs: recentRuns.length,
            stack_trace: crashError || null,
          },
        });

        await addReviewItem({
          file: 'Pipeline',
          reason: `${stage.name} crash loop: ${recentRuns.length} consecutive runs with 0 sites processed despite ${queueCount} queued`,
          type: 'critical',
          priority: 'critical',
        });
      }

      if (lastRun?.finished_at) {
        const minutesSinceRun = (Date.now() - new Date(lastRun.finished_at).getTime()) / 60000;

        if (minutesSinceRun > 30) {
          issues.push({
            type: 'stage_stalled',
            stage: stage.name,
            queue_count: queueCount,
            minutes_since_run: Math.round(minutesSinceRun),
            severity: 'critical',
          });

          await this.log('warn', 'Stage stalled with queue waiting', {
            task_id: task.id,
            stage: stage.name,
            queue_count: queueCount,
            minutes_since_run: Math.round(minutesSinceRun),
          });

          // Check if the stall is caused by a crash: scan pipeline log for recent "✗ StageName failed" errors
          let lastCrashError = null;
          try {
            const today = new Date().toISOString().slice(0, 10);
            const logPath = join(process.cwd(), `logs/pipeline-${today}.log`);
            const logContent = await fs.readFile(logPath, 'utf8');
            const stageLabel = stage.name.charAt(0).toUpperCase() + stage.name.slice(1);
            const crashPattern = new RegExp(
              `✗ ${stageLabel} failed:[\\s\\S]{0,20}"stack":\\s*"([^"]{0,400})"`,
              'g'
            );
            const matches = [...logContent.matchAll(crashPattern)];
            if (matches.length >= 2) {
              // Stage has crashed multiple times — it's a code crash, not a stall
              lastCrashError = matches[matches.length - 1][1].replace(/\\n/g, '\n');
            }
          } catch {
            // Log not readable — proceed without crash context
          }

          await this.createTask({
            task_type: 'classify_error',
            assigned_to: 'triage',
            priority: 9,
            context: {
              error_type: lastCrashError ? 'stage_crash_loop' : 'stage_stalled',
              error_message: lastCrashError
                ? `${stage.name} stage crashing on every run: ${lastCrashError.substring(0, 300)}`
                : `${stage.name} stalled ${Math.round(minutesSinceRun)}min with ${queueCount} queued`,
              stage: stage.name,
              queue_count: queueCount,
              stack_trace: lastCrashError || null,
            },
          });

          await addReviewItem({
            file: 'Pipeline',
            reason: `${stage.name} stage stalled ${Math.round(minutesSinceRun)}min with ${queueCount} sites waiting`,
            type: 'critical',
            priority: 'critical',
          });
        } else if (minutesSinceRun > 15) {
          issues.push({
            type: 'stage_slow',
            stage: stage.name,
            queue_count: queueCount,
            minutes_since_run: Math.round(minutesSinceRun),
            severity: 'warning',
          });

          await this.log('warn', 'Stage slow with queue waiting', {
            task_id: task.id,
            stage: stage.name,
            queue_count: queueCount,
            minutes_since_run: Math.round(minutesSinceRun),
          });
        }
      } else if (queueCount > 100) {
        // No pipeline_metrics for this stage but large queue
        issues.push({
          type: 'stage_no_metrics',
          stage: stage.name,
          queue_count: queueCount,
          severity: 'warning',
        });
      }
    }

    // Check error rates per stage
    if (check_error_rates) {
      const failingByStage = await getAll(
        `SELECT error_message, COUNT(*) as count
         FROM sites
         WHERE status = 'failing'
           AND error_message IS NOT NULL
           AND updated_at > NOW() - INTERVAL '24 hours'
         GROUP BY error_message
         HAVING COUNT(*) > 5
         ORDER BY count DESC
         LIMIT 10`
      );

      for (const error of failingByStage) {
        issues.push({
          type: 'high_error_rate',
          error_message: error.error_message.substring(0, 200),
          count: error.count,
          severity: error.count > 50 ? 'high' : 'medium',
        });

        await this.log('warn', 'High error rate detected', {
          task_id: task.id,
          error: error.error_message.substring(0, 100),
          count: error.count,
        });

        await this.createTask({
          task_type: 'classify_error',
          assigned_to: 'triage',
          priority: error.count > 50 ? 9 : 7,
          context: {
            error_type: 'high_frequency_error',
            error_message: error.error_message,
            frequency: error.count,
          },
        });
      }
    }

    // Check for repeated proposal JSON parse failures (LLM returns plain text for large contact lists)
    try {
      const proposalLogPath = join(
        process.cwd(),
        `logs/proposals-${new Date().toISOString().slice(0, 10)}.log`
      );
      const proposalLog = await fs.readFile(proposalLogPath, 'utf8').catch(() => '');
      const recentLog = proposalLog.slice(-50000); // last ~50KB
      const invalidFormatMatches = [...recentLog.matchAll(/Invalid proposal response format/g)];
      const parseFailMatches = [...recentLog.matchAll(/Failed to parse JSON, returning fallback/g)];
      const totalFails = invalidFormatMatches.length + parseFailMatches.length;
      if (totalFails >= 5) {
        issues.push({
          type: 'proposal_json_parse_failures',
          count: totalFails,
          severity: totalFails >= 20 ? 'high' : 'medium',
        });
        await this.createTask({
          task_type: 'fix_bug',
          assigned_to: 'developer',
          priority: 8,
          context: {
            error_type: 'proposal_json_parse_failures',
            error_message: `LLM returns plain text instead of JSON for large contact batches (${totalFails} failures in proposals log today)`,
            stack_trace:
              'storeProposalVariant in proposal-generator-v2.js — LLM ignores json_mode when contact count is high (25+). Fix: batch contacts into chunks of ≤15 per LLM call.',
            file_path: 'src/proposal-generator-v2.js',
            severity: 'medium',
          },
        });
      }
    } catch {
      // Non-fatal — proposal log check is best-effort
    }

    // Check for bogus phone numbers stored as pending SMS outreaches
    // LLM hallucinations: numbers like +0401040, +01520153 (start with +0, not valid E.164)
    try {
      const bogusRow = await getOne(
        `SELECT COUNT(*) as count FROM messages
         WHERE direction='outbound' AND contact_method='sms' AND approval_status='pending'
           AND contact_uri !~ '^[+][1-9]'`
      );
      const bogusSmsCnt = bogusRow ? Number(bogusRow.count) : 0;

      if (bogusSmsCnt >= 10) {
        issues.push({
          type: 'bogus_sms_phone_numbers',
          count: bogusSmsCnt,
          severity: bogusSmsCnt >= 50 ? 'high' : 'medium',
        });
        await this.createTask({
          task_type: 'fix_bug',
          assigned_to: 'developer',
          priority: 8,
          context: {
            error_type: 'bogus_sms_phone_numbers',
            error_message: `${bogusSmsCnt} pending SMS messages have invalid phone numbers (not E.164: start with +0 or similar LLM hallucination patterns)`,
            stack_trace:
              'getAllContacts() in src/contacts/prioritize.js passes landline numbers through without E.164 validation. ' +
              'Fix: add regex /^\\+[1-9]\\d{7,14}$/ guard before mobile/landline split. ' +
              "Cleanup: UPDATE messages SET delivery_status='failed', error_message='Invalid phone number' WHERE direction='outbound' AND contact_method='sms' AND approval_status='pending' AND contact_uri !~ '^\\+[1-9]'",
            file_path: 'src/contacts/prioritize.js',
            severity: 'medium',
          },
        });
      }
    } catch {
      // Non-fatal
    }

    // Check for country calling code mismatches in pending SMS outreaches
    // e.g. +60 (Malaysia) stored for NZ site — passes E.164 regex but wrong country
    try {
      const mismatchRow = await getOne(
        `SELECT COUNT(*) as count FROM messages o
         JOIN sites s ON o.site_id = s.id
         WHERE o.direction='outbound' AND o.contact_method='sms' AND o.approval_status='pending'
           AND s.country_code IN ('NZ','AU','US','CA','GB','IE','IN','SG','ZA')
           AND NOT (
             (s.country_code IN ('US','CA') AND o.contact_uri LIKE '+1%')
             OR (s.country_code='NZ' AND o.contact_uri LIKE '+64%')
             OR (s.country_code='AU' AND o.contact_uri LIKE '+61%')
             OR (s.country_code='GB' AND o.contact_uri LIKE '+44%')
             OR (s.country_code='IE' AND o.contact_uri LIKE '+353%')
             OR (s.country_code='IN' AND o.contact_uri LIKE '+91%')
             OR (s.country_code='SG' AND o.contact_uri LIKE '+65%')
             OR (s.country_code='ZA' AND o.contact_uri LIKE '+27%')
           )`
      );
      const mismatchCnt = mismatchRow ? Number(mismatchRow.count) : 0;

      if (mismatchCnt >= 10) {
        issues.push({
          type: 'sms_country_code_mismatch',
          count: mismatchCnt,
          severity: mismatchCnt >= 50 ? 'high' : 'medium',
        });
        await this.createTask({
          task_type: 'fix_bug',
          assigned_to: 'developer',
          priority: 8,
          context: {
            error_type: 'sms_country_code_mismatch',
            error_message: `${mismatchCnt} pending SMS messages have wrong country calling code (e.g. +60 Malaysia for NZ site). These will fail with Twilio.`,
            stack_trace:
              'getAllContacts() in src/contacts/prioritize.js now validates country.phoneFormat against phone prefix. ' +
              "Cleanup: UPDATE messages SET delivery_status='failed', error_message='Invalid phone: country code mismatch' WHERE direction='outbound' AND ... (see monitor.js for full SQL)",
            file_path: 'src/contacts/prioritize.js',
            severity: 'medium',
          },
        });
      }
    } catch {
      // Non-fatal
    }

    // Check for landline numbers stored as pending/approved SMS outreaches
    // These will fail — SMS requires mobile numbers. Landlines were previously added
    // as fallback in prioritize.js but the fallback has now been removed.
    try {
      const landlineRow = await getOne(
        `SELECT COUNT(*) as count FROM messages o
         JOIN sites s ON o.site_id = s.id
         WHERE o.direction='outbound' AND o.contact_method='sms' AND o.approval_status IN ('pending','approved')
           AND (
             (s.country_code='AU' AND o.contact_uri NOT LIKE '+614%')
             OR (s.country_code='NZ' AND o.contact_uri LIKE '+64%'
                 AND o.contact_uri NOT LIKE '+6421%' AND o.contact_uri NOT LIKE '+6422%'
                 AND o.contact_uri NOT LIKE '+6427%' AND o.contact_uri NOT LIKE '+6428%'
                 AND o.contact_uri NOT LIKE '+6429%' AND o.contact_uri NOT LIKE '+642%')
             OR (s.country_code='GB' AND o.contact_uri LIKE '+44%' AND o.contact_uri NOT LIKE '+447%')
             OR (s.country_code='IE' AND o.contact_uri LIKE '+353%' AND o.contact_uri NOT LIKE '+3538%')
           )`
      );
      const landlineCnt = landlineRow ? Number(landlineRow.count) : 0;
      if (landlineCnt >= 5) {
        issues.push({
          type: 'sms_landline_numbers',
          severity: 'medium',
          message: `${landlineCnt} pending/approved SMS messages are landline numbers that will fail`,
          fix: "UPDATE messages SET delivery_status='failed', error_message='Landline number — SMS delivery not supported' WHERE direction='outbound' AND ... (join with sites for country filter)",
          context: {
            error_type: 'sms_landline_numbers',
            error_message: `${landlineCnt} SMS messages target landline numbers (AU/NZ/GB/IE). SMS will fail. The landline fallback was removed from prioritize.js but historical records remain.`,
            stack_trace:
              'Landline fallback removed from getAllContacts() in src/contacts/prioritize.js. ' +
              "Cleanup: UPDATE messages SET delivery_status='failed', error_message='Landline number' WHERE direction='outbound' AND ... (filter by country+non-mobile prefix)",
            file_path: 'src/contacts/prioritize.js',
            severity: 'medium',
          },
        });
      }
    } catch {
      // Non-fatal
    }

    // Check Twilio balance — alert if < $10 (enough for ~1,265 SMS at $0.0079 each)
    // Blind spot: outreach keeps sending until balance hits $0, causing cascading failures.
    try {
      const sid = process.env.TWILIO_ACCOUNT_SID;
      const token = process.env.TWILIO_AUTH_TOKEN;
      if (sid && token) {
        const auth = Buffer.from(`${sid}:${token}`).toString('base64');
        const res = await fetch(`https://api.twilio.com/2010-04-01/Accounts/${sid}/Balance.json`, {
          headers: { Authorization: `Basic ${auth}` },
        });
        if (res.ok) {
          const data = await res.json();
          const balance = parseFloat(data.balance);
          if (!isNaN(balance) && balance < 10) {
            issues.push({
              type: 'twilio_balance_low',
              severity: balance < 5 ? 'high' : 'medium',
              message: `Twilio balance is $${balance.toFixed(2)} — SMS outreach will stop when it hits $0`,
              fix: 'Top up Twilio balance at console.twilio.com',
              context: {
                error_type: 'twilio_balance_low',
                error_message: `Twilio balance is $${balance.toFixed(2)} USD. At ~$0.0079/SMS, this covers ~${Math.floor(balance / 0.0079)} more SMS. Add credits at console.twilio.com before outreach stops.`,
                stack_trace: 'Twilio balance check in monitor.js checkPipelineHealth()',
                file_path: 'src/outreach/sms.js',
                severity: balance < 5 ? 'high' : 'medium',
              },
            });
          }
        }
      }
    } catch {
      // Non-fatal — Twilio balance check is best-effort
    }

    await this.log('info', 'Pipeline health check complete', {
      task_id: task.id,
      issues_found: issues.length,
    });

    await this.completeTask(task.id, {
      issues,
      total_issues: issues.length,
      stalled: issues.filter(i => i.type === 'stage_stalled').length,
      slow: issues.filter(i => i.type === 'stage_slow').length,
      error_rates: issues.filter(i => i.type === 'high_error_rate').length,
    });
  }

  /**
   * Get numeric order for pipeline stages
   *
   * @param {string} status - Status name
   * @returns {number} - Order (higher = later in pipeline)
   */
  getStageOrder(status) {
    const order = {
      found: 1,
      assets_captured: 2,
      prog_scored: 3,
      semantic_scored: 4,
      vision_scored: 4,
      enriched: 5,
      proposals_drafted: 6,
      outreach_partial: 7,
      outreach_sent: 8,
    };

    return order[status] || 0;
  }

  /**
   * Check Service-Level Objectives (SLO) compliance
   *
   * @param {Object} task - Task object
   * @returns {Promise<void>}
   */
  async checkSLOCompliance(task) {
    await this.log('info', 'Checking SLO compliance', {
      task_id: task.id,
    });

    const summary = getSLOSummary();
    const violations = checkSLOCompliance();

    if (violations.length > 0) {
      await this.log('warn', 'SLO violations detected', {
        task_id: task.id,
        total_violations: violations.length,
        compliance_rate: summary.compliance_rate,
      });

      // Create Architect tasks for each violation
      for (const violation of violations) {
        await this.log('warn', 'SLO violation', {
          task_id: task.id,
          stage: violation.slo.stage_name,
          description: violation.violation_description,
          severity: violation.violation_severity,
          actual_p95: violation.actual.p95,
          target: violation.slo.target_duration_minutes,
        });

        // Determine priority based on severity
        const priority =
          {
            critical: 10,
            high: 8,
            medium: 6,
            low: 5,
          }[violation.violation_severity] || 6;

        // Create Architect task to investigate and optimize
        await this.createTask({
          task_type: 'design_optimization',
          assigned_to: 'architect',
          priority,
          context: {
            optimization_type: 'slo_violation',
            stage_name: violation.slo.stage_name,
            description: violation.violation_description,
            current_p95: violation.actual.p95,
            target_duration: violation.slo.target_duration_minutes,
            severity: violation.violation_severity,
            sample_size: violation.actual.totalSites,
          },
        });

        // Add to human review for critical violations
        if (violation.violation_severity === 'critical') {
          await addReviewItem({
            file: 'Pipeline Performance',
            reason: violation.violation_description,
            type: 'critical',
            priority: 'critical',
          });
        }
      }
    } else {
      await this.log('info', 'All SLOs compliant', {
        task_id: task.id,
        total_slos: summary.total_slos,
      });
    }

    await this.completeTask(task.id, {
      total_slos: summary.total_slos,
      violations: violations.length,
      compliance_rate: summary.compliance_rate,
      violations_detail: violations,
    });
  }

  /**
   * Check for loops - site retry loops, agent task bounce, log domain frequency
   *
   * @param {Object} task - Task object
   * @returns {Promise<void>}
   */
  async checkLoops(task) {
    await this.log('info', 'Checking for loops', { task_id: task.id });

    const loops = [];

    // A. Site retry loops (recapture_count > 3)
    const siteLoops = await getAll(
      `SELECT id, domain, status, recapture_count
       FROM sites
       WHERE recapture_count > 3
       ORDER BY recapture_count DESC LIMIT 20`
    );

    for (const site of siteLoops) {
      loops.push({
        type: 'site_retry_loop',
        site_id: site.id,
        domain: site.domain,
        recapture_count: site.recapture_count,
        status: site.status,
      });
    }

    if (siteLoops.length > 0) {
      await this.log('warn', 'Site retry loops detected', {
        task_id: task.id,
        count: siteLoops.length,
        worst: siteLoops[0]?.domain,
        worst_count: siteLoops[0]?.recapture_count,
      });
    }

    // B. Agent task bounce loops (parent spawned >3 children in 24hr)
    const bounceLoops = await getAll(
      `SELECT parent_task_id, COUNT(*) as bounce_count,
              STRING_AGG(DISTINCT assigned_to::text, ',') as agents
       FROM tel.agent_tasks
       WHERE parent_task_id IS NOT NULL
         AND created_at > NOW() - INTERVAL '24 hours'
       GROUP BY parent_task_id
       HAVING COUNT(*) > 3`
    );

    for (const bounce of bounceLoops) {
      loops.push({
        type: 'agent_bounce_loop',
        parent_task_id: bounce.parent_task_id,
        bounce_count: bounce.bounce_count,
        agents: bounce.agents,
      });
    }

    if (bounceLoops.length > 0) {
      await this.log('warn', 'Agent task bounce loops detected', {
        task_id: task.id,
        count: bounceLoops.length,
      });
    }

    // C. Log-based domain frequency (>10 appearances in recent pipeline log)
    const logDir = process.env.LOGS_DIR || './logs/';
    const today = new Date().toISOString().slice(0, 10);
    const logPath = `${logDir}pipeline-${today}.log`;
    let domainLoops = [];

    try {
      const content = await fs.readFile(logPath, 'utf8');
      const lines = content.split('\n').slice(-500); // Last 500 lines
      const domainCounts = {};

      for (const line of lines) {
        // Extract domain from log lines (common patterns)
        const domainMatch = line.match(/(?:domain|site|url)[=: ]+([a-zA-Z0-9.-]+\.[a-z]{2,})/i);
        if (domainMatch) {
          const domain = domainMatch[1].toLowerCase();
          domainCounts[domain] = (domainCounts[domain] || 0) + 1;
        }
      }

      domainLoops = Object.entries(domainCounts)
        .filter(([, count]) => count > 10)
        .map(([domain, count]) => ({ domain, count }))
        .sort((a, b) => b.count - a.count);

      for (const dl of domainLoops) {
        loops.push({
          type: 'log_domain_loop',
          domain: dl.domain,
          log_count: dl.count,
        });
      }
    } catch {
      // Log file may not exist
    }

    await this.completeTask(task.id, {
      total_loops: loops.length,
      site_retry_loops: siteLoops.length,
      agent_bounce_loops: bounceLoops.length,
      log_domain_loops: domainLoops.length,
      loops,
    });
  }

  /**
   * Check blocked agent tasks and create triage tasks for investigation.
   * Replaces the old watchdog auto-cancel approach with proper triage.
   *
   * @param {Object} task - Task object
   * @returns {Promise<void>}
   */
  async checkBlockedTasks(task) {
    await this.log('info', 'Checking blocked tasks', { task_id: task.id });

    const blockedTasks = await getAll(
      `SELECT id, task_type, assigned_to, context_json, error_message, created_at,
              EXTRACT(EPOCH FROM (NOW() - created_at))/3600 as hours_blocked
       FROM tel.agent_tasks
       WHERE status = 'blocked'
         AND created_at < NOW() - INTERVAL '2 hours'
       ORDER BY created_at ASC`
    );

    let triageCreated = 0;
    let patternFixCreated = 0;

    // Group blocked tasks by error message prefix (first 60 chars) for pattern detection
    const errorGroups = {};
    for (const blocked of blockedTasks) {
      const errPrefix = (blocked.error_message || '').substring(0, 60);
      if (!errPrefix) continue;
      if (!errorGroups[errPrefix]) errorGroups[errPrefix] = [];
      errorGroups[errPrefix].push(blocked);
    }

    // Track which task IDs are covered by a pattern group fix_bug task
    const coveredByPattern = new Set();

    // For groups with >3 tasks sharing the same error prefix, create a single fix_bug task
    for (const [errPrefix, tasks] of Object.entries(errorGroups)) {
      if (tasks.length <= 3) continue;

      // Check if a fix_bug task already exists for this error pattern
      const existingFix = await getOne(
        `SELECT id FROM tel.agent_tasks
         WHERE task_type = 'fix_bug'
           AND context_json::text LIKE $1
           AND status NOT IN ('completed', 'failed')
         LIMIT 1`,
        [`%${errPrefix.substring(0, 40)}%`]
      );

      if (!existingFix) {
        await this.createTask({
          task_type: 'fix_bug',
          assigned_to: 'developer',
          priority: 9,
          context: {
            error_type: 'pattern_blocked_tasks',
            error_prefix: errPrefix,
            affected_task_count: tasks.length,
            affected_task_ids: tasks.map(t => t.id),
            sample_error: tasks[0].error_message,
            task_types: [...new Set(tasks.map(t => t.task_type))],
          },
        });
        patternFixCreated++;

        await this.log('warn', 'Pattern detected in blocked tasks', {
          task_id: task.id,
          error_prefix: errPrefix,
          affected_count: tasks.length,
        });
      }

      for (const t of tasks) coveredByPattern.add(t.id);
    }

    // For remaining blocked tasks not covered by a pattern, create individual triage tasks
    for (const blocked of blockedTasks) {
      if (coveredByPattern.has(blocked.id)) continue;

      // Dedup check matches the classify_error children created below (legacy triage_error kept for old rows)
      const existingTriage = await getOne(
        `SELECT id FROM tel.agent_tasks
         WHERE parent_task_id = $1 AND task_type IN ('classify_error', 'triage_error')
           AND status NOT IN ('completed', 'failed')
         LIMIT 1`,
        [blocked.id]
      );

      if (!existingTriage) {
        await this.createTask({
          task_type: 'classify_error',
          assigned_to: 'triage',
          priority: 8,
          parent_task_id: blocked.id,
          context: {
            error_type: 'blocked_task',
            blocked_task_id: blocked.id,
            blocked_task_type: blocked.task_type,
            blocked_agent: blocked.assigned_to,
            hours_blocked: Math.round(blocked.hours_blocked),
            error_message:
              blocked.error_message ||
              `Task #${blocked.id} (${blocked.task_type}) blocked for ${Math.round(blocked.hours_blocked)}h`,
          },
        });
        triageCreated++;
      }
    }

    await this.log('info', 'Blocked task check complete', {
      task_id: task.id,
      total_blocked: blockedTasks.length,
      triage_created: triageCreated,
      pattern_fix_created: patternFixCreated,
      covered_by_pattern: coveredByPattern.size,
    });

    await this.completeTask(task.id, {
      total_blocked: blockedTasks.length,
      triage_created: triageCreated,
      pattern_fix_created: patternFixCreated,
      covered_by_pattern: coveredByPattern.size,
    });
  }

  /**
   * Analyse the rate-limit event log and surface patterns that indicate the
   * scheduler config needs tuning.
   *
   * Reads logs/rate-limit-events.jsonl (written by rate-limit-scheduler.js).
   * Detects three signal types (last 7 days):
   *
   *   wait_too_short — Same API rate-limited again within 2 h of a previous clear.
   *                    Suggests the configured wait window is shorter than the
   *                    actual API quota window (e.g. use 'daily' instead of 'hourly').
   *
   *   false_positive — Circuit breaker cleared (API recovered) with >50% of the
   *                    scheduled wait still remaining. Suggests timeout-based
   *                    detection fired for a genuine outage, not a quota exhaustion.
   *                    Consider raising the detectFromTimeouts threshold, or using
   *                    a shorter limitType (e.g. 'hourly' instead of 'daily').
   *
   *   high_frequency — Same API rate-limited >3 times in 24 h.
   *                    Suggests usage is consistently hitting the quota ceiling;
   *                    consider reducing concurrency or spreading requests over time.
   *
   * Also surfaces any currently active rate limits so they appear in the
   * agent task history for human reviewers.
   *
   * @param {object} task - agent task context
   */
  async checkRateLimitPatterns(task) {
    await this.log('info', 'Checking rate limit patterns', { task_id: task.id });

    const eventsFilePath = join(process.cwd(), 'logs/rate-limit-events.jsonl');
    const sevenDaysAgo = Date.now() - 7 * 24 * 60 * 60 * 1000;

    // Parse the event log
    let events = [];
    try {
      const raw = await fs.readFile(eventsFilePath, 'utf8');
      events = raw
        .split('\n')
        .filter(Boolean)
        .map(line => {
          try {
            return JSON.parse(line);
          } catch {
            return null;
          }
        })
        .filter(e => e && new Date(e.timestamp).getTime() >= sevenDaysAgo);
    } catch {
      // File may not exist yet — no events to analyse
    }

    const findings = [];

    if (events.length > 0) {
      // ── Signal 1: wait_too_short ─────────────────────────────────────────
      // Same API: set event arrives within 2 h of a clear/expired event
      const TWO_HOURS = 2 * 60 * 60 * 1000;
      const byApi = {};
      for (const e of events) {
        if (!byApi[e.api]) byApi[e.api] = [];
        byApi[e.api].push(e);
      }

      for (const [api, apiEvents] of Object.entries(byApi)) {
        const sorted = apiEvents.sort((a, b) => new Date(a.timestamp) - new Date(b.timestamp));
        let lastClear = null;
        for (const e of sorted) {
          if (e.type === 'clear' || e.type === 'expired') {
            lastClear = new Date(e.timestamp).getTime();
          } else if (e.type === 'set' && lastClear !== null) {
            const gap = new Date(e.timestamp).getTime() - lastClear;
            if (gap < TWO_HOURS) {
              findings.push({
                signal: 'wait_too_short',
                api,
                gapMinutes: Math.round(gap / 60_000),
                suggestion: `${api}: rate limit re-triggered only ${Math.round(gap / 60_000)} min after previous clear. Consider increasing limitType from 'hourly' to 'daily', or reduce ZENROWS_CONCURRENCY.`,
              });
              break; // one finding per API is enough
            }
          }
        }
      }

      // ── Signal 2: false_positive ─────────────────────────────────────────
      // clear event with earlyByMs > 50% of scheduledWaitMs
      for (const e of events) {
        if (e.type === 'clear' && e.earlyByMs > 0 && e.actualWaitMs > 0) {
          const scheduledWaitMs = e.actualWaitMs + e.earlyByMs;
          const earlyPct = (e.earlyByMs / scheduledWaitMs) * 100;
          if (earlyPct > 50) {
            findings.push({
              signal: 'false_positive',
              api: e.api,
              earlyPct: Math.round(earlyPct),
              earlyByMinutes: Math.round(e.earlyByMs / 60_000),
              suggestion: `${e.api}: breaker closed ${Math.round(e.earlyByMs / 60_000)} min early (${Math.round(earlyPct)}% of scheduled wait unused). Possible outage misclassified as quota exhaustion. Consider using a shorter limitType or raising detectFromTimeouts threshold.`,
            });
          }
        }
      }

      // ── Signal 3: high_frequency ─────────────────────────────────────────
      // Same API: >3 'set' events in any rolling 24-hour window
      const DAY = 24 * 60 * 60 * 1000;
      for (const [api, apiEvents] of Object.entries(byApi)) {
        const sets = apiEvents
          .filter(e => e.type === 'set')
          .map(e => new Date(e.timestamp).getTime());
        if (sets.length > 3) {
          // Slide a 24h window
          for (let i = 0; i < sets.length; i++) {
            const windowEnd = sets[i] + DAY;
            const inWindow = sets.filter(t => t >= sets[i] && t <= windowEnd).length;
            if (inWindow > 3) {
              findings.push({
                signal: 'high_frequency',
                api,
                triggersIn24h: inWindow,
                suggestion: `${api}: rate-limited ${inWindow} times in 24 h. Usage consistently hitting quota ceiling. Consider reducing concurrency (ZENROWS_CONCURRENCY) or scheduling scraping to spread load across the day.`,
              });
              break;
            }
          }
        }
      }
    }

    // Also surface any currently active limits
    const rateLimitsFile = join(process.cwd(), 'logs/rate-limits.json');
    let activeLimits = [];
    try {
      const raw = JSON.parse(await fs.readFile(rateLimitsFile, 'utf8'));
      const now = Date.now();
      activeLimits = Object.entries(raw)
        .filter(([, v]) => Number(v.resetAt) > now)
        .map(([api, v]) => ({
          api,
          stages: v.stages,
          reason: v.reason,
          resetAt: new Date(Number(v.resetAt)).toISOString(),
          waitMinutes: Math.ceil((Number(v.resetAt) - now) / 60_000),
        }));
    } catch {
      /* no active limits or file missing */
    }

    if (findings.length > 0) {
      await this.log('warn', 'Rate limit pattern issues detected', {
        task_id: task.id,
        findings: findings.length,
        details: findings.map(f => f.suggestion),
      });

      // Add to human review queue so the suggestion isn't missed
      try {
        await addReviewItem({
          category: 'rate_limit_tuning',
          priority: 'medium',
          title: `Rate limit scheduler needs tuning (${findings.length} signal${findings.length > 1 ? 's' : ''})`,
          description: findings.map(f => `• ${f.suggestion}`).join('\n'),
          data: { findings, activeLimits },
        });
      } catch {
        /* human review queue may not be available in all environments */
      }
    } else {
      await this.log('info', 'Rate limit patterns look healthy', {
        task_id: task.id,
        eventsAnalysed: events.length,
      });
    }

    await this.completeTask(task.id, {
      events_analysed: events.length,
      findings: findings.length,
      active_limits: activeLimits.length,
      signals: findings.map(f => ({ signal: f.signal, api: f.api })),
    });
  }

  /**
   * Ensure recurring monitor tasks are scheduled
   *
   * @returns {Promise<void>}
   */
  async ensureRecurringTasks() {
    const recurringTasks = [
      { task_type: 'scan_logs', priority: 5, interval_minutes: 5 },
      { task_type: 'check_agent_health', priority: 6, interval_minutes: 30 },
      { task_type: 'check_process_compliance', priority: 5, interval_minutes: 15 },
      { task_type: 'detect_anomaly', priority: 4, interval_minutes: 60 },
      { task_type: 'check_pipeline_health', priority: 7, interval_minutes: 10 },
      { task_type: 'check_slo_compliance', priority: 7, interval_minutes: 30 },
      { task_type: 'check_rate_limits', priority: 4, interval_minutes: 360 }, // Every 6 hours
    ];

    for (const taskDef of recurringTasks) {
      // Check if task already pending/running
      const exists = await getOne(
        `SELECT 1 FROM tel.agent_tasks
         WHERE assigned_to = 'monitor'
           AND task_type = $1
           AND status IN ('pending', 'running')`,
        [taskDef.task_type]
      );

      if (!exists) {
        // Check if enough time has passed since last completion
        const lastCompleted = await getOne(
          `SELECT completed_at FROM tel.agent_tasks
           WHERE assigned_to = 'monitor'
             AND task_type = $1
             AND status = 'completed'
           ORDER BY completed_at DESC
           LIMIT 1`,
          [taskDef.task_type]
        );

        const shouldCreate =
          !lastCompleted ||
          Date.now() - new Date(lastCompleted.completed_at).getTime() >
            taskDef.interval_minutes * 60 * 1000;

        if (shouldCreate) {
          await this.createTask({
            task_type: taskDef.task_type,
            assigned_to: 'monitor',
            priority: taskDef.priority,
            context: { recurring: true },
          });
        }
      }
    }
  }
}