/ src / cron / pipeline-status-monitor.js
pipeline-status-monitor.js
  1  /**
  2   * Pipeline Status Monitor - 15-minute autonomous status report
  3   *
  4   * Runs every 15 minutes via cron. Appends a human-readable snapshot of
  5   * pipeline health to logs/pipeline-status.txt so operators can review
  6   * what happened while AFK.
  7   *
  8   * Checks:
  9   * 1.  Services       - pipeline and cron.timer active?
 10   * 1b. Hung pipeline  - log silent >20min = deadlocked browser
 11   * 1c. Browser loop   - stale >90min while API loop healthy = hung Playwright
 12   * 1d. Stranded sites - promote prog_scored→semantic_scored when ENABLE_VISION=false
 13   * 1e. Retry failing  - reset up to 500 retryable failing sites back to 'found'
 14   * 2.  Recent errors  - last 10min from log files (test stack traces excluded)
 15   * 3.  Site status    - distribution with +/-delta since previous check
 16   * 4.  Agent tasks    - pending/running/blocked counts
 17   * 5.  Cron errors    - actual error messages from journalctl
 18   * 6.  Site progress  - sites that changed status in last 15 min
 19   * 7.  Stuck sites    - sites with no update >4h
 20   * 8.  Rate limits    - dynamic stage skips from circuit breakers
 21   * 9.  Zombie procs   - Z-state process count
 22   * 9b. Cron health    - overdue cron job detection (2× interval threshold)
 23   *
 24   * Output:   logs/pipeline-status.txt  (append-only, rolling)
 25   * Snapshot: logs/pipeline-status-snapshot.json  (delta tracking)
 26   */
 27  
 28  import { getAll, getOne, run } from '../utils/db.js';
 29  import { execSync } from 'child_process';
 30  import {
 31    appendFileSync,
 32    readFileSync,
 33    writeFileSync,
 34    readdirSync,
 35    existsSync,
 36    statSync,
 37    mkdirSync,
 38  } from 'fs';
 39  import { join, dirname } from 'path';
 40  import { fileURLToPath } from 'url';
 41  import '../utils/load-env.js';
 42  
 43  const __filename = fileURLToPath(import.meta.url);
 44  const __dirname = dirname(__filename);
 45  const projectRoot = join(__dirname, '../..');
 46  
 47  const statusFile = join(projectRoot, 'logs/pipeline-status.txt');
 48  const snapshotFile = join(projectRoot, 'logs/pipeline-status-snapshot.json');
 49  
 50  /** Format current time in Australia/Sydney local ISO format */
 51  function nowAEDT() {
 52    const d = new Date();
 53    const pad = (n, w = 2) => String(n).padStart(w, '0');
 54    const offset = -d.getTimezoneOffset();
 55    const sign = offset >= 0 ? '+' : '-';
 56    const oh = Math.floor(Math.abs(offset) / 60);
 57    const om = Math.abs(offset) % 60;
 58    return (
 59      `${d.getFullYear()}-${pad(d.getMonth() + 1)}-${pad(d.getDate())} ` +
 60      `${pad(d.getHours())}:${pad(d.getMinutes())}:${pad(d.getSeconds())} ` +
 61      `AEDT(${sign}${pad(oh)}:${pad(om)})`
 62    );
 63  }
 64  
 65  /** 1. Check systemd services */
 66  function checkServices() {
 67    const lines = [];
 68    const services = [
 69      { unit: '333method-pipeline', label: 'Pipeline' },
 70      { unit: 'mmo-cron.timer', label: 'Cron timer' },
 71    ];
 72    const actions = [];
 73  
 74    for (const { unit, label } of services) {
 75      try {
 76        execSync(`systemctl --user is-active ${unit}`, { encoding: 'utf8', timeout: 5000 });
 77        lines.push(`✓ ${label} running`);
 78      } catch {
 79        lines.push(`✗ ${label} DOWN`);
 80        if (unit === '333method-pipeline') {
 81          try {
 82            execSync('systemctl --user restart --no-block 333method-pipeline', {
 83              encoding: 'utf8',
 84              timeout: 15000,
 85            });
 86            lines.push('  → Restarted pipeline');
 87            actions.push('restarted_pipeline');
 88          } catch (err) {
 89            lines.push(`  → Restart failed: ${err.message}`);
 90          }
 91        }
 92      }
 93    }
 94    // Detect cron SERVICE stuck in "activating" (timer active but service not completing).
 95    // This happens when a cron job hangs longer than TimeoutStartSec (10min) — the timer
 96    // fires again every 5min but the previous run is still blocking or pending kill.
 97    try {
 98      const cronStatus = execSync(
 99        'systemctl --user show mmo-cron.service --property=ActiveState,StateChangeTimestamp',
100        {
101          encoding: 'utf8',
102          timeout: 5000,
103        }
104      ).trim();
105      const activeState = (cronStatus.match(/ActiveState=(\S+)/) || [])[1];
106      const stateChangeTs = (cronStatus.match(/StateChangeTimestamp=(.+)/) || [])[1];
107      if (activeState === 'activating' && stateChangeTs) {
108        const changeMs = new Date(stateChangeTs).getTime();
109        const stuckMin = Math.round((Date.now() - changeMs) / 60000);
110        if (stuckMin > 12) {
111          lines.push(
112            `⚠️  Cron SERVICE stuck in "activating" for ${stuckMin}min — a cron job is hung or exceeded TimeoutStartSec`
113          );
114        }
115      }
116    } catch {
117      // systemctl unavailable or parse error — skip
118    }
119  
120    return { lines, actions };
121  }
122  
123  /**
124   * 1b. Hung pipeline detection
125   * Sequential pipeline: log silent for >10min = deadlocked Playwright browser.
126   * Restarts the service to recover.
127   *
128   * Uses the most-recently-modified log across pipeline-*.log, proposals-*.log,
129   * utils-*.log, and scoring-*.log. This prevents false-positive restarts when the
130   * Proposals stage is active (it writes only to proposals/utils logs, not pipeline-*.log,
131   * so the 30-min threshold fired mid-batch and killed healthy proposal batches).
132   */
133  function checkPipelineHung() {
134    // Proposals batch: 200 sites × 13.5s = ~45min. Scoring: ~28min. Assets: ~7-13min.
135    // Use 60min so even the slowest stage completes before we restart.
136    const HUNG_THRESHOLD_MS = 60 * 60 * 1000; // 60 minutes
137    const logsDir = join(projectRoot, 'logs');
138  
139    // Check all stage logs — proposals writes to proposals-*.log and utils-*.log,
140    // not pipeline-*.log, so we must look across all active log files.
141    const ACTIVE_LOG_PREFIXES = ['pipeline-', 'proposals-', 'scoring-', 'utils-'];
142    const candidateLogs = readdirSync(logsDir)
143      .filter(
144        f =>
145          ACTIVE_LOG_PREFIXES.some(p => f.startsWith(p)) &&
146          f.endsWith('.log') &&
147          f !== 'pipeline-status.txt'
148      )
149      .map(f => ({ name: f, mtime: statSync(join(logsDir, f)).mtimeMs }))
150      .sort((a, b) => b.mtime - a.mtime);
151  
152    // Use the most recently written log as the heartbeat
153    const logPath = candidateLogs.length > 0 ? join(logsDir, candidateLogs[0].name) : null;
154    const lines = [];
155    const actions = [];
156  
157    try {
158      if (!logPath) throw new Error('no pipeline logs');
159      const { mtimeMs } = statSync(logPath);
160      const silentMs = Date.now() - mtimeMs;
161      const silentMin = Math.round(silentMs / 60000);
162  
163      if (silentMs > HUNG_THRESHOLD_MS) {
164        lines.push(`⚠️  Pipeline log silent for ${silentMin}min — restarting (likely hung browser)`);
165        try {
166          execSync('systemctl --user restart --no-block 333method-pipeline', {
167            encoding: 'utf8',
168            timeout: 15000,
169          });
170          lines.push('  → Pipeline restarted');
171          actions.push(`restarted_hung_pipeline_${silentMin}min`);
172        } catch (err) {
173          lines.push(`  → Restart failed: ${err.message}`);
174        }
175      } else {
176        const activeLogName = candidateLogs[0]?.name || 'unknown';
177        lines.push(`✓ Pipeline log active (last write ${silentMin}min ago via ${activeLogName})`);
178      }
179    } catch {
180      lines.push('✓ Pipeline log not yet created today');
181    }
182  
183    return { lines, actions };
184  }
185  
186  /**
187   * 1c. browser_loop watchdog
188   * Detects when browser_loop (Assets+Enrich) has hung while api_loop
189   * (SERPs) is still running. Uses pipeline_control timestamps.
190   */
191  async function checkBrowserLoopHung() {
192    const BROWSER_HUNG_THRESHOLD_MS = 90 * 60 * 1000; // 90 minutes (enrich batches can take 30-60min)
193    const lines = [];
194    const actions = [];
195  
196    try {
197      const row = await getOne(
198        `SELECT last_browser_loop_at, last_api_loop_at
199         FROM ops.pipeline_control WHERE id = 1`
200      );
201  
202      if (!row || !row.last_browser_loop_at || !row.last_api_loop_at) {
203        lines.push('✓ browser_loop watchdog: no timestamps yet');
204        return { lines, actions };
205      }
206  
207      const browserAge = Date.now() - new Date(`${row.last_browser_loop_at}Z`).getTime();
208      const apiAge = Date.now() - new Date(`${row.last_api_loop_at}Z`).getTime();
209      const browserMin = Math.round(browserAge / 60000);
210      const apiMin = Math.round(apiAge / 60000);
211  
212      if (browserAge > BROWSER_HUNG_THRESHOLD_MS && apiAge < BROWSER_HUNG_THRESHOLD_MS) {
213        lines.push(
214          `⚠️  browser_loop stale (${browserMin}min) while api_loop active (${apiMin}min) — restarting pipeline`
215        );
216        try {
217          execSync('systemctl --user restart --no-block 333method-pipeline', {
218            encoding: 'utf8',
219            timeout: 15000,
220          });
221          lines.push('  → Pipeline restarted (browser_loop recovery)');
222          actions.push(`restarted_hung_browser_loop_${browserMin}min`);
223        } catch (err) {
224          lines.push(`  → Restart failed: ${err.message}`);
225        }
226      } else {
227        lines.push(
228          `✓ browser_loop active (last cycle ${browserMin}min ago, api_loop ${apiMin}min ago)`
229        );
230      }
231    } catch (err) {
232      lines.push(`✓ browser_loop watchdog skipped: ${err.message}`);
233    }
234  
235    return { lines, actions };
236  }
237  
238  /** 2. Recent errors from log files — exclude test stack traces */
239  function recentErrors() {
240    const lines = ['Recent errors (last 10min):'];
241    try {
242      const logsDir = join(projectRoot, 'logs');
243      const tenMinAgo = Date.now() - 10 * 60 * 1000;
244      const logFiles = readdirSync(logsDir)
245        .filter(f => f.endsWith('.log') && f !== 'pipeline-status.txt')
246        .map(f => join(logsDir, f))
247        .filter(f => {
248          try {
249            return statSync(f).mtimeMs > tenMinAgo;
250          } catch {
251            return false;
252          }
253        });
254  
255      const errors = [];
256      for (const f of logFiles) {
257        try {
258          const content = readFileSync(f, 'utf8');
259          const matches = content
260            .split('\n')
261            .filter(l => /\[ERROR\]/.test(l) && !/\.test\.js|tests\//.test(l) && l.trim().length > 0);
262          errors.push(...matches);
263        } catch {
264          /* skip unreadable files */
265        }
266      }
267  
268      const recent = errors.slice(-5);
269      if (recent.length === 0) {
270        lines.push('  (none)');
271      } else {
272        lines.push(...recent.map(l => `  ${l.slice(0, 120)}`));
273      }
274    } catch (err) {
275      lines.push(`  (error reading logs: ${err.message})`);
276    }
277    return lines;
278  }
279  
280  /** 3. Site status distribution with delta */
281  async function siteStatusDistribution(prevCounts) {
282    const lines = ['Site status distribution (total | +/-delta):'];
283    try {
284      const rows = await getAll('SELECT status, COUNT(*) as cnt FROM sites GROUP BY status ORDER BY cnt DESC');
285      const newCounts = {};
286      for (const { status, cnt } of rows) {
287        newCounts[status] = cnt;
288        const prev = prevCounts[status] ?? 0;
289        const delta = cnt - prev;
290        const sign = delta > 0 ? '+' : '';
291        const deltaStr = delta === 0 ? '(=)' : `(${sign}${delta})`;
292        lines.push(`  ${status}: ${cnt} ${deltaStr}`);
293      }
294      return { lines, newCounts };
295    } catch (err) {
296      return { lines: [`  (error: ${err.message})`], newCounts: prevCounts };
297    }
298  }
299  
300  /** 4. Agent task status */
301  async function agentTaskStatus() {
302    const lines = ['Agent task status:'];
303    try {
304      const byAgent = await getAll(
305        `SELECT assigned_to, status, COUNT(*) as cnt FROM tel.agent_tasks
306         WHERE status IN ('pending','running','blocked')
307         GROUP BY assigned_to, status ORDER BY assigned_to, status`
308      );
309      for (const { assigned_to, status, cnt } of byAgent) {
310        lines.push(`  ${assigned_to}|${status}: ${cnt}`);
311      }
312  
313      lines.push('  --- totals ---');
314      const totals = await getAll('SELECT status, COUNT(*) as cnt FROM tel.agent_tasks GROUP BY status ORDER BY status');
315      for (const { status, cnt } of totals) {
316        lines.push(`  ${status}: ${cnt}`);
317      }
318    } catch (err) {
319      lines.push(`  (error: ${err.message})`);
320    }
321    return lines;
322  }
323  
324  /** 5. Cron errors — actual messages from journalctl */
325  function cronErrors() {
326    const lines = ['Cron jobs:'];
327    try {
328      const out = execSync(
329        'journalctl --user -u mmo-cron --since "15 minutes ago" --no-pager',
330        { encoding: 'utf8', timeout: 10000 }
331      );
332      const errLines = out.split('\n').filter(
333        l =>
334          /\[ERROR\]|Error:|failed:|FAILED|exception/i.test(l) &&
335          !/\.test\.js|tests\//.test(l) &&
336          // Exclude "0 errors" / "Errors: 0" false positives
337          !/[Ee]rrors?:\s*0|0\s+[Ee]rrors?|[Ff]ailed:\s*0/.test(l)
338      );
339  
340      if (errLines.length === 0) {
341        lines.push('  ✓ No cron errors');
342      } else {
343        lines.push(`  ⚠️  ${errLines.length} errors in last 15min:`);
344        lines.push(...errLines.slice(-5).map(l => `    ${l.slice(0, 120)}`));
345      }
346    } catch {
347      lines.push('  (journalctl unavailable)');
348    }
349    return lines;
350  }
351  
352  /** 6. Site progression in last 15 minutes */
353  async function siteProgression() {
354    const lines = ['Site progression (15min):'];
355    try {
356      const rows = await getAll(
357        `SELECT status, COUNT(*) as cnt FROM sites
358         WHERE updated_at > NOW() - INTERVAL '15 minutes'
359         GROUP BY status ORDER BY cnt DESC`
360      );
361      if (rows.length === 0) {
362        lines.push('  (none)');
363      } else {
364        for (const { status, cnt } of rows) {
365          lines.push(`  ${cnt} → ${status}`);
366        }
367      }
368    } catch (err) {
369      lines.push(`  (error: ${err.message})`);
370    }
371    return lines;
372  }
373  
374  /** 7. Stuck sites */
375  async function stuckSites() {
376    try {
377      const row = await getOne(
378        `SELECT COUNT(*) as cnt FROM sites
379         WHERE status NOT IN ('ignored','failing','high_score','outreach_sent','dead_letter')
380         AND updated_at < NOW() - INTERVAL '4 hours'`
381      );
382      return `Stuck sites (>4h no update): ${row?.cnt ?? 'ERR'}`;
383    } catch {
384      return 'Stuck sites: ERR';
385    }
386  }
387  
388  /**
389   * 1f. Schema validation — detect missing migrations by verifying key CHECK constraints.
390   * In PostgreSQL, we verify by attempting to query for each expected status value
391   * in the pg_enum or information_schema. Since PG uses CHECK constraints differently,
392   * we test by doing a zero-row UPDATE and catching constraint violations.
393   */
394  async function checkSchemaIntegrity() {
395    try {
396      const EXPECTED_STATUSES = [
397        'found',
398        'assets_captured',
399        'prog_scored',
400        'semantic_scored',
401        'vision_scored',
402        'enriched',
403        'enriched_regex',
404        'enriched_llm',
405        'proposals_drafted',
406        'outreach_partial',
407        'outreach_sent',
408        'ignored',
409        'failing',
410        'high_score',
411        'dead_letter',
412      ];
413  
414      // Test-write each expected status (zero-row update — no actual changes)
415      const missing = [];
416      for (const status of EXPECTED_STATUSES) {
417        try {
418          await run(`UPDATE sites SET status=$1 WHERE 1=0`, [status]);
419        } catch (err) {
420          if (/check constraint|violates check/i.test(err.message)) {
421            missing.push(status);
422          }
423        }
424      }
425  
426      if (missing.length > 0) {
427        return `⚠️  SCHEMA MIGRATION MISSING: sites.status does not accept: ${missing.join(', ')} — run migration and restart pipeline`;
428      }
429  
430      // Also check site_status table
431      const missingStatus = [];
432      for (const status of EXPECTED_STATUSES) {
433        try {
434          await run(`UPDATE site_status SET status=$1 WHERE 1=0`, [status]);
435        } catch (err) {
436          if (/check constraint|violates check/i.test(err.message)) {
437            missingStatus.push(status);
438          }
439        }
440      }
441  
442      if (missingStatus.length > 0) {
443        return `⚠️  SCHEMA MIGRATION MISSING: site_status.status does not accept: ${missingStatus.join(', ')} — run migration and restart pipeline`;
444      }
445  
446      return null; // no issues
447    } catch {
448      return null; // don't block monitor if check fails
449    }
450  }
451  
452  /** 8. Zombie process count — Z-state processes visible to host */
453  function checkZombies() {
454    try {
455      // Count processes in zombie state (Z) visible from this host/container
456      const out = execSync('ps --no-header -eo stat 2>/dev/null | grep -c "^Z" || echo 0', {
457        encoding: 'utf8',
458        shell: true,
459        timeout: 5000,
460      });
461      const count = parseInt(out.trim(), 10);
462      if (count >= 50) {
463        return `⚠️  Zombie processes: ${count} — container restart needed (tell user)`;
464      } else if (count > 0) {
465        return `⚠️  Zombie processes: ${count} (low — monitoring)`;
466      }
467      return '✓ No zombie processes';
468    } catch {
469      return '  Zombie check unavailable';
470    }
471  }
472  
473  /**
474   * 9b. Cron job health — detect overdue cron jobs from cron_jobs table.
475   * Compares (now - last_run_at) against 2× the expected interval.
476   */
477  async function checkCronHealth() {
478    const lines = ['Cron job health:'];
479    const actions = [];
480  
481    try {
482      const rows = await getAll(
483        `SELECT task_key, interval_value, interval_unit, last_run_at, enabled
484         FROM ops.cron_jobs WHERE enabled = true`
485      );
486  
487      if (rows.length === 0) {
488        lines.push('  (no enabled cron jobs found)');
489        return { lines, actions };
490      }
491  
492      const UNIT_TO_MS = {
493        seconds: 1_000,
494        second: 1_000,
495        minutes: 60_000,
496        minute: 60_000,
497        hours: 3_600_000,
498        hour: 3_600_000,
499        days: 86_400_000,
500        day: 86_400_000,
501      };
502  
503      const now = Date.now();
504      let overdueCount = 0;
505  
506      for (const row of rows) {
507        const multiplier = UNIT_TO_MS[row.interval_unit];
508        if (!multiplier) continue; // unknown unit, skip
509  
510        const expectedMs = row.interval_value * multiplier;
511        const threshold = expectedMs * 2;
512  
513        if (!row.last_run_at) {
514          // Never ran — warn but don't count as overdue (could be newly added)
515          lines.push(
516            `  ⚠️  NEVER RAN: ${row.task_key} (expected every ${row.interval_value} ${row.interval_unit})`
517          );
518          overdueCount++;
519          continue;
520        }
521  
522        const lastRunMs = new Date(
523          `${row.last_run_at}${row.last_run_at.toString().endsWith('Z') ? '' : 'Z'}`
524        ).getTime();
525        const elapsed = now - lastRunMs;
526  
527        if (elapsed > threshold) {
528          const elapsedMin = Math.round(elapsed / 60_000);
529          lines.push(
530            `  ⚠️  OVERDUE: ${row.task_key} last ran ${elapsedMin}min ago (expected every ${row.interval_value} ${row.interval_unit})`
531          );
532          overdueCount++;
533          actions.push(`cron_overdue_${row.task_key}`);
534        }
535      }
536  
537      if (overdueCount === 0) {
538        lines.push(`  ✓ All ${rows.length} cron jobs on schedule`);
539      } else {
540        lines.push(`  ${overdueCount}/${rows.length} jobs overdue`);
541      }
542    } catch (err) {
543      lines.push(`  (error checking cron health: ${err.message})`);
544    }
545  
546    return { lines, actions };
547  }
548  
549  /**
550   * When ENABLE_VISION=false, sites should never linger at 'prog_scored' — the scoring
551   * stage promotes them directly to 'semantic_scored'. This catches edge cases where old
552   * pipeline runs left sites stranded at 'prog_scored' before the auto-promote logic existed.
553   * Runs as a Tier 1 safety net every 5 minutes.
554   */
555  async function promoteStrandedScoredSites() {
556    if (process.env.ENABLE_VISION === 'true') return null;
557    try {
558      const result = await run(
559        `UPDATE sites SET status='semantic_scored', updated_at=NOW()
560         WHERE status='prog_scored' AND updated_at < NOW() - INTERVAL '1 hour'`
561      );
562      if (result.changes > 0) {
563        return `  ⬆  Promoted ${result.changes} stranded prog_scored→semantic_scored (ENABLE_VISION=false)`;
564      }
565    } catch {
566      /* non-fatal */
567    }
568    return null;
569  }
570  
571  /**
572   * Retry failing sites whose errors are now resolved/transient.
573   *
574   * The pipeline marks sites as status='failing' after 3 retries.  Many of these
575   * are retryable — they failed due to transient infrastructure issues (chromium
576   * binary missing before NixOS fix, LLM 400 burst, timeouts) and have never been
577   * given another chance.
578   *
579   * Strategy: reset a capped batch of retryable sites back to 'found' per run.
580   * Cap prevents flooding the pipeline when tens-of-thousands are queued.
581   *
582   * Dead letter queue: sites that have been reset DEAD_LETTER_THRESHOLD (3) times
583   * without recovering are promoted to status='dead_letter' instead of being reset
584   * again. chronic_failure_count tracks how many reset cycles each site has used.
585   *
586   * @param {number} [batchLimit=500] Max sites to reset per run (default: 500)
587   * @returns {string|null} Log message or null
588   */
589  async function retryFailingSites(batchLimit = 500) {
590    // Sites reset >= this many times without recovery are dead-lettered permanently
591    const DEAD_LETTER_THRESHOLD = 3;
592  
593    // Shared retryable error filter used in both queries below
594    const RETRYABLE_FILTER = `
595      (
596        error_message LIKE '%spawn /ho%'
597        OR error_message LIKE '%browserType.launch%'
598        OR error_message LIKE '%Target page, context or browser ha%'
599        OR error_message LIKE '%browser.newContext: Target page%'
600        OR error_message LIKE '%status code 400%'
601        OR error_message LIKE '%Incomplete LLM response%'
602        OR error_message LIKE '%Timeout 30000ms%'
603        OR error_message LIKE '%Timed out after 120000%'
604        OR error_message LIKE '%Country code is required%'
605        OR error_message LIKE '%Failed to parse JSON response%'
606        OR error_message LIKE '%page.content: Unable to retrieve%'
607      )
608      AND (recapture_at IS NULL OR recapture_at < NOW())
609    `;
610  
611    try {
612      const messages = [];
613  
614      // Step 1: Promote chronic failures to dead_letter.
615      // Sites that have been reset >= DEAD_LETTER_THRESHOLD times without ever
616      // progressing past 'failing' are permanently parked here.
617      const deadLetterResult = await run(
618        `UPDATE sites
619         SET status        = 'dead_letter',
620             error_message = 'Chronic failure: reset ' || chronic_failure_count || ' times without recovery',
621             updated_at    = NOW()
622         WHERE status = 'failing'
623           AND chronic_failure_count >= ${DEAD_LETTER_THRESHOLD}
624           AND ${RETRYABLE_FILTER}`
625      );
626  
627      if (deadLetterResult.changes > 0) {
628        messages.push(
629          `  ☠  Dead-lettered ${deadLetterResult.changes} chronic failing sites (>= ${DEAD_LETTER_THRESHOLD} retry cycles)`
630        );
631      }
632  
633      // Step 2: Reset remaining retryable sites that haven't exhausted their cycles.
634      // Increment chronic_failure_count so we know how many times we've tried each site.
635      const resetResult = await run(
636        `UPDATE sites
637         SET status               = 'found',
638             error_message        = NULL,
639             retry_count          = 0,
640             recapture_at         = NULL,
641             chronic_failure_count = chronic_failure_count + 1,
642             updated_at           = NOW()
643         WHERE id IN (
644           SELECT id FROM sites
645           WHERE status = 'failing'
646             AND chronic_failure_count < ${DEAD_LETTER_THRESHOLD}
647             AND ${RETRYABLE_FILTER}
648           ORDER BY updated_at ASC
649           LIMIT ${batchLimit}
650         )`
651      );
652  
653      if (resetResult.changes > 0) {
654        messages.push(
655          `  ♻  Reset ${resetResult.changes} retryable failing sites → found (batch limit: ${batchLimit})`
656        );
657      }
658  
659      return messages.length > 0 ? messages.join('\n') : null;
660    } catch (err) {
661      return `  ⚠  retryFailingSites error: ${err.message}`;
662    }
663  }
664  
665  /**
666   * Show active rate limits from logs/rate-limits.json.
667   * These are dynamic SKIP_STAGES set automatically when a circuit breaker
668   * opens due to a rate limit (daily quota, 429, etc.).
669   * Tuning signals are analysed by MonitorAgent.checkRateLimitPatterns() every 6 h.
670   */
671  function rateLimitStatus() {
672    const lines = ['Rate limits (dynamic stage skips):'];
673    try {
674      const rateLimitsPath = join(projectRoot, 'logs/rate-limits.json');
675      if (!existsSync(rateLimitsPath)) {
676        lines.push('  None active');
677        return lines;
678      }
679      const data = JSON.parse(readFileSync(rateLimitsPath, 'utf8'));
680      const now = Date.now();
681      const active = Object.entries(data).filter(([, v]) => Number(v.resetAt) > now);
682      if (active.length === 0) {
683        lines.push('  None active');
684      } else {
685        for (const [api, info] of active) {
686          const waitMin = Math.ceil((Number(info.resetAt) - now) / 60_000);
687          const resetStr = new Date(Number(info.resetAt)).toISOString();
688          lines.push(
689            `  ⏸  ${api} → [${info.stages?.join(', ')}] until ${resetStr} (~${waitMin} min)`
690          );
691          lines.push(`      reason: ${info.reason}`);
692        }
693      }
694    } catch {
695      lines.push('  ERR reading rate-limits.json');
696    }
697    return lines;
698  }
699  
700  /**
701   * Main pipeline status monitor function.
702   * @returns {{ summary: string, checks_run: number, actions: string[] }}
703   */
704  
705  export async function runPipelineStatusMonitor() {
706    const startTime = Date.now();
707  
708    // Load previous snapshot for delta calculation
709    let prevCounts = {};
710    try {
711      if (existsSync(snapshotFile)) {
712        prevCounts = JSON.parse(readFileSync(snapshotFile, 'utf8'));
713      }
714    } catch {
715      prevCounts = {};
716    }
717  
718    const sections = [];
719    let allActions = [];
720  
721    // Header
722    sections.push(`\n--- ${nowAEDT()} ---`);
723  
724    // 1. Services
725    const { lines: serviceLines, actions } = checkServices();
726    allActions = actions;
727    sections.push(...serviceLines);
728  
729    // 1b. Hung pipeline detection (log silent >20min = deadlocked browser)
730    const { lines: hungLines, actions: hungActions } = checkPipelineHung();
731    sections.push(...hungLines);
732    allActions = [...allActions, ...hungActions];
733  
734    // 1c. browser_loop watchdog (detects hung browser_loop while api_loop is healthy)
735    const { lines: browserLines, actions: browserActions } = await checkBrowserLoopHung();
736    sections.push(...browserLines);
737    allActions = [...allActions, ...browserActions];
738  
739    // 1d. Promote stranded prog_scored sites (ENABLE_VISION=false safety net)
740    const promotedMsg = await promoteStrandedScoredSites();
741    if (promotedMsg) {
742      sections.push(promotedMsg);
743      allActions.push(promotedMsg);
744    }
745  
746    // 1e. Retry failing sites with transient/resolved errors (500 per run)
747    const retryMsg = await retryFailingSites();
748    if (retryMsg) {
749      sections.push(retryMsg);
750      allActions.push(retryMsg);
751    }
752  
753    // 1f. Schema integrity — detect missing migrations
754    const schemaMsg = await checkSchemaIntegrity();
755    if (schemaMsg) {
756      sections.push(schemaMsg);
757      allActions.push(schemaMsg);
758    }
759  
760    // 2. Recent errors
761    sections.push('');
762    sections.push(...recentErrors());
763  
764    // 3. Site status with delta
765    sections.push('');
766    const { lines: statusLines, newCounts } = await siteStatusDistribution(prevCounts);
767    sections.push(...statusLines);
768  
769    // Save new snapshot
770    try {
771      writeFileSync(snapshotFile, JSON.stringify(newCounts, null, 2));
772    } catch {
773      /* non-fatal */
774    }
775  
776    // 4. Agent tasks
777    sections.push('');
778    sections.push(...(await agentTaskStatus()));
779  
780    // 5. Cron errors
781    sections.push('');
782    sections.push(...cronErrors());
783  
784    // 6. Site progression
785    sections.push('');
786    sections.push(...(await siteProgression()));
787  
788    // 7. Stuck sites
789    sections.push(await stuckSites());
790  
791    // 8. Rate limits (dynamic stage skips from rate-limit-scheduler)
792    sections.push('');
793    sections.push(...rateLimitStatus());
794  
795    // 9. Zombie processes
796    sections.push(checkZombies());
797  
798    // 9b. Cron job health (overdue detection)
799    sections.push('');
800    const { lines: cronHealthLines, actions: cronHealthActions } = await checkCronHealth();
801    sections.push(...cronHealthLines);
802    allActions = [...allActions, ...cronHealthActions];
803  
804    // 10. LLM cost variance check
805    try {
806      const { checkBudgetVariance } = await import('../utils/llm-usage-tracker.js');
807      const variances = checkBudgetVariance();
808      if (variances.length > 0) {
809        sections.push('');
810        sections.push('⚠️  LLM COST ALERTS:');
811        for (const v of variances) {
812          if (v.type === 'cost_variance') {
813            sections.push(
814              `  ${v.stage}: avg $${v.avgCost.toFixed(4)}/call (budget: $${v.expectedCost.toFixed(4)}, max: $${v.maxCost.toFixed(4)}) — ${v.callCount} calls/hr`
815            );
816          } else if (v.type === 'model_mismatch') {
817            sections.push(
818              `  ${v.stage}: using ${v.actualModel} (expected: ${v.expectedModel}) — ${v.callCount} calls/hr`
819            );
820          }
821        }
822      }
823    } catch {
824      // Budget table may not exist yet — skip silently
825    }
826  
827    // Append to status file
828    try {
829      appendFileSync(statusFile, `${sections.join('\n')}\n`);
830    } catch {
831      // If logs/ dir doesn't exist yet, create it and retry
832      mkdirSync(join(projectRoot, 'logs'), { recursive: true });
833      appendFileSync(statusFile, `${sections.join('\n')}\n`);
834    }
835  
836    const duration = ((Date.now() - startTime) / 1000).toFixed(1);
837    const summary =
838      allActions.length > 0
839        ? `Pipeline status check done in ${duration}s — actions: ${allActions.join(', ')}`
840        : `Pipeline status check done in ${duration}s — all clear`;
841  
842    return {
843      summary,
844      checks_run: 10,
845      actions: allActions,
846      duration_seconds: parseFloat(duration),
847    };
848  }
849  
850  // CLI support
851  if (import.meta.url === `file://${process.argv[1]}`) {
852    runPipelineStatusMonitor()
853      .then(result => {
854        console.log(result.summary);
855        process.exit(0);
856      })
857      .catch(err => {
858        console.error('Pipeline status monitor error:', err.message);
859        process.exit(1);
860      });
861  }