// src/agents/monitor.js
   1  /**
   2   * Monitor Agent
   3   *
   4   * The system's "immune system" - proactively scans logs, detects anomalies,
   5   * monitors process compliance, and tracks agent health.
   6   */
   7  
   8  import { BaseAgent } from './base-agent.js';
   9  import { execSync } from 'child_process';
  10  import fs from 'fs/promises';
  11  import { join } from 'path';
  12  import { run, getOne, getAll } from '../utils/db.js';
  13  import { addReviewItem } from '../utils/human-review-queue.js';
  14  import { checkSLOCompliance, getSLOSummary } from './utils/slo-tracker.js';
  15  
  16  export class MonitorAgent extends BaseAgent {
  17    constructor() {
  18      super('monitor', ['base.md', 'monitor.md']);
  19      this.lastReadPositions = {}; // Track incremental log reads
  20      this.loadFilePositions(); // Load persisted positions from database (fire-and-forget)
  21    }
  22  
  23    /**
  24     * Load file read positions from settings table
  25     *
  26     * @returns {Promise<void>}
  27     */
  28    async loadFilePositions() {
  29      try {
  30        const row = await getOne(
  31          'SELECT value FROM ops.settings WHERE key = $1',
  32          ['monitor_file_positions']
  33        );
  34  
  35        if (row) {
  36          this.lastReadPositions = JSON.parse(row.value);
  37        }
  38      } catch (error) {
  39        // If settings don't exist or JSON parse fails, start fresh
  40        this.lastReadPositions = {};
  41      }
  42    }
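         // Example persisted row (illustrative values): keys are log file paths, plus
         // 'api_scan:'-prefixed variants used by the API-rate scan; values are byte offsets already read.
         //   key:   'monitor_file_positions'
         //   value: '{"./logs/pipeline-2024-01-15.log":48211,"api_scan:./logs/pipeline-2024-01-15.log":47090}'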
  43  
  44    /**
  45     * Save file read positions to settings table
  46     *
  47     * @returns {Promise<void>}
  48     */
  49    async saveFilePositions() {
  50      try {
  51        await run(
  52          `INSERT INTO ops.settings (key, value, description, updated_at)
  53           VALUES ($1, $2, $3, NOW())
  54           ON CONFLICT(key) DO UPDATE SET
  55             value = EXCLUDED.value,
  56             updated_at = EXCLUDED.updated_at`,
  57          [
  58            'monitor_file_positions',
  59            JSON.stringify(this.lastReadPositions),
  60            'Monitor agent file read positions for incremental log scanning',
  61          ]
  62        );
  63      } catch (error) {
  64        // Log error but don't fail - worst case we re-read logs
  65        console.error('Failed to save file positions:', error.message);
  66      }
  67    }
  68  
  69    /**
  70     * Process a monitor task
  71     *
  72     * @param {Object} task - Task object
  73     * @returns {Promise<void>}
  74     */
  75    async processTask(task) {
  76      // Self-heal: reset orphaned 'running' tasks of monitor types that got stuck after pipeline restart.
  77      // Any monitor task stuck in 'running' for >10 min was orphaned (agent was killed mid-execution).
  78      // Reset to 'pending' so the dedup check doesn't block future runs of the same task type.
  79      try {
  80        const monitorTaskTypes = [
  81          'check_pipeline_health',
  82          'check_loops',
  83          'check_agent_health',
  84          'scan_logs',
  85          'check_slo_compliance',
  86          'check_blocked_tasks',
  87          'check_process_compliance',
  88          'check_rate_limits',
  89          'detect_anomaly',
  90        ];
  91        const placeholders = monitorTaskTypes.map((_, i) => `$${i + 1}`).join(',');
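             // With the 9 monitor task types above this builds '$1,$2,...,$9'; the current task id is bound as $10 below.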
  92        await run(
  93          `UPDATE tel.agent_tasks SET status='pending'
  94           WHERE task_type IN (${placeholders})
  95             AND status='running'
  96             AND id != $${monitorTaskTypes.length + 1}
  97             AND created_at < NOW() - INTERVAL '10 minutes'`,
  98          [...monitorTaskTypes, task.id]
  99        );
 100      } catch {
 101        // Non-fatal — cleanup best effort
 102      }
 103  
 104      try {
 105        // Parse context_json if needed
 106        const context =
 107          task.context_json && typeof task.context_json === 'string'
 108            ? JSON.parse(task.context_json)
 109            : task.context_json || {};
 110  
 111        // Ensure context is attached to task for handlers
 112        task.context_json = context;
 113  
 114        switch (task.task_type) {
 115          case 'scan_logs':
 116            await this.scanLogs(task);
 117            break;
 118  
 119          case 'check_agent_health':
 120            await this.checkAgentHealth(task);
 121            break;
 122  
 123          case 'check_process_compliance':
 124            await this.checkProcessCompliance(task);
 125            break;
 126  
 127          case 'detect_anomaly':
 128            await this.detectAnomaly(task);
 129            break;
 130  
 131          case 'check_pipeline_health':
 132            await this.checkPipelineHealth(task);
 133            break;
 134  
 135          case 'check_slo_compliance':
 136            await this.checkSLOCompliance(task);
 137            break;
 138  
 139          case 'check_loops':
 140            await this.checkLoops(task);
 141            break;
 142  
 143          case 'check_blocked_tasks':
 144            await this.checkBlockedTasks(task);
 145            break;
 146  
 147          case 'check_rate_limits':
 148            await this.checkRateLimitPatterns(task);
 149            break;
 150  
 151          // Task types assigned to wrong agent - delegate
 152          case 'implement_feature':
 153          case 'fix_bug':
 154          case 'bootstrap_monitor':
 155            await this.delegateToCorrectAgent(task);
 156            break;
 157  
 158          default:
 159            // Unknown task types - delegate to correct agent via task routing
 160            await this.log('warn', 'Unknown task type received, delegating', {
 161              task_id: task.id,
 162              task_type: task.task_type,
 163            });
 164            await this.delegateToCorrectAgent(task);
 165        }
 166      } catch (error) {
 167        await this.log(
 168          'error',
 169          `${this.agentName.charAt(0).toUpperCase() + this.agentName.slice(1)} task ${task.id} failed: ${error.message}`,
 170          {
 171            task_id: task.id,
 172            task_type: task.task_type,
 173            error: error.message,
 174            stack: error.stack,
 175          }
 176        );
 177        throw error; // Re-throw so task manager can handle
 178      }
 179    }
 180    /**
 181     * Scan log files for errors and anomalies
 182     *
 183     * @param {Object} task - Task object
 184     * @returns {Promise<void>}
 185     */
 186    async scanLogs(task) {
 187      const { days = 1, skip_retried = false } = task.context_json || {};
 188  
 189      await this.log('info', 'Starting log scan', {
 190        task_id: task.id,
 191        days,
 192      });
 193  
 194      const logDir = process.env.LOGS_DIR || './logs/';
 195      const today = new Date().toISOString().slice(0, 10);
 196      const logFiles = ['pipeline', 'outreach', 'inbound', 'cron', 'utils', 'agents'];
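           // Log files are resolved by plain concatenation as `${logDir}${domain}-${today}.log`,
           // e.g. ./logs/pipeline-2024-01-15.log (illustrative date); LOGS_DIR is assumed to end with '/'.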
 197  
 198      let totalErrors = 0;
 199      let loopsDetected = 0;
 200  
 201      for (const domain of logFiles) {
 202        const filePath = `${logDir}${domain}-${today}.log`;
 203  
 204        try {
 205          // Read incrementally to avoid re-processing
 206          const errors = await this.readIncrementally(filePath, /\[ERROR\]|\[FATAL\]/);
 207  
 208          if (errors.length > 0) {
 209            totalErrors += errors.length;
 210  
 211            // Group by error message
 212            const grouped = this.groupByMessage(errors);
 213  
 214            // Detect looping (same error >3x in 1 hour)
 215            for (const [message, occurrences] of Object.entries(grouped)) {
 216              if (occurrences.length > 3 && this.withinOneHour(occurrences)) {
 217                loopsDetected++;
 218  
 219                await this.log('warn', 'Error loop detected', {
 220                  task_id: task.id,
 221                  message: message.substring(0, 200),
 222                  count: occurrences.length,
 223                  log_file: filePath,
 224                });
 225  
 226                // Create triage task
 227                await this.createTask({
 228                  task_type: 'classify_error',
 229                  assigned_to: 'triage',
 230                  priority: 8,
 231                  context: {
 232                    error_type: 'looping_error',
 233                    error_message: message,
 234                    count: occurrences.length,
 235                    log_file: filePath,
 236                    frequency: occurrences.length,
 237                  },
 238                });
 239              }
 240            }
 241          }
 242        } catch (error) {
 243          if (error.code !== 'ENOENT') {
 244            await this.log('error', 'Log scan failed', {
 245              task_id: task.id,
 246              file: filePath,
 247              error: error.message,
 248            });
 249          }
 250        }
 251      }
 252  
 253      // Scan for API-specific error rates across all log files
 254      const apiErrorResults = await this.scanApiErrorRates(task);
 255  
 256      // Ensure recurring tasks are scheduled
 257      await this.ensureRecurringTasks();
 258  
 259      await this.log('info', 'Log scan complete', {
 260        task_id: task.id,
 261        total_errors: totalErrors,
 262        loops_detected: loopsDetected,
 263        api_error_alerts: apiErrorResults.alertsCreated,
 264      });
 265  
 266      await this.completeTask(task.id, {
 267        total_errors: totalErrors,
 268        loops_detected: loopsDetected,
 269        api_error_rates: apiErrorResults.rates,
 270        api_error_alerts: apiErrorResults.alertsCreated,
 271      });
 272    }
 273  
 274    /**
 275     * Scan log files for API-specific error rates.
 276     * Classifies API from log content (HTTP status codes, API names).
 277     * Creates classify_error tasks when any API exceeds 10% error rate.
 278     *
 279     * @param {Object} task - Parent task for logging
 280     * @returns {Promise<{rates: Object, alertsCreated: number}>}
 281     */
 282    async scanApiErrorRates(task) {
 283      const logDir = process.env.LOGS_DIR || './logs/';
 284      const today = new Date().toISOString().slice(0, 10);
 285  
 286      // API classification patterns: map log content patterns to API names
 287      const apiPatterns = [
 288        {
 289          api: 'openrouter',
 290          patterns: [/openrouter/i, /callLLM/i, /LLM.*error/i, /model.*response/i],
 291        },
 292        { api: 'zenrows', patterns: [/zenrows/i, /SERP/i, /scrape/i] },
 293        { api: 'twilio', patterns: [/twilio/i, /SMS.*send/i, /sms.*deliver/i] },
 294        { api: 'resend', patterns: [/resend/i, /email.*send/i, /email.*deliver/i] },
 295        {
 296          api: 'playwright',
 297          patterns: [/playwright/i, /browser.*error/i, /chromium/i, /page.*crash/i],
 298        },
 299      ];
 300  
 301      // HTTP error patterns
 302      const httpErrorPattern = /\b(4\d{2}|5\d{2})\b/;
 303      const errorIndicators = /\[ERROR\]|\[FATAL\]|error|failed|timeout|ECONNRESET|ECONNREFUSED/i;
 304      const successIndicators =
 305        /\[INFO\].*(?:success|completed|processed|sent|delivered|scored|enriched)/i;
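           // Illustrative lines (assumed log format) and how they would be classified:
           //   "[ERROR] openrouter request failed: 429 Too Many Requests"  -> openrouter, error
           //   "[INFO] email batch completed via resend"                   -> resend, success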
 306  
 307      // Count successes and errors per API
 308      const apiCounts = {};
 309      const apiSampleErrors = {};
 310  
 311      const logFiles = ['pipeline', 'outreach', 'inbound', 'cron', 'utils', 'agents'];
 312  
 313      for (const domain of logFiles) {
 314        const filePath = `${logDir}${domain}-${today}.log`;
 315  
 316        try {
 317          const lines = await this.readIncrementalForApiScan(filePath);
 318          for (const line of lines) {
 319            // Determine which API this line relates to
 320            let matchedApi = null;
 321            for (const { api, patterns } of apiPatterns) {
 322              if (patterns.some(p => p.test(line))) {
 323                matchedApi = api;
 324                break;
 325              }
 326            }
 327            if (!matchedApi) continue;
 328  
 329            if (!apiCounts[matchedApi]) apiCounts[matchedApi] = { success: 0, error: 0 };
 330  
 331            const isError = errorIndicators.test(line) || httpErrorPattern.test(line);
 332            const isSuccess = successIndicators.test(line);
 333  
 334            if (isError && !isSuccess) {
 335              apiCounts[matchedApi].error++;
 336              // Keep up to 5 sample errors per API
 337              if (!apiSampleErrors[matchedApi]) apiSampleErrors[matchedApi] = [];
 338              if (apiSampleErrors[matchedApi].length < 5) {
 339                apiSampleErrors[matchedApi].push(line.substring(0, 200));
 340              }
 341            } else if (isSuccess) {
 342              apiCounts[matchedApi].success++;
 343            }
 344          }
 345        } catch (error) {
 346          if (error.code !== 'ENOENT') {
 347            await this.log('error', 'API error rate scan failed for file', {
 348              task_id: task.id,
 349              file: filePath,
 350              error: error.message,
 351            });
 352          }
 353        }
 354      }
 355  
 356      // Calculate rates and alert on >10%
 357      const rates = {};
 358      let alertsCreated = 0;
 359      const ERROR_RATE_THRESHOLD = 0.1;
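           // Example: 4 errors out of 30 classified lines is ~13% with >= 3 errors -> alert;
           // 2 errors out of 5 lines is 40% but only 2 errors -> no alert (small-sample noise guard).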
 360  
 361      for (const [api, counts] of Object.entries(apiCounts)) {
 362        const total = counts.success + counts.error;
 363        if (total === 0) continue;
 364        const errorRate = counts.error / total;
 365        rates[api] = {
 366          total,
 367          errors: counts.error,
 368          successes: counts.success,
 369          errorRate: `${Math.round(errorRate * 100)}%`,
 370        };
 371  
 372        if (errorRate > ERROR_RATE_THRESHOLD && counts.error >= 3) {
 373          // Check if we already have an active alert for this API
 374          const existingAlert = await getOne(
 375            `SELECT id FROM tel.agent_tasks
 376             WHERE task_type = 'classify_error'
 377               AND context_json::text LIKE $1
 378               AND status NOT IN ('completed', 'failed')
 379             LIMIT 1`,
 380            [`%api_error_rate%${api}%`]
 381          );
 382  
 383          if (!existingAlert) {
 384            await this.createTask({
 385              task_type: 'classify_error',
 386              assigned_to: 'triage',
 387              priority: 9,
 388              context: {
 389                error_type: 'api_error_rate',
 390                api,
 391                error_rate: rates[api].errorRate,
 392                total_requests: total,
 393                error_count: counts.error,
 394                sample_errors: apiSampleErrors[api] || [],
 395              },
 396            });
 397            alertsCreated++;
 398  
 399            await this.log('warn', 'High API error rate detected', {
 400              task_id: task.id,
 401              api,
 402              error_rate: rates[api].errorRate,
 403              total,
 404              errors: counts.error,
 405            });
 406          }
 407        }
 408      }
 409  
 410      return { rates, alertsCreated };
 411    }
 412  
 413    /**
 414     * Read log file for API error rate scanning.
 415     * Uses the same incremental position tracking as readIncrementally,
 416     * but with a separate position namespace to avoid conflicts.
 417     *
 418     * @param {string} filePath - Log file path
 419     * @returns {Promise<string[]>} All lines from new content
 420     */
 421    async readIncrementalForApiScan(filePath) {
 422      const posKey = `api_scan:${filePath}`;
 423      try {
 424        const stats = await fs.stat(filePath);
 425        let lastPos = this.lastReadPositions[posKey] || 0;
 426  
 427        if (stats.size < lastPos) {
 428          lastPos = 0;
 429          this.lastReadPositions[posKey] = 0;
 430        }
 431  
 432        if (stats.size === lastPos) return [];
 433  
 434        const fd = await fs.open(filePath, 'r');
 435        const buffer = Buffer.alloc(stats.size - lastPos);
 436        await fd.read(buffer, 0, buffer.length, lastPos);
 437        await fd.close();
 438  
 439        this.lastReadPositions[posKey] = stats.size;
 440        await this.saveFilePositions();
 441  
 442        return buffer.toString('utf8').split('\n').filter(Boolean);
 443      } catch (error) {
 444        if (error.code === 'ENOENT') return [];
 445        throw error;
 446      }
 447    }
 448  
 449    /**
 450     * Read log file incrementally
 451     *
 452     * @param {string} filePath - Log file path
 453     * @param {RegExp} pattern - Pattern to match
 454     * @returns {Promise<string[]>} - Matching lines
 455     */
 456    async readIncrementally(filePath, pattern) {
 457      try {
 458        const stats = await fs.stat(filePath);
 459        let lastPos = this.lastReadPositions[filePath] || 0;
 460  
 461        // Detect log cycling: if current file size is smaller than last position,
 462        // the log file was rotated and we should start from the beginning
 463        if (stats.size < lastPos) {
 464          await this.log('info', 'Log file cycling detected', {
 465            file: filePath,
 466            old_size: lastPos,
 467            new_size: stats.size,
 468            action: 'reset_position',
 469          });
 470          lastPos = 0;
 471          this.lastReadPositions[filePath] = 0;
 472        }
 473  
 474        // Only read new content
 475        const fd = await fs.open(filePath, 'r');
 476        const buffer = Buffer.alloc(stats.size - lastPos);
 477        await fd.read(buffer, 0, buffer.length, lastPos);
 478        await fd.close();
 479  
 480        const newContent = buffer.toString('utf8');
 481        this.lastReadPositions[filePath] = stats.size;
 482  
 483        // Persist positions to database after each read
 484        await this.saveFilePositions();
 485  
 486        // Find matches
 487        const matches = [];
 488        const lines = newContent.split('\n');
 489  
 490        for (const line of lines) {
 491          if (pattern.test(line) && !this.isTestError(line)) {
 492            matches.push(line);
 493          }
 494        }
 495  
 496        return matches;
 497      } catch (error) {
 498        if (error.code === 'ENOENT') {
 499          // File doesn't exist yet
 500          return [];
 501        }
 502        throw error;
 503      }
 504    }
 505  
 506    /**
 507     * Check if a log line originated from test code (suppress from prod monitoring)
 508     * Filters: test file paths, test fixture domains, placeholder values
 509     */
 510    isTestError(line) {
 511      return (
 512        /\.test\.js|tests\/|test-fixtures|__mocks__|node_modules\/.*mocha|node_modules\/.*jest/i.test(
 513          line
 514        ) ||
 515        /example\.com|failing\.com|test\.com|site 99999|outreach #\d+|\+1234567890|\+15005550/i.test(
 516          line
 517        )
 518      );
 519    }
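         // e.g. a line mentioning a path under tests/ or a fixture domain like example.com is
         // suppressed so test runs don't spawn production triage tasks.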
 520  
 521    /**
 522     * Group error lines by message
 523     *
 524     * @param {string[]} errors - Error lines
 525     * @returns {Object} - Grouped by message
 526     */
 527    groupByMessage(errors) {
 528      const groups = {};
 529  
 530      for (const line of errors) {
  531        // Extract error message (after timestamp and level); include FATAL so the scan pattern's matches aren't dropped
  532        const match = line.match(/\[(?:ERROR|FATAL)\]\s+(.+)/);
 533        if (match) {
 534          const message = match[1];
 535          if (!groups[message]) groups[message] = [];
 536          groups[message].push(line);
 537        }
 538      }
 539  
 540      return groups;
 541    }
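         // e.g. two lines both ending in "[ERROR] SERP fetch timed out" (illustrative message)
         // collapse into one group with two occurrences, which scanLogs() then counts for loop detection.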
 542  
 543    /**
 544     * Check if occurrences are within one hour
 545     *
 546     * @param {string[]} occurrences - Error lines with timestamps
 547     * @returns {boolean} - True if within 1 hour
 548     */
 549    withinOneHour(occurrences) {
 550      if (occurrences.length < 2) return false;
 551  
 552      // Parse timestamps
 553      const timestamps = occurrences
 554        .map(line => {
 555          const match = line.match(/\[([^\]]+)\]/);
 556          return match ? new Date(match[1]) : null;
 557        })
 558        .filter(Boolean);
 559  
 560      if (timestamps.length < 2) return false;
 561  
 562      const oldest = Math.min(...timestamps.map(d => d.getTime()));
 563      const newest = Math.max(...timestamps.map(d => d.getTime()));
 564  
 565      return newest - oldest <= 60 * 60 * 1000; // 1 hour
 566    }
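         // e.g. occurrences at 10:00, 10:20 and 10:45 -> true (45 min span); 09:00 and 10:30 -> false.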
 567  
 568    /**
 569     * Check agent health and success/failure rates
 570     *
 571     * @param {Object} task - Task object
 572     * @returns {Promise<void>}
 573     */
 574    async checkAgentHealth(task) {
 575      await this.log('info', 'Checking agent health', {
 576        task_id: task.id,
 577      });
 578  
 579      const stats = await getAll(
 580        `SELECT assigned_to,
 581          COUNT(*) as total,
 582          SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
 583          SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
 584        FROM tel.agent_tasks
 585        WHERE created_at > NOW() - INTERVAL '24 hours'
 586        GROUP BY assigned_to`
 587      );
 588  
 589      const unhealthyAgents = [];
 590  
 591      for (const agent of stats) {
 592        const failureRate = agent.total > 0 ? agent.failed / agent.total : 0;
 593        const successRate = agent.total > 0 ? agent.completed / agent.total : 0;
 594  
 595        // Circuit breaker: >30% failure rate
 596        if (failureRate > 0.3) {
 597          await this.log('error', 'Agent failure rate exceeded threshold', {
 598            task_id: task.id,
 599            agent: agent.assigned_to,
 600            failure_rate: failureRate,
 601            total: agent.total,
 602            failed: agent.failed,
 603          });
 604  
  605          // Block the agent (circuit breaker: mark status 'blocked' and record why)
 606          await run(
 607            `UPDATE tel.agent_state
 608             SET status = 'blocked', metrics_json = $1
 609             WHERE agent_name = $2`,
 610            [
 611              JSON.stringify({
 612                failure_rate: failureRate,
 613                threshold: 0.3,
 614                disabled_at: new Date().toISOString(),
 615              }),
 616              agent.assigned_to,
 617            ]
 618          );
 619  
 620          // Escalate to human
 621          await addReviewItem({
 622            file: 'Agent System',
 623            reason: `Agent ${agent.assigned_to} failure rate ${(failureRate * 100).toFixed(0)}% exceeds 30% threshold`,
 624            type: 'critical',
 625            priority: 'critical',
 626          });
 627  
 628          unhealthyAgents.push(agent.assigned_to);
 629        }
 630        // Warning threshold: 15-30% failure rate
 631        else if (failureRate > 0.15) {
 632          await this.log('warn', 'Agent failure rate elevated', {
 633            task_id: task.id,
 634            agent: agent.assigned_to,
 635            failure_rate: failureRate,
 636          });
 637        }
 638  
 639        // Update metrics
 640        await run(
 641          `UPDATE tel.agent_state
 642           SET metrics_json = $1, last_active = NOW()
 643           WHERE agent_name = $2`,
 644          [
 645            JSON.stringify({
 646              success_rate: successRate,
 647              failure_rate: failureRate,
 648              total_tasks_24h: agent.total,
 649              last_health_check: new Date().toISOString(),
 650            }),
 651            agent.assigned_to,
 652          ]
 653        );
 654      }
 655  
 656      // Check agent queue depth (pending task count per agent)
 657      const queueDepths = await getAll(
 658        `SELECT assigned_to, COUNT(*) as pending_count
 659         FROM tel.agent_tasks
 660         WHERE status = 'pending'
 661         GROUP BY assigned_to`
 662      );
 663  
 664      const overloadedAgents = [];
 665      for (const queue of queueDepths) {
 666        if (queue.pending_count > 50) {
 667          await this.log('warn', 'Agent queue critical', {
 668            task_id: task.id,
 669            agent: queue.assigned_to,
 670            pending_count: queue.pending_count,
 671          });
 672          overloadedAgents.push({
 673            agent: queue.assigned_to,
 674            pending: queue.pending_count,
 675            severity: 'critical',
 676          });
 677        } else if (queue.pending_count > 20) {
 678          await this.log('warn', 'Agent queue elevated', {
 679            task_id: task.id,
 680            agent: queue.assigned_to,
 681            pending_count: queue.pending_count,
 682          });
 683          overloadedAgents.push({
 684            agent: queue.assigned_to,
 685            pending: queue.pending_count,
 686            severity: 'warning',
 687          });
 688        }
 689      }
 690  
 691      await this.completeTask(task.id, {
 692        agents_checked: stats.length,
 693        unhealthy_agents: unhealthyAgents,
 694        queue_depths: queueDepths,
 695        overloaded_agents: overloadedAgents,
 696      });
 697    }
 698  
 699    /**
 700     * Check process compliance (stage transitions)
 701     *
 702     * @param {Object} task - Task object
 703     * @returns {Promise<void>}
 704     */
 705    async checkProcessCompliance(task) {
 706      await this.log('info', 'Checking process compliance', {
 707        task_id: task.id,
 708      });
 709  
 710      const EXPECTED_FLOW = [
 711        'found',
 712        'assets_captured',
 713        'prog_scored',
 714        'semantic_scored',
 715        'vision_scored',
 716        'enriched',
 717        'proposals_drafted',
 718        'outreach_partial',
 719        'outreach_sent',
 720      ];
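           // EXPECTED_FLOW documents the valid stage order; the query below flags anything outside
           // these statuses (plus the terminal 'ignored'/'failing'/'high_score') as invalid.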
 721  
 722      // Find sites updated in last hour with invalid status
 723      const recentSites = await getAll(
 724        `SELECT id, domain, status, error_message, updated_at
 725         FROM sites
 726         WHERE updated_at > NOW() - INTERVAL '1 hour'
 727         AND status NOT IN ('ignored', 'failing', 'high_score', 'found', 'assets_captured', 'prog_scored', 'semantic_scored', 'vision_scored', 'enriched', 'proposals_drafted', 'outreach_partial', 'outreach_sent')
 728         LIMIT 10`
 729      );
 730  
 731      for (const site of recentSites) {
 732        await this.log('warn', 'Invalid site status detected', {
 733          task_id: task.id,
 734          site_id: site.id,
 735          domain: site.domain,
 736          status: site.status,
 737        });
 738  
 739        // Create triage task
 740        await this.createTask({
 741          task_type: 'classify_error',
 742          assigned_to: 'triage',
 743          priority: 7,
 744          context: {
 745            error_type: 'invalid_status',
 746            error_message: `Site ${site.id} has invalid status: ${site.status}`,
 747            site_id: site.id,
 748            domain: site.domain,
 749          },
 750        });
 751      }
 752  
 753      // Check for stuck sites (no progress >24 hours)
 754      const stuckSites = await getAll(
 755        `SELECT id, domain, status, updated_at
 756         FROM sites
 757         WHERE updated_at < NOW() - INTERVAL '24 hours'
 758         AND status NOT IN ('ignored', 'failing', 'high_score', 'outreach_sent')
 759         LIMIT 10`
 760      );
 761  
 762      if (stuckSites.length > 0) {
 763        await this.log('warn', 'Found stuck sites', {
 764          task_id: task.id,
 765          count: stuckSites.length,
 766        });
 767  
 768        // Add to human review queue
 769        for (const site of stuckSites.slice(0, 5)) {
 770          await addReviewItem({
 771            file: 'Database',
 772            reason: `Site ${site.id} (${site.domain}) stuck in ${site.status} for >24 hours`,
 773            type: 'process',
 774            priority: 'medium',
 775          });
 776        }
 777      }
 778  
 779      await this.completeTask(task.id, {
 780        invalid_statuses: recentSites.length,
 781        stuck_sites: stuckSites.length,
 782      });
 783    }
 784  
 785    /**
 786     * Detect resource anomalies
 787     *
 788     * @param {Object} task - Task object
 789     * @returns {Promise<void>}
 790     */
 791    async detectAnomaly(task) {
 792      await this.log('info', 'Detecting anomalies', {
 793        task_id: task.id,
 794      });
 795  
 796      const anomalies = [];
 797  
 798      // Check log file sizes
 799      const logDir = process.env.LOGS_DIR || './logs/';
 800      try {
 801        const logFiles = await fs.readdir(logDir);
 802  
 803        for (const file of logFiles) {
 804          if (!file.endsWith('.log')) continue;
 805  
 806          const stats = await fs.stat(`${logDir}${file}`);
 807          const sizeMB = stats.size / 1024 / 1024;
 808  
 809          if (sizeMB > 100) {
 810            anomalies.push({
 811              type: 'large_log_file',
 812              file,
 813              size_mb: sizeMB,
 814              severity: sizeMB > 500 ? 'critical' : 'warning',
 815            });
 816  
 817            if (sizeMB > 500) {
 818              await addReviewItem({
 819                file: `logs/${file}`,
 820                reason: `Log file exceeds 500MB (${sizeMB.toFixed(0)}MB)`,
 821                type: 'critical',
 822                priority: 'high',
 823              });
 824            }
 825          }
 826        }
 827      } catch (error) {
 828        await this.log('warn', 'Failed to check log files', {
 829          error: error.message,
 830        });
 831      }
 832  
 833      // Check database size (PostgreSQL — check pg_database_size instead of file stat)
 834      try {
 835        const dbSizeResult = await getOne(
 836          `SELECT pg_database_size(current_database()) as size_bytes`
 837        );
 838        const dbSizeMB = dbSizeResult ? dbSizeResult.size_bytes / 1024 / 1024 : 0;
 839  
 840        if (dbSizeMB > 1000) {
 841          anomalies.push({
 842            type: 'large_database',
 843            size_mb: dbSizeMB,
 844            severity: dbSizeMB > 5000 ? 'critical' : 'warning',
 845          });
 846  
 847          if (dbSizeMB > 5000) {
 848            await addReviewItem({
 849              file: 'Database',
 850              reason: `Database exceeds 5GB (${(dbSizeMB / 1024).toFixed(1)}GB) - consider VACUUM + archival`,
 851              type: 'maintenance',
 852              priority: 'high',
 853            });
 854          }
 855        }
 856      } catch (error) {
 857        await this.log('warn', 'Failed to check database size', {
 858          error: error.message,
 859        });
 860      }
 861  
 862      // Check disk usage
 863      try {
 864        const df = execSync('df -h .', { encoding: 'utf8' });
 865        const match = df.match(/(\d+)%/);
 866  
 867        if (match) {
 868          const usage = parseInt(match[1]);
 869  
 870          if (usage > 80) {
 871            anomalies.push({
 872              type: 'disk_usage',
 873              usage_percent: usage,
 874              severity: usage > 90 ? 'critical' : 'warning',
 875            });
 876  
 877            if (usage > 90) {
 878              await addReviewItem({
 879                file: 'System',
 880                reason: `Disk usage ${usage}% - cleanup required`,
 881                type: 'critical',
 882                priority: 'critical',
 883              });
 884            }
 885          }
 886        }
 887      } catch (error) {
 888        await this.log('warn', 'Failed to check disk usage', {
 889          error: error.message,
 890        });
 891      }
 892  
 893      await this.completeTask(task.id, {
 894        anomalies,
 895        anomaly_count: anomalies.length,
 896      });
 897    }
 898  
 899    /**
 900     * Check pipeline health - throughput-based bottleneck detection
 901     * Uses pipeline_metrics table for time-since-last-activity alerting.
 902     *
 903     * @param {Object} task - Task object
 904     * @returns {Promise<void>}
 905     */
 906    async checkPipelineHealth(task) {
 907      const { check_error_rates = true } = task.context_json || {};
 908  
 909      await this.log('info', 'Checking pipeline health', {
 910        task_id: task.id,
 911      });
 912  
 913      const issues = [];
 914  
 915      // Throughput-based bottleneck detection using pipeline_metrics
 916      const stages = [
 917        { name: 'assets', inputStatus: 'found', outputStatus: 'assets_captured' },
 918        { name: 'scoring', inputStatus: 'assets_captured', outputStatus: 'prog_scored' },
 919        { name: 'rescoring', inputStatus: 'prog_scored', outputStatus: 'semantic_scored' },
 920        { name: 'enrich', inputStatus: 'semantic_scored', outputStatus: 'enriched' },
 921        { name: 'proposals', inputStatus: 'enriched', outputStatus: 'proposals_drafted' },
 922        { name: 'outreach', inputStatus: 'proposals_drafted', outputStatus: 'outreach_partial' },
 923      ];
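           // inputStatus is the queue feeding each stage: sites sitting in 'found' are waiting on
           // assets, sites in 'assets_captured' are waiting on scoring, and so on.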
 924  
 925      const skipStages = (process.env.SKIP_STAGES || '')
 926        .split(',')
 927        .map(s => s.trim().toLowerCase())
 928        .filter(Boolean);
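           // e.g. SKIP_STAGES=outreach,proposals (illustrative) suppresses stall alerts for those stages.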
 929  
 930      // Read currently rate-limited stages so we don't fire false-positive stall alerts
 931      const rateLimitedStages = new Set();
 932      try {
 933        const rateLimitsFile = join(process.cwd(), 'logs/rate-limits.json');
 934        const raw = JSON.parse(await fs.readFile(rateLimitsFile, 'utf8'));
 935        for (const entry of Object.values(raw)) {
 936          for (const s of entry.stages || []) {
 937            rateLimitedStages.add(s.toLowerCase());
 938          }
 939        }
 940      } catch {
 941        // File missing or empty — no active rate limits
 942      }
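           // Assumed rate-limits.json shape: each value carries a `stages` array of stage names, e.g.
           //   { "openrouter": { "stages": ["scoring", "rescoring"] } }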
 943  
 944      for (const stage of stages) {
 945        // Skip intentionally disabled stages — stall alerts would be false positives
 946        if (skipStages.includes(stage.name)) continue;
 947  
 948        // Skip rate-limited stages — stall is expected while waiting for quota reset
 949        if (rateLimitedStages.has(stage.name)) continue;
 950  
 951        const queueRow = await getOne(
 952          'SELECT COUNT(*) as count FROM sites WHERE status = $1',
 953          [stage.inputStatus]
 954        );
 955        const queueCount = queueRow ? Number(queueRow.count) : 0;
 956  
 957        // Skip stages with empty queues
 958        if (queueCount === 0) continue;
 959  
 960        // Check last run from pipeline_metrics
 961        const lastRun = await getOne(
 962          `SELECT sites_processed, duration_ms, finished_at
 963           FROM tel.pipeline_metrics WHERE stage_name = $1
 964           ORDER BY started_at DESC LIMIT 1`,
 965          [stage.name]
 966        );
 967  
 968        // Check for crash loop: stage runs but consistently processes 0 sites
 969        // This catches crashes that complete instantly (last_run is recent, stall detector misses them)
 970        const recentRuns = await getAll(
 971          `SELECT sites_processed, finished_at FROM tel.pipeline_metrics
 972           WHERE stage_name = $1 ORDER BY started_at DESC LIMIT 5`,
 973          [stage.name]
 974        );
 975  
  976        // Crash loop = rapid consecutive runs with 0 sites. Check that the recent runs (at least 3, up to 5) span <2 minutes.
 977        // Legitimate idle polling (outreach at 60s intervals) spans ~4 min for 5 runs — safely above.
 978        // Genuine crash loops cycle in seconds and span <30s for 5 runs.
 979        const crashSpanMs =
 980          recentRuns.length >= 2
 981            ? new Date(recentRuns[0].finished_at).getTime() -
 982              new Date(recentRuns[recentRuns.length - 1].finished_at).getTime()
 983            : Infinity;
 984  
 985        const crashLoopDetected =
 986          recentRuns.length >= 3 &&
 987          recentRuns.every(r => r.sites_processed === 0) &&
 988          recentRuns[0]?.finished_at &&
  989          (Date.now() - new Date(recentRuns[0].finished_at).getTime()) / 60000 < 120 && // last run within the past 2 hours
  990          crashSpanMs < 2 * 60 * 1000; // recent zero-site runs packed within 2 minutes = genuine rapid loop
 991  
 992        if (crashLoopDetected) {
 993          // Scan stage-specific log for the error
 994          let crashError = null;
 995          try {
 996            const today = new Date().toISOString().slice(0, 10);
 997            const stageLogPath = join(process.cwd(), `logs/${stage.name}-${today}.log`);
 998            const stageLogContent = await fs.readFile(stageLogPath, 'utf8');
 999            const errorPattern =
1000              /\[ERROR\].*stage failed[^\n]*\n(?:[\s\S]{0,200}"message":\s*"([^"]{0,300})")?/g;
1001            const matches = [...stageLogContent.matchAll(errorPattern)];
1002            if (matches.length >= 2) {
1003              crashError = matches[matches.length - 1][1] || 'Unknown crash error';
1004            }
1005          } catch {
1006            // Log not readable
1007          }
1008  
1009          await this.createTask({
1010            task_type: 'classify_error',
1011            assigned_to: 'triage',
1012            priority: 9,
1013            context: {
1014              error_type: 'stage_crash_loop',
1015              error_message: `${stage.name} stage crash loop: ran ${recentRuns.length} times processing 0 sites with ${queueCount} queued. Last error: ${crashError || 'check logs'}`,
1016              stage: stage.name,
1017              queue_count: queueCount,
1018              consecutive_zero_runs: recentRuns.length,
1019              stack_trace: crashError || null,
1020            },
1021          });
1022  
1023          await addReviewItem({
1024            file: 'Pipeline',
1025            reason: `${stage.name} crash loop: ${recentRuns.length} consecutive runs with 0 sites processed despite ${queueCount} queued`,
1026            type: 'critical',
1027            priority: 'critical',
1028          });
1029        }
1030  
1031        if (lastRun?.finished_at) {
1032          const minutesSinceRun = (Date.now() - new Date(lastRun.finished_at).getTime()) / 60000;
1033  
1034          if (minutesSinceRun > 30) {
1035            issues.push({
1036              type: 'stage_stalled',
1037              stage: stage.name,
1038              queue_count: queueCount,
1039              minutes_since_run: Math.round(minutesSinceRun),
1040              severity: 'critical',
1041            });
1042  
1043            await this.log('warn', 'Stage stalled with queue waiting', {
1044              task_id: task.id,
1045              stage: stage.name,
1046              queue_count: queueCount,
1047              minutes_since_run: Math.round(minutesSinceRun),
1048            });
1049  
1050            // Check if the stall is caused by a crash: scan pipeline log for recent "✗ StageName failed" errors
1051            let lastCrashError = null;
1052            try {
1053              const today = new Date().toISOString().slice(0, 10);
1054              const logPath = join(process.cwd(), `logs/pipeline-${today}.log`);
1055              const logContent = await fs.readFile(logPath, 'utf8');
1056              const stageLabel = stage.name.charAt(0).toUpperCase() + stage.name.slice(1);
1057              const crashPattern = new RegExp(
1058                `✗ ${stageLabel} failed:[\\s\\S]{0,20}"stack":\\s*"([^"]{0,400})"`,
1059                'g'
1060              );
1061              const matches = [...logContent.matchAll(crashPattern)];
1062              if (matches.length >= 2) {
1063                // Stage has crashed multiple times — it's a code crash, not a stall
1064                lastCrashError = matches[matches.length - 1][1].replace(/\\n/g, '\n');
1065              }
1066            } catch {
1067              // Log not readable — proceed without crash context
1068            }
1069  
1070            await this.createTask({
1071              task_type: 'classify_error',
1072              assigned_to: 'triage',
1073              priority: 9,
1074              context: {
1075                error_type: lastCrashError ? 'stage_crash_loop' : 'stage_stalled',
1076                error_message: lastCrashError
1077                  ? `${stage.name} stage crashing on every run: ${lastCrashError.substring(0, 300)}`
1078                  : `${stage.name} stalled ${Math.round(minutesSinceRun)}min with ${queueCount} queued`,
1079                stage: stage.name,
1080                queue_count: queueCount,
1081                stack_trace: lastCrashError || null,
1082              },
1083            });
1084  
1085            await addReviewItem({
1086              file: 'Pipeline',
1087              reason: `${stage.name} stage stalled ${Math.round(minutesSinceRun)}min with ${queueCount} sites waiting`,
1088              type: 'critical',
1089              priority: 'critical',
1090            });
1091          } else if (minutesSinceRun > 15) {
1092            issues.push({
1093              type: 'stage_slow',
1094              stage: stage.name,
1095              queue_count: queueCount,
1096              minutes_since_run: Math.round(minutesSinceRun),
1097              severity: 'warning',
1098            });
1099  
1100            await this.log('warn', 'Stage slow with queue waiting', {
1101              task_id: task.id,
1102              stage: stage.name,
1103              queue_count: queueCount,
1104              minutes_since_run: Math.round(minutesSinceRun),
1105            });
1106          }
1107        } else if (queueCount > 100) {
1108          // No pipeline_metrics for this stage but large queue
1109          issues.push({
1110            type: 'stage_no_metrics',
1111            stage: stage.name,
1112            queue_count: queueCount,
1113            severity: 'warning',
1114          });
1115        }
1116      }
1117  
1118      // Check error rates per stage
1119      if (check_error_rates) {
1120        const failingByStage = await getAll(
1121          `SELECT error_message, COUNT(*) as count
1122           FROM sites
1123           WHERE status = 'failing'
1124           AND error_message IS NOT NULL
1125           AND updated_at > NOW() - INTERVAL '24 hours'
1126           GROUP BY error_message
1127           HAVING COUNT(*) > 5
1128           ORDER BY count DESC
1129           LIMIT 10`
1130        );
1131  
1132        for (const error of failingByStage) {
1133          issues.push({
1134            type: 'high_error_rate',
1135            error_message: error.error_message.substring(0, 200),
1136            count: error.count,
1137            severity: error.count > 50 ? 'high' : 'medium',
1138          });
1139  
1140          await this.log('warn', 'High error rate detected', {
1141            task_id: task.id,
1142            error: error.error_message.substring(0, 100),
1143            count: error.count,
1144          });
1145  
1146          await this.createTask({
1147            task_type: 'classify_error',
1148            assigned_to: 'triage',
1149            priority: error.count > 50 ? 9 : 7,
1150            context: {
1151              error_type: 'high_frequency_error',
1152              error_message: error.error_message,
1153              frequency: error.count,
1154            },
1155          });
1156        }
1157      }
1158  
1159      // Check for repeated proposal JSON parse failures (LLM returns plain text for large contact lists)
1160      try {
1161        const proposalLogPath = join(
1162          process.cwd(),
1163          `logs/proposals-${new Date().toISOString().slice(0, 10)}.log`
1164        );
1165        const proposalLog = await fs.readFile(proposalLogPath, 'utf8').catch(() => '');
1166        const recentLog = proposalLog.slice(-50000); // last ~50KB
1167        const invalidFormatMatches = [...recentLog.matchAll(/Invalid proposal response format/g)];
1168        const parseFailMatches = [...recentLog.matchAll(/Failed to parse JSON, returning fallback/g)];
1169        const totalFails = invalidFormatMatches.length + parseFailMatches.length;
1170        if (totalFails >= 5) {
1171          issues.push({
1172            type: 'proposal_json_parse_failures',
1173            count: totalFails,
1174            severity: totalFails >= 20 ? 'high' : 'medium',
1175          });
1176          await this.createTask({
1177            task_type: 'fix_bug',
1178            assigned_to: 'developer',
1179            priority: 8,
1180            context: {
1181              error_type: 'proposal_json_parse_failures',
1182              error_message: `LLM returns plain text instead of JSON for large contact batches (${totalFails} failures in proposals log today)`,
1183              stack_trace:
1184                'storeProposalVariant in proposal-generator-v2.js — LLM ignores json_mode when contact count is high (25+). Fix: batch contacts into chunks of ≤15 per LLM call.',
1185              file_path: 'src/proposal-generator-v2.js',
1186              severity: 'medium',
1187            },
1188          });
1189        }
1190      } catch {
1191        // Non-fatal — proposal log check is best-effort
1192      }
1193  
1194      // Check for bogus phone numbers stored as pending SMS outreaches
1195      // LLM hallucinations: numbers like +0401040, +01520153 (start with +0, not valid E.164)
1196      try {
1197        const bogusRow = await getOne(
1198          `SELECT COUNT(*) as count FROM messages
1199           WHERE direction='outbound' AND contact_method='sms' AND approval_status='pending'
1200           AND contact_uri !~ '^[+][1-9]'`
1201        );
1202        const bogusSmsCnt = bogusRow ? Number(bogusRow.count) : 0;
1203  
1204        if (bogusSmsCnt >= 10) {
1205          issues.push({
1206            type: 'bogus_sms_phone_numbers',
1207            count: bogusSmsCnt,
1208            severity: bogusSmsCnt >= 50 ? 'high' : 'medium',
1209          });
1210          await this.createTask({
1211            task_type: 'fix_bug',
1212            assigned_to: 'developer',
1213            priority: 8,
1214            context: {
1215              error_type: 'bogus_sms_phone_numbers',
1216              error_message: `${bogusSmsCnt} pending SMS messages have invalid phone numbers (not E.164: start with +0 or similar LLM hallucination patterns)`,
1217              stack_trace:
1218                'getAllContacts() in src/contacts/prioritize.js passes landline numbers through without E.164 validation. ' +
1219                'Fix: add regex /^\\+[1-9]\\d{7,14}$/ guard before mobile/landline split. ' +
1220                "Cleanup: UPDATE messages SET delivery_status='failed', error_message='Invalid phone number' WHERE direction='outbound' AND contact_method='sms' AND approval_status='pending' AND contact_uri !~ '^\\+[1-9]'",
1221              file_path: 'src/contacts/prioritize.js',
1222              severity: 'medium',
1223            },
1224          });
1225        }
1226      } catch {
1227        // Non-fatal
1228      }
1229  
1230      // Check for country calling code mismatches in pending SMS outreaches
1231      // e.g. +60 (Malaysia) stored for NZ site — passes E.164 regex but wrong country
1232      try {
1233        const mismatchRow = await getOne(
1234          `SELECT COUNT(*) as count FROM messages o
1235           JOIN sites s ON o.site_id = s.id
1236           WHERE o.direction='outbound' AND o.contact_method='sms' AND o.approval_status='pending'
1237             AND s.country_code IN ('NZ','AU','US','CA','GB','IE','IN','SG','ZA')
1238             AND NOT (
1239               (s.country_code IN ('US','CA') AND o.contact_uri LIKE '+1%')
1240               OR (s.country_code='NZ' AND o.contact_uri LIKE '+64%')
1241               OR (s.country_code='AU' AND o.contact_uri LIKE '+61%')
1242               OR (s.country_code='GB' AND o.contact_uri LIKE '+44%')
1243               OR (s.country_code='IE' AND o.contact_uri LIKE '+353%')
1244               OR (s.country_code='IN' AND o.contact_uri LIKE '+91%')
1245               OR (s.country_code='SG' AND o.contact_uri LIKE '+65%')
1246               OR (s.country_code='ZA' AND o.contact_uri LIKE '+27%')
1247             )`
1248        );
1249        const mismatchCnt = mismatchRow ? Number(mismatchRow.count) : 0;
1250  
1251        if (mismatchCnt >= 10) {
1252          issues.push({
1253            type: 'sms_country_code_mismatch',
1254            count: mismatchCnt,
1255            severity: mismatchCnt >= 50 ? 'high' : 'medium',
1256          });
1257          await this.createTask({
1258            task_type: 'fix_bug',
1259            assigned_to: 'developer',
1260            priority: 8,
1261            context: {
1262              error_type: 'sms_country_code_mismatch',
1263              error_message: `${mismatchCnt} pending SMS messages have wrong country calling code (e.g. +60 Malaysia for NZ site). These will fail with Twilio.`,
1264              stack_trace:
1265                'getAllContacts() in src/contacts/prioritize.js now validates country.phoneFormat against phone prefix. ' +
1266                "Cleanup: UPDATE messages SET delivery_status='failed', error_message='Invalid phone: country code mismatch' WHERE direction='outbound' AND ... (see monitor.js for full SQL)",
1267              file_path: 'src/contacts/prioritize.js',
1268              severity: 'medium',
1269            },
1270          });
1271        }
1272      } catch {
1273        // Non-fatal
1274      }
1275  
1276      // Check for landline numbers stored as pending/approved SMS outreaches
1277      // These will fail — SMS requires mobile numbers. Landlines were previously added
1278      // as fallback in prioritize.js but the fallback has now been removed.
1279      try {
1280        const landlineRow = await getOne(
1281          `SELECT COUNT(*) as count FROM messages o
1282           JOIN sites s ON o.site_id = s.id
1283           WHERE o.direction='outbound' AND o.contact_method='sms' AND o.approval_status IN ('pending','approved')
1284             AND (
1285               (s.country_code='AU' AND o.contact_uri NOT LIKE '+614%')
1286               OR (s.country_code='NZ' AND o.contact_uri LIKE '+64%'
1287                   AND o.contact_uri NOT LIKE '+6421%' AND o.contact_uri NOT LIKE '+6422%'
1288                   AND o.contact_uri NOT LIKE '+6427%' AND o.contact_uri NOT LIKE '+6428%'
1289                   AND o.contact_uri NOT LIKE '+6429%' AND o.contact_uri NOT LIKE '+642%')
1290               OR (s.country_code='GB' AND o.contact_uri LIKE '+44%' AND o.contact_uri NOT LIKE '+447%')
1291               OR (s.country_code='IE' AND o.contact_uri LIKE '+353%' AND o.contact_uri NOT LIKE '+3538%')
1292             )`
1293        );
1294        const landlineCnt = landlineRow ? Number(landlineRow.count) : 0;
1295        if (landlineCnt >= 5) {
1296          issues.push({
1297            type: 'sms_landline_numbers',
1298            severity: 'medium',
1299            message: `${landlineCnt} pending/approved SMS messages are landline numbers that will fail`,
1300            fix: "UPDATE messages SET delivery_status='failed', error_message='Landline number — SMS delivery not supported' WHERE direction='outbound' AND ... (join with sites for country filter)",
1301            context: {
1302              error_type: 'sms_landline_numbers',
1303              error_message: `${landlineCnt} SMS messages target landline numbers (AU/NZ/GB/IE). SMS will fail. The landline fallback was removed from prioritize.js but historical records remain.`,
1304              stack_trace:
1305                'Landline fallback removed from getAllContacts() in src/contacts/prioritize.js. ' +
1306                "Cleanup: UPDATE messages SET delivery_status='failed', error_message='Landline number' WHERE direction='outbound' AND ... (filter by country+non-mobile prefix)",
1307              file_path: 'src/contacts/prioritize.js',
1308              severity: 'medium',
1309            },
1310          });
1311        }
1312      } catch {
1313        // Non-fatal
1314      }
1315  
1316      // Check Twilio balance — alert if < $10 (enough for ~1,265 SMS at $0.0079 each)
1317      // Blind spot: outreach keeps sending until balance hits $0, causing cascading failures.
1318      try {
1319        const sid = process.env.TWILIO_ACCOUNT_SID;
1320        const token = process.env.TWILIO_AUTH_TOKEN;
1321        if (sid && token) {
1322          const auth = Buffer.from(`${sid}:${token}`).toString('base64');
1323          const res = await fetch(`https://api.twilio.com/2010-04-01/Accounts/${sid}/Balance.json`, {
1324            headers: { Authorization: `Basic ${auth}` },
1325          });
1326          if (res.ok) {
1327            const data = await res.json();
1328            const balance = parseFloat(data.balance);
1329            if (!isNaN(balance) && balance < 10) {
1330              issues.push({
1331                type: 'twilio_balance_low',
1332                severity: balance < 5 ? 'high' : 'medium',
1333                message: `Twilio balance is $${balance.toFixed(2)} — SMS outreach will stop when it hits $0`,
1334                fix: 'Top up Twilio balance at console.twilio.com',
1335                context: {
1336                  error_type: 'twilio_balance_low',
1337                  error_message: `Twilio balance is $${balance.toFixed(2)} USD. At ~$0.0079/SMS, this covers ~${Math.floor(balance / 0.0079)} more SMS. Add credits at console.twilio.com before outreach stops.`,
1338                  stack_trace: 'Twilio balance check in monitor.js checkPipelineHealth()',
1339                  file_path: 'src/outreach/sms.js',
1340                  severity: balance < 5 ? 'high' : 'medium',
1341                },
1342              });
1343            }
1344          }
1345        }
1346      } catch {
1347        // Non-fatal — Twilio balance check is best-effort
1348      }
1349  
1350      await this.log('info', 'Pipeline health check complete', {
1351        task_id: task.id,
1352        issues_found: issues.length,
1353      });
1354  
1355      await this.completeTask(task.id, {
1356        issues,
1357        total_issues: issues.length,
1358        stalled: issues.filter(i => i.type === 'stage_stalled').length,
1359        slow: issues.filter(i => i.type === 'stage_slow').length,
1360        error_rates: issues.filter(i => i.type === 'high_error_rate').length,
1361      });
1362    }
1363  
1364    /**
1365     * Get numeric order for pipeline stages
1366     *
1367     * @param {string} status - Status name
1368     * @returns {number} - Order (higher = later in pipeline)
1369     */
1370    getStageOrder(status) {
1371      const order = {
1372        found: 1,
1373        assets_captured: 2,
1374        prog_scored: 3,
1375        semantic_scored: 4,
1376        vision_scored: 4,
1377        enriched: 5,
1378        proposals_drafted: 6,
1379        outreach_partial: 7,
1380        outreach_sent: 8,
1381      };
1382  
1383      return order[status] || 0;
1384    }
1385  
1386    /**
1387     * Check Service-Level Objectives (SLO) compliance
1388     *
1389     * @param {Object} task - Task object
1390     * @returns {Promise<void>}
1391     */
1392    async checkSLOCompliance(task) {
1393      await this.log('info', 'Checking SLO compliance', {
1394        task_id: task.id,
1395      });
1396  
1397      const summary = getSLOSummary();
 1398      const violations = checkSLOCompliance(); // the imported slo-tracker helper, not this method (no `this.`)
1399  
1400      if (violations.length > 0) {
1401        await this.log('warn', 'SLO violations detected', {
1402          task_id: task.id,
1403          total_violations: violations.length,
1404          compliance_rate: summary.compliance_rate,
1405        });
1406  
1407        // Create Architect tasks for each violation
1408        for (const violation of violations) {
1409          await this.log('warn', 'SLO violation', {
1410            task_id: task.id,
1411            stage: violation.slo.stage_name,
1412            description: violation.violation_description,
1413            severity: violation.violation_severity,
1414            actual_p95: violation.actual.p95,
1415            target: violation.slo.target_duration_minutes,
1416          });
1417  
1418          // Determine priority based on severity
1419          const priority =
1420            {
1421              critical: 10,
1422              high: 8,
1423              medium: 6,
1424              low: 5,
1425            }[violation.violation_severity] || 6;
1426  
1427          // Create Architect task to investigate and optimize
1428          await this.createTask({
1429            task_type: 'design_optimization',
1430            assigned_to: 'architect',
1431            priority,
1432            context: {
1433              optimization_type: 'slo_violation',
1434              stage_name: violation.slo.stage_name,
1435              description: violation.violation_description,
1436              current_p95: violation.actual.p95,
1437              target_duration: violation.slo.target_duration_minutes,
1438              severity: violation.violation_severity,
1439              sample_size: violation.actual.totalSites,
1440            },
1441          });
1442  
1443          // Add to human review for critical violations
1444          if (violation.violation_severity === 'critical') {
1445            await addReviewItem({
1446              file: 'Pipeline Performance',
1447              reason: violation.violation_description,
1448              type: 'critical',
1449              priority: 'critical',
1450            });
1451          }
1452        }
1453      } else {
1454        await this.log('info', 'All SLOs compliant', {
1455          task_id: task.id,
1456          total_slos: summary.total_slos,
1457        });
1458      }
1459  
1460      await this.completeTask(task.id, {
1461        total_slos: summary.total_slos,
1462        violations: violations.length,
1463        compliance_rate: summary.compliance_rate,
1464        violations_detail: violations,
1465      });
1466    }
1467  
1468    /**
1469     * Check for loops - site retry loops, agent task bounce, log domain frequency
1470     *
1471     * @param {Object} task - Task object
1472     * @returns {Promise<void>}
1473     */
1474    async checkLoops(task) {
1475      await this.log('info', 'Checking for loops', { task_id: task.id });
1476  
1477      const loops = [];
1478  
1479      // A. Site retry loops (recapture_count > 3)
1480      const siteLoops = await getAll(
1481        `SELECT id, domain, status, recapture_count
1482         FROM sites
1483         WHERE recapture_count > 3
1484         ORDER BY recapture_count DESC LIMIT 20`
1485      );
1486  
1487      for (const site of siteLoops) {
1488        loops.push({
1489          type: 'site_retry_loop',
1490          site_id: site.id,
1491          domain: site.domain,
1492          recapture_count: site.recapture_count,
1493          status: site.status,
1494        });
1495      }
1496  
1497      if (siteLoops.length > 0) {
1498        await this.log('warn', 'Site retry loops detected', {
1499          task_id: task.id,
1500          count: siteLoops.length,
1501          worst: siteLoops[0]?.domain,
1502          worst_count: siteLoops[0]?.recapture_count,
1503        });
1504      }
1505  
1506      // B. Agent task bounce loops (parent spawned >3 children in 24hr)
1507      const bounceLoops = await getAll(
1508        `SELECT parent_task_id, COUNT(*) as bounce_count,
1509                STRING_AGG(DISTINCT assigned_to::text, ',') as agents
1510         FROM tel.agent_tasks
1511         WHERE parent_task_id IS NOT NULL
1512           AND created_at > NOW() - INTERVAL '24 hours'
1513         GROUP BY parent_task_id
1514         HAVING COUNT(*) > 3`
1515      );
1516  
1517      for (const bounce of bounceLoops) {
1518        loops.push({
1519          type: 'agent_bounce_loop',
1520          parent_task_id: bounce.parent_task_id,
1521          bounce_count: bounce.bounce_count,
1522          agents: bounce.agents,
1523        });
1524      }
1525  
1526      if (bounceLoops.length > 0) {
1527        await this.log('warn', 'Agent task bounce loops detected', {
1528          task_id: task.id,
1529          count: bounceLoops.length,
1530        });
1531      }
1532  
1533      // C. Log-based domain frequency (>10 appearances in recent pipeline log)
1534      const logDir = process.env.LOGS_DIR || './logs/';
1535      const today = new Date().toISOString().slice(0, 10);
1536      const logPath = join(logDir, `pipeline-${today}.log`); // join() tolerates LOGS_DIR with or without a trailing slash
1537      let domainLoops = [];
1538  
1539      try {
1540        const content = await fs.readFile(logPath, 'utf8');
1541        const lines = content.split('\n').slice(-500); // Last 500 lines
1542        const domainCounts = {};
1543  
1544        for (const line of lines) {
1545          // Extract domain from log lines (common patterns)
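              // Illustrative matches (actual log line formats may vary):
              //   "domain=example.com" -> "example.com", "site: foo.co.uk" -> "foo.co.uk"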
1546          const domainMatch = line.match(/(?:domain|site|url)[=: ]+([a-zA-Z0-9.-]+\.[a-z]{2,})/i);
1547          if (domainMatch) {
1548            const domain = domainMatch[1].toLowerCase();
1549            domainCounts[domain] = (domainCounts[domain] || 0) + 1;
1550          }
1551        }
1552  
1553        domainLoops = Object.entries(domainCounts)
1554          .filter(([, count]) => count > 10)
1555          .map(([domain, count]) => ({ domain, count }))
1556          .sort((a, b) => b.count - a.count);
1557  
1558        for (const dl of domainLoops) {
1559          loops.push({
1560            type: 'log_domain_loop',
1561            domain: dl.domain,
1562            log_count: dl.count,
1563          });
1564        }
1565      } catch {
1566        // Log file may not exist
1567      }
1568  
1569      await this.completeTask(task.id, {
1570        total_loops: loops.length,
1571        site_retry_loops: siteLoops.length,
1572        agent_bounce_loops: bounceLoops.length,
1573        log_domain_loops: domainLoops.length,
1574        loops,
1575      });
1576    }
1577  
1578    /**
1579     * Check blocked agent tasks and create triage tasks for investigation.
1580     * Replaces the old watchdog auto-cancel approach with proper triage.
1581     *
1582     * @param {Object} task - Task object
1583     * @returns {Promise<void>}
1584     */
1585    async checkBlockedTasks(task) {
1586      await this.log('info', 'Checking blocked tasks', { task_id: task.id });
1587  
1588      const blockedTasks = await getAll(
1589        `SELECT id, task_type, assigned_to, context_json, error_message, created_at,
1590                EXTRACT(EPOCH FROM (NOW() - created_at))/3600 as hours_blocked
1591         FROM tel.agent_tasks
1592         WHERE status = 'blocked'
1593           AND created_at < NOW() - INTERVAL '2 hours'
1594         ORDER BY created_at ASC`
1595      );
1596  
1597      let triageCreated = 0;
1598      let patternFixCreated = 0;
1599  
1600      // Group blocked tasks by error message prefix (first 60 chars) for pattern detection
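          // Illustrative: five blocked tasks whose error_message all starts with
          // "ECONNREFUSED 127.0.0.1:5432" share one group; groups larger than 3
          // get a single fix_bug task below instead of individual triage tasks.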
1601      const errorGroups = {};
1602      for (const blocked of blockedTasks) {
1603        const errPrefix = (blocked.error_message || '').substring(0, 60);
1604        if (!errPrefix) continue;
1605        if (!errorGroups[errPrefix]) errorGroups[errPrefix] = [];
1606        errorGroups[errPrefix].push(blocked);
1607      }
1608  
1609      // Track which task IDs are covered by a pattern group fix_bug task
1610      const coveredByPattern = new Set();
1611  
1612      // For groups with >3 tasks sharing the same error prefix, create a single fix_bug task
1613      for (const [errPrefix, tasks] of Object.entries(errorGroups)) {
1614        if (tasks.length <= 3) continue;
1615  
1616        // Check if a fix_bug task already exists for this error pattern
1617        const existingFix = await getOne(
1618          `SELECT id FROM tel.agent_tasks
1619           WHERE task_type = 'fix_bug'
1620             AND context_json::text LIKE $1
1621             AND status NOT IN ('completed', 'failed')
1622           LIMIT 1`,
1623          [`%${errPrefix.substring(0, 40)}%`]
1624        );
1625  
1626        if (!existingFix) {
1627          await this.createTask({
1628            task_type: 'fix_bug',
1629            assigned_to: 'developer',
1630            priority: 9,
1631            context: {
1632              error_type: 'pattern_blocked_tasks',
1633              error_prefix: errPrefix,
1634              affected_task_count: tasks.length,
1635              affected_task_ids: tasks.map(t => t.id),
1636              sample_error: tasks[0].error_message,
1637              task_types: [...new Set(tasks.map(t => t.task_type))],
1638            },
1639          });
1640          patternFixCreated++;
1641  
1642          await this.log('warn', 'Pattern detected in blocked tasks', {
1643            task_id: task.id,
1644            error_prefix: errPrefix,
1645            affected_count: tasks.length,
1646          });
1647        }
1648  
1649        for (const t of tasks) coveredByPattern.add(t.id);
1650      }
1651  
1652      // For remaining blocked tasks not covered by a pattern, create individual triage tasks
1653      for (const blocked of blockedTasks) {
1654        if (coveredByPattern.has(blocked.id)) continue;
1655  
1656        const existingTriage = await getOne(
1657          `SELECT id FROM tel.agent_tasks
1658           WHERE parent_task_id = $1 AND task_type = 'classify_error'
1659             AND status NOT IN ('completed', 'failed')
1660           LIMIT 1`,
1661          [blocked.id]
1662        );
1663  
1664        if (!existingTriage) {
1665          await this.createTask({
1666            task_type: 'classify_error',
1667            assigned_to: 'triage',
1668            priority: 8,
1669            parent_task_id: blocked.id,
1670            context: {
1671              error_type: 'blocked_task',
1672              blocked_task_id: blocked.id,
1673              blocked_task_type: blocked.task_type,
1674              blocked_agent: blocked.assigned_to,
1675              hours_blocked: Math.round(blocked.hours_blocked),
1676              error_message:
1677                blocked.error_message ||
1678                `Task #${blocked.id} (${blocked.task_type}) blocked for ${Math.round(blocked.hours_blocked)}h`,
1679            },
1680          });
1681          triageCreated++;
1682        }
1683      }
1684  
1685      await this.log('info', 'Blocked task check complete', {
1686        task_id: task.id,
1687        total_blocked: blockedTasks.length,
1688        triage_created: triageCreated,
1689        pattern_fix_created: patternFixCreated,
1690        covered_by_pattern: coveredByPattern.size,
1691      });
1692  
1693      await this.completeTask(task.id, {
1694        total_blocked: blockedTasks.length,
1695        triage_created: triageCreated,
1696        pattern_fix_created: patternFixCreated,
1697        covered_by_pattern: coveredByPattern.size,
1698      });
1699    }
1700  
1701    /**
1702     * Analyse the rate-limit event log and surface patterns that indicate the
1703     * scheduler config needs tuning.
1704     *
1705     * Reads logs/rate-limit-events.jsonl (written by rate-limit-scheduler.js).
1706     * Detects three signal types (last 7 days):
1707     *
1708     *   wait_too_short   — Same API rate-limited again within 2 h of a previous clear.
1709     *                      Suggests the configured wait window is shorter than the
1710     *                      actual API quota window (e.g. use 'daily' instead of 'hourly').
1711     *
1712     *   false_positive   — Circuit breaker cleared (API recovered) with >50% of the
1713     *                      scheduled wait still remaining. Suggests timeout-based detection
1714     *                      fired during a genuine outage rather than quota exhaustion.
1715     *                      Consider raising the detectFromTimeouts threshold, or using
1716     *                      a shorter limitType (e.g. 'hourly' instead of 'daily').
1717     *
1718     *   high_frequency   — Same API rate-limited >3 times in 24 h.
1719     *                      Suggests usage is consistently hitting the quota ceiling;
1720     *                      consider reducing concurrency or spreading requests over time.
1721     *
1722     * Also surfaces any currently active rate limits so they appear in the
1723     * agent task history for human reviewers.
1724     *
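         * Each JSONL line is assumed to look roughly like the following (only the
         * fields read below are shown; `type` is 'set', 'clear', or 'expired'):
         *
         *   {"timestamp":"2025-01-03T10:00:00Z","api":"zenrows","type":"clear",
         *    "actualWaitMs":1200000,"earlyByMs":2400000}
         *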
1725     * @param {Object} task - Task object
1726     */
1727    async checkRateLimitPatterns(task) {
1728      await this.log('info', 'Checking rate limit patterns', { task_id: task.id });
1729  
1730      const eventsFilePath = join(process.cwd(), 'logs/rate-limit-events.jsonl');
1731      const sevenDaysAgo = Date.now() - 7 * 24 * 60 * 60 * 1000;
1732  
1733      // Parse the event log
1734      let events = [];
1735      try {
1736        const raw = await fs.readFile(eventsFilePath, 'utf8');
1737        events = raw
1738          .split('\n')
1739          .filter(Boolean)
1740          .map(line => {
1741            try {
1742              return JSON.parse(line);
1743            } catch {
1744              return null;
1745            }
1746          })
1747          .filter(e => e && new Date(e.timestamp).getTime() >= sevenDaysAgo);
1748      } catch {
1749        // File may not exist yet — no events to analyse
1750      }
1751  
1752      const findings = [];
1753  
1754      if (events.length > 0) {
1755        // ── Signal 1: wait_too_short ─────────────────────────────────────────
1756        // Same API: set event arrives within 2 h of a clear/expired event
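            // Worked example (illustrative): breaker for 'zenrows' clears at 10:00
            // and a new 'set' arrives at 10:45 -> gap = 45 min < 2 h -> flagged.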
1757        const TWO_HOURS = 2 * 60 * 60 * 1000;
1758        const byApi = {};
1759        for (const e of events) {
1760          if (!byApi[e.api]) byApi[e.api] = [];
1761          byApi[e.api].push(e);
1762        }
1763  
1764        for (const [api, apiEvents] of Object.entries(byApi)) {
1765          const sorted = apiEvents.sort((a, b) => new Date(a.timestamp) - new Date(b.timestamp));
1766          let lastClear = null;
1767          for (const e of sorted) {
1768            if (e.type === 'clear' || e.type === 'expired') {
1769              lastClear = new Date(e.timestamp).getTime();
1770            } else if (e.type === 'set' && lastClear !== null) {
1771              const gap = new Date(e.timestamp).getTime() - lastClear;
1772              if (gap < TWO_HOURS) {
1773                findings.push({
1774                  signal: 'wait_too_short',
1775                  api,
1776                  gapMinutes: Math.round(gap / 60_000),
1777                  suggestion: `${api}: rate limit re-triggered only ${Math.round(gap / 60_000)} min after previous clear. Consider increasing limitType from 'hourly' to 'daily', or reduce ZENROWS_CONCURRENCY.`,
1778                });
1779                break; // one finding per API is enough
1780              }
1781            }
1782          }
1783        }
1784  
1785        // ── Signal 2: false_positive ─────────────────────────────────────────
1786        // clear event with earlyByMs > 50% of scheduledWaitMs
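            // Worked example (illustrative): scheduled wait 60 min, cleared after
            // 20 min -> earlyByMs covers 40 of 60 min (~67% > 50%) -> flagged.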
1787        for (const e of events) {
1788          if (e.type === 'clear' && e.earlyByMs > 0 && e.actualWaitMs > 0) {
1789            const scheduledWaitMs = e.actualWaitMs + e.earlyByMs;
1790            const earlyPct = (e.earlyByMs / scheduledWaitMs) * 100;
1791            if (earlyPct > 50) {
1792              findings.push({
1793                signal: 'false_positive',
1794                api: e.api,
1795                earlyPct: Math.round(earlyPct),
1796                earlyByMinutes: Math.round(e.earlyByMs / 60_000),
1797                suggestion: `${e.api}: breaker closed ${Math.round(e.earlyByMs / 60_000)} min early (${Math.round(earlyPct)}% of scheduled wait unused). Possible outage misclassified as quota exhaustion. Consider using a shorter limitType or raising detectFromTimeouts threshold.`,
1798              });
1799            }
1800          }
1801        }
1802  
1803        // ── Signal 3: high_frequency ─────────────────────────────────────────
1804        // Same API: >3 'set' events in any rolling 24-hour window
1805        const DAY = 24 * 60 * 60 * 1000;
1806        for (const [api, apiEvents] of Object.entries(byApi)) {
1807          const sets = apiEvents
1808            .filter(e => e.type === 'set')
1809            .map(e => new Date(e.timestamp).getTime());
1810          if (sets.length > 3) {
1811            // Slide a 24h window
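                // Illustrative: 'set' events at 09:00, 11:00, 14:00 and 20:00 on the
                // same day all fall in the window starting at 09:00 -> 4 > 3 -> flagged.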
1812            for (let i = 0; i < sets.length; i++) {
1813              const windowEnd = sets[i] + DAY;
1814              const inWindow = sets.filter(t => t >= sets[i] && t <= windowEnd).length;
1815              if (inWindow > 3) {
1816                findings.push({
1817                  signal: 'high_frequency',
1818                  api,
1819                  triggersIn24h: inWindow,
1820                  suggestion: `${api}: rate-limited ${inWindow} times in 24 h. Usage consistently hitting quota ceiling. Consider reducing concurrency (ZENROWS_CONCURRENCY) or scheduling scraping to spread load across the day.`,
1821                });
1822                break;
1823              }
1824            }
1825          }
1826        }
1827      }
1828  
1829      // Also surface any currently active limits
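          // Assumed file shape (only the fields read below), e.g.:
          //   { "zenrows": { "resetAt": 1767225600000, "stages": ["capture"], "reason": "429 quota" } }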
1830      const rateLimitsFile = join(process.cwd(), 'logs/rate-limits.json');
1831      let activeLimits = [];
1832      try {
1833        const raw = JSON.parse(await fs.readFile(rateLimitsFile, 'utf8'));
1834        const now = Date.now();
1835        activeLimits = Object.entries(raw)
1836          .filter(([, v]) => Number(v.resetAt) > now)
1837          .map(([api, v]) => ({
1838            api,
1839            stages: v.stages,
1840            reason: v.reason,
1841            resetAt: new Date(Number(v.resetAt)).toISOString(),
1842            waitMinutes: Math.ceil((Number(v.resetAt) - now) / 60_000),
1843          }));
1844      } catch {
1845        /* no active limits or file missing */
1846      }
1847  
1848      if (findings.length > 0) {
1849        await this.log('warn', 'Rate limit pattern issues detected', {
1850          task_id: task.id,
1851          findings: findings.length,
1852          details: findings.map(f => f.suggestion),
1853        });
1854  
1855        // Add to human review queue so the suggestion isn't missed
1856        try {
1857          await addReviewItem({
1858            category: 'rate_limit_tuning',
1859            priority: 'medium',
1860            title: `Rate limit scheduler needs tuning (${findings.length} signal${findings.length > 1 ? 's' : ''})`,
1861            description: findings.map(f => `• ${f.suggestion}`).join('\n'),
1862            data: { findings, activeLimits },
1863          });
1864        } catch {
1865          /* human review queue may not be available in all environments */
1866        }
1867      } else {
1868        await this.log('info', 'Rate limit patterns look healthy', {
1869          task_id: task.id,
1870          eventsAnalysed: events.length,
1871        });
1872      }
1873  
1874      await this.completeTask(task.id, {
1875        events_analysed: events.length,
1876        findings: findings.length,
1877        active_limits: activeLimits.length,
1878        signals: findings.map(f => ({ signal: f.signal, api: f.api })),
1879      });
1880    }
1881  
1882    /**
1883     * Ensure recurring monitor tasks are scheduled
1884     *
1885     * @returns {Promise<void>}
1886     */
1887    async ensureRecurringTasks() {
1888      const recurringTasks = [
1889        { task_type: 'scan_logs', priority: 5, interval_minutes: 5 },
1890        { task_type: 'check_agent_health', priority: 6, interval_minutes: 30 },
1891        { task_type: 'check_process_compliance', priority: 5, interval_minutes: 15 },
1892        { task_type: 'detect_anomaly', priority: 4, interval_minutes: 60 },
1893        { task_type: 'check_pipeline_health', priority: 7, interval_minutes: 10 },
1894        { task_type: 'check_slo_compliance', priority: 7, interval_minutes: 30 },
1895        { task_type: 'check_rate_limits', priority: 4, interval_minutes: 360 }, // Every 6 hours
1896      ];
1897  
1898      for (const taskDef of recurringTasks) {
1899        // Check if task already pending/running
1900        const exists = await getOne(
1901          `SELECT 1 FROM tel.agent_tasks
1902           WHERE assigned_to = 'monitor'
1903           AND task_type = $1
1904           AND status IN ('pending', 'running')`,
1905          [taskDef.task_type]
1906        );
1907  
1908        if (!exists) {
1909          // Check if enough time has passed since last completion
1910          const lastCompleted = await getOne(
1911            `SELECT completed_at FROM tel.agent_tasks
1912             WHERE assigned_to = 'monitor'
1913             AND task_type = $1
1914             AND status = 'completed'
1915             ORDER BY completed_at DESC
1916             LIMIT 1`,
1917            [taskDef.task_type]
1918          );
1919  
1920          const shouldCreate =
1921            !lastCompleted ||
1922            Date.now() - new Date(lastCompleted.completed_at).getTime() >
1923              taskDef.interval_minutes * 60 * 1000;
1924  
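              // Illustrative: scan_logs has a 5-minute interval; if its last
              // completion finished more than 5 minutes ago (or it never ran),
              // shouldCreate is true and a fresh recurring task is queued.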
1925          if (shouldCreate) {
1926            await this.createTask({
1927              task_type: taskDef.task_type,
1928              assigned_to: 'monitor',
1929              priority: taskDef.priority,
1930              context: { recurring: true },
1931            });
1932          }
1933        }
1934      }
1935    }
1936  }