pipeline-status-monitor.js
/**
 * Pipeline Status Monitor - 15-minute autonomous status report
 *
 * Runs every 15 minutes via cron. Appends a human-readable snapshot of
 * pipeline health to logs/pipeline-status.txt so operators can review
 * what happened while AFK.
 *
 * Checks:
 * 1. Services - pipeline and cron.timer active?
 * 1b. Hung pipeline - log silent >60min = deadlocked browser
 * 1c. Browser loop - stale >90min while API loop healthy = hung Playwright
 * 1d. Stranded sites - promote prog_scored→semantic_scored when ENABLE_VISION=false
 * 1e. Retry failing - reset up to 500 retryable failing sites back to 'found'
 * 1f. Schema check - verify sites.status accepts all expected status values
 * 2. Recent errors - last 10min from log files (test stack traces excluded)
 * 3. Site status - distribution with +/-delta since previous check
 * 4. Agent tasks - pending/running/blocked counts
 * 5. Cron errors - actual error messages from journalctl
 * 6. Site progress - sites that changed status in last 15 min
 * 7. Stuck sites - sites with no update >4h
 * 8. Rate limits - dynamic stage skips from circuit breakers
 * 9. Zombie procs - Z-state process count
 * 9b. Cron health - overdue cron job detection (2× interval threshold)
 * 10. LLM costs - budget variance and model-mismatch alerts
 *
 * Output: logs/pipeline-status.txt (append-only, rolling)
 * Snapshot: logs/pipeline-status-snapshot.json (delta tracking)
 */

import { getAll, getOne, run } from '../utils/db.js';
import { execSync } from 'child_process';
import {
  appendFileSync,
  readFileSync,
  writeFileSync,
  readdirSync,
  existsSync,
  statSync,
  mkdirSync,
} from 'fs';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
import '../utils/load-env.js';

const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const projectRoot = join(__dirname, '../..');

const statusFile = join(projectRoot, 'logs/pipeline-status.txt');
const snapshotFile = join(projectRoot, 'logs/pipeline-status-snapshot.json');
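// Shape of logs/pipeline-status-snapshot.json as written by siteStatusDistribution()
// below: a flat map of site status → count from the previous run, used to compute the
// +/- delta column. Keys come from sites.status; the counts here are illustrative only.
//
//   { "found": 12840, "enriched": 310, "proposals_drafted": 42 }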
/** Format current local time (assumes the host TZ is Australia/Sydney), labelled AEDT with the UTC offset */
function nowAEDT() {
  const d = new Date();
  const pad = (n, w = 2) => String(n).padStart(w, '0');
  const offset = -d.getTimezoneOffset();
  const sign = offset >= 0 ? '+' : '-';
  const oh = Math.floor(Math.abs(offset) / 60);
  const om = Math.abs(offset) % 60;
  return (
    `${d.getFullYear()}-${pad(d.getMonth() + 1)}-${pad(d.getDate())} ` +
    `${pad(d.getHours())}:${pad(d.getMinutes())}:${pad(d.getSeconds())} ` +
    `AEDT(${sign}${pad(oh)}:${pad(om)})`
  );
}

/** 1. Check systemd services */
function checkServices() {
  const lines = [];
  const services = [
    { unit: '333method-pipeline', label: 'Pipeline' },
    { unit: 'mmo-cron.timer', label: 'Cron timer' },
  ];
  const actions = [];

  for (const { unit, label } of services) {
    try {
      execSync(`systemctl --user is-active ${unit}`, { encoding: 'utf8', timeout: 5000 });
      lines.push(`✓ ${label} running`);
    } catch {
      lines.push(`✗ ${label} DOWN`);
      if (unit === '333method-pipeline') {
        try {
          execSync('systemctl --user restart --no-block 333method-pipeline', {
            encoding: 'utf8',
            timeout: 15000,
          });
          lines.push(' → Restarted pipeline');
          actions.push('restarted_pipeline');
        } catch (err) {
          lines.push(` → Restart failed: ${err.message}`);
        }
      }
    }
  }

  // Detect cron SERVICE stuck in "activating" (timer active but service not completing).
  // This happens when a cron job hangs longer than TimeoutStartSec (10min) — the timer
  // fires again every 5min but the previous run is still blocking or pending kill.
  try {
    const cronStatus = execSync(
      'systemctl --user show mmo-cron.service --property=ActiveState,StateChangeTimestamp',
      {
        encoding: 'utf8',
        timeout: 5000,
      }
    ).trim();
    const activeState = (cronStatus.match(/ActiveState=(\S+)/) || [])[1];
    const stateChangeTs = (cronStatus.match(/StateChangeTimestamp=(.+)/) || [])[1];
    if (activeState === 'activating' && stateChangeTs) {
      const changeMs = new Date(stateChangeTs).getTime();
      const stuckMin = Math.round((Date.now() - changeMs) / 60000);
      if (stuckMin > 12) {
        lines.push(
          `⚠️ Cron SERVICE stuck in "activating" for ${stuckMin}min — a cron job is hung or exceeded TimeoutStartSec`
        );
      }
    }
  } catch {
    // systemctl unavailable or parse error — skip
  }

  return { lines, actions };
}

/**
 * 1b. Hung pipeline detection
 * Sequential pipeline: log silent for >60min = deadlocked Playwright browser.
 * Restarts the service to recover.
 *
 * Uses the most-recently-modified log across pipeline-*.log, proposals-*.log,
 * utils-*.log, and scoring-*.log. This prevents false-positive restarts when the
 * Proposals stage is active (it writes only to proposals/utils logs, not pipeline-*.log,
 * so the old 30-min threshold fired mid-batch and killed healthy proposal batches).
 */
function checkPipelineHung() {
  // Proposals batch: 200 sites × 13.5s = ~45min. Scoring: ~28min. Assets: ~7-13min.
  // Use 60min so even the slowest stage completes before we restart.
  const HUNG_THRESHOLD_MS = 60 * 60 * 1000; // 60 minutes
  const logsDir = join(projectRoot, 'logs');

  // Check all stage logs — proposals writes to proposals-*.log and utils-*.log,
  // not pipeline-*.log, so we must look across all active log files.
  const ACTIVE_LOG_PREFIXES = ['pipeline-', 'proposals-', 'scoring-', 'utils-'];
  const candidateLogs = readdirSync(logsDir)
    .filter(
      f =>
        ACTIVE_LOG_PREFIXES.some(p => f.startsWith(p)) &&
        f.endsWith('.log') &&
        f !== 'pipeline-status.txt'
    )
    .map(f => ({ name: f, mtime: statSync(join(logsDir, f)).mtimeMs }))
    .sort((a, b) => b.mtime - a.mtime);

  // Use the most recently written log as the heartbeat
  const logPath = candidateLogs.length > 0 ? join(logsDir, candidateLogs[0].name) : null;
  const lines = [];
  const actions = [];

  try {
    if (!logPath) throw new Error('no pipeline logs');
    const { mtimeMs } = statSync(logPath);
    const silentMs = Date.now() - mtimeMs;
    const silentMin = Math.round(silentMs / 60000);

    if (silentMs > HUNG_THRESHOLD_MS) {
      lines.push(`⚠️ Pipeline log silent for ${silentMin}min — restarting (likely hung browser)`);
      try {
        execSync('systemctl --user restart --no-block 333method-pipeline', {
          encoding: 'utf8',
          timeout: 15000,
        });
        lines.push(' → Pipeline restarted');
        actions.push(`restarted_hung_pipeline_${silentMin}min`);
      } catch (err) {
        lines.push(` → Restart failed: ${err.message}`);
      }
    } else {
      const activeLogName = candidateLogs[0]?.name || 'unknown';
      lines.push(`✓ Pipeline log active (last write ${silentMin}min ago via ${activeLogName})`);
    }
  } catch {
    lines.push('✓ Pipeline log not yet created today');
  }

  return { lines, actions };
}
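// Heartbeat row consumed by the 1c watchdog below. Column names come from its SELECT;
// the values are illustrative. The code appends 'Z' when parsing, so the timestamps are
// assumed to be stored as UTC without a zone suffix:
//   ops.pipeline_control (id=1): last_browser_loop_at='2025-01-15 03:12:45',
//                                last_api_loop_at='2025-01-15 03:40:02'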
/**
 * 1c. browser_loop watchdog
 * Detects when browser_loop (Assets+Enrich) has hung while api_loop
 * (SERPs) is still running. Uses pipeline_control timestamps.
 */
async function checkBrowserLoopHung() {
  const BROWSER_HUNG_THRESHOLD_MS = 90 * 60 * 1000; // 90 minutes (enrich batches can take 30-60min)
  const lines = [];
  const actions = [];

  try {
    const row = await getOne(
      `SELECT last_browser_loop_at, last_api_loop_at
       FROM ops.pipeline_control WHERE id = 1`
    );

    if (!row || !row.last_browser_loop_at || !row.last_api_loop_at) {
      lines.push('✓ browser_loop watchdog: no timestamps yet');
      return { lines, actions };
    }

    const browserAge = Date.now() - new Date(`${row.last_browser_loop_at}Z`).getTime();
    const apiAge = Date.now() - new Date(`${row.last_api_loop_at}Z`).getTime();
    const browserMin = Math.round(browserAge / 60000);
    const apiMin = Math.round(apiAge / 60000);

    if (browserAge > BROWSER_HUNG_THRESHOLD_MS && apiAge < BROWSER_HUNG_THRESHOLD_MS) {
      lines.push(
        `⚠️ browser_loop stale (${browserMin}min) while api_loop active (${apiMin}min) — restarting pipeline`
      );
      try {
        execSync('systemctl --user restart --no-block 333method-pipeline', {
          encoding: 'utf8',
          timeout: 15000,
        });
        lines.push(' → Pipeline restarted (browser_loop recovery)');
        actions.push(`restarted_hung_browser_loop_${browserMin}min`);
      } catch (err) {
        lines.push(` → Restart failed: ${err.message}`);
      }
    } else {
      lines.push(
        `✓ browser_loop active (last cycle ${browserMin}min ago, api_loop ${apiMin}min ago)`
      );
    }
  } catch (err) {
    lines.push(`✓ browser_loop watchdog skipped: ${err.message}`);
  }

  return { lines, actions };
}

/** 2. Recent errors from log files — exclude test stack traces */
function recentErrors() {
  const lines = ['Recent errors (last 10min):'];
  try {
    const logsDir = join(projectRoot, 'logs');
    const tenMinAgo = Date.now() - 10 * 60 * 1000;
    const logFiles = readdirSync(logsDir)
      .filter(f => f.endsWith('.log') && f !== 'pipeline-status.txt')
      .map(f => join(logsDir, f))
      .filter(f => {
        try {
          return statSync(f).mtimeMs > tenMinAgo;
        } catch {
          return false;
        }
      });

    const errors = [];
    for (const f of logFiles) {
      try {
        const content = readFileSync(f, 'utf8');
        const matches = content
          .split('\n')
          .filter(l => /\[ERROR\]/.test(l) && !/\.test\.js|tests\//.test(l) && l.trim().length > 0);
        errors.push(...matches);
      } catch {
        /* skip unreadable files */
      }
    }

    const recent = errors.slice(-5);
    if (recent.length === 0) {
      lines.push(' (none)');
    } else {
      lines.push(...recent.map(l => ` ${l.slice(0, 120)}`));
    }
  } catch (err) {
    lines.push(` (error reading logs: ${err.message})`);
  }
  return lines;
}

/** 3. Site status distribution with delta */
async function siteStatusDistribution(prevCounts) {
  const lines = ['Site status distribution (total | +/-delta):'];
  try {
    const rows = await getAll('SELECT status, COUNT(*) as cnt FROM sites GROUP BY status ORDER BY cnt DESC');
    const newCounts = {};
    for (const { status, cnt } of rows) {
      newCounts[status] = cnt;
      const prev = prevCounts[status] ?? 0;
      const delta = cnt - prev;
      const sign = delta > 0 ? '+' : '';
      const deltaStr = delta === 0 ? '(=)' : `(${sign}${delta})`;
      lines.push(` ${status}: ${cnt} ${deltaStr}`);
    }
    return { lines, newCounts };
  } catch (err) {
    return { lines: [` (error: ${err.message})`], newCounts: prevCounts };
  }
}
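// Example of the section-3 block the lines above produce (counts and deltas illustrative):
//   Site status distribution (total | +/-delta):
//    found: 12840 (+120)
//    enriched: 310 (=)
//    failing: 95 (-5)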
/** 4. Agent task status */
async function agentTaskStatus() {
  const lines = ['Agent task status:'];
  try {
    const byAgent = await getAll(
      `SELECT assigned_to, status, COUNT(*) as cnt FROM tel.agent_tasks
       WHERE status IN ('pending','running','blocked')
       GROUP BY assigned_to, status ORDER BY assigned_to, status`
    );
    for (const { assigned_to, status, cnt } of byAgent) {
      lines.push(` ${assigned_to}|${status}: ${cnt}`);
    }

    lines.push(' --- totals ---');
    const totals = await getAll('SELECT status, COUNT(*) as cnt FROM tel.agent_tasks GROUP BY status ORDER BY status');
    for (const { status, cnt } of totals) {
      lines.push(` ${status}: ${cnt}`);
    }
  } catch (err) {
    lines.push(` (error: ${err.message})`);
  }
  return lines;
}

/** 5. Cron errors — actual messages from journalctl */
function cronErrors() {
  const lines = ['Cron jobs:'];
  try {
    const out = execSync(
      'journalctl --user -u mmo-cron --since "15 minutes ago" --no-pager',
      { encoding: 'utf8', timeout: 10000 }
    );
    const errLines = out.split('\n').filter(
      l =>
        /\[ERROR\]|Error:|failed:|FAILED|exception/i.test(l) &&
        !/\.test\.js|tests\//.test(l) &&
        // Exclude "0 errors" / "Errors: 0" false positives
        !/[Ee]rrors?:\s*0|0\s+[Ee]rrors?|[Ff]ailed:\s*0/.test(l)
    );

    if (errLines.length === 0) {
      lines.push(' ✓ No cron errors');
    } else {
      lines.push(` ⚠️ ${errLines.length} errors in last 15min:`);
      lines.push(...errLines.slice(-5).map(l => ` ${l.slice(0, 120)}`));
    }
  } catch {
    lines.push(' (journalctl unavailable)');
  }
  return lines;
}

/** 6. Site progression in last 15 minutes */
async function siteProgression() {
  const lines = ['Site progression (15min):'];
  try {
    const rows = await getAll(
      `SELECT status, COUNT(*) as cnt FROM sites
       WHERE updated_at > NOW() - INTERVAL '15 minutes'
       GROUP BY status ORDER BY cnt DESC`
    );
    if (rows.length === 0) {
      lines.push(' (none)');
    } else {
      for (const { status, cnt } of rows) {
        lines.push(` ${cnt} → ${status}`);
      }
    }
  } catch (err) {
    lines.push(` (error: ${err.message})`);
  }
  return lines;
}

/** 7. Stuck sites */
async function stuckSites() {
  try {
    const row = await getOne(
      `SELECT COUNT(*) as cnt FROM sites
       WHERE status NOT IN ('ignored','failing','high_score','outreach_sent','dead_letter')
       AND updated_at < NOW() - INTERVAL '4 hours'`
    );
    return `Stuck sites (>4h no update): ${row?.cnt ?? 'ERR'}`;
  } catch {
    return 'Stuck sites: ERR';
  }
}
/**
 * 1f. Schema validation — detect missing migrations by verifying key CHECK constraints.
 * In PostgreSQL, we verify by attempting to query for each expected status value
 * in the pg_enum or information_schema. Since PG uses CHECK constraints differently,
 * we test by doing a zero-row UPDATE and catching constraint violations.
 */
async function checkSchemaIntegrity() {
  try {
    const EXPECTED_STATUSES = [
      'found',
      'assets_captured',
      'prog_scored',
      'semantic_scored',
      'vision_scored',
      'enriched',
      'enriched_regex',
      'enriched_llm',
      'proposals_drafted',
      'outreach_partial',
      'outreach_sent',
      'ignored',
      'failing',
      'high_score',
      'dead_letter',
    ];

    // Test-write each expected status (zero-row update — no actual changes)
    const missing = [];
    for (const status of EXPECTED_STATUSES) {
      try {
        await run(`UPDATE sites SET status=$1 WHERE 1=0`, [status]);
      } catch (err) {
        if (/check constraint|violates check/i.test(err.message)) {
          missing.push(status);
        }
      }
    }

    if (missing.length > 0) {
      return `⚠️ SCHEMA MIGRATION MISSING: sites.status does not accept: ${missing.join(', ')} — run migration and restart pipeline`;
    }

    // Also check site_status table
    const missingStatus = [];
    for (const status of EXPECTED_STATUSES) {
      try {
        await run(`UPDATE site_status SET status=$1 WHERE 1=0`, [status]);
      } catch (err) {
        if (/check constraint|violates check/i.test(err.message)) {
          missingStatus.push(status);
        }
      }
    }

    if (missingStatus.length > 0) {
      return `⚠️ SCHEMA MIGRATION MISSING: site_status.status does not accept: ${missingStatus.join(', ')} — run migration and restart pipeline`;
    }

    return null; // no issues
  } catch {
    return null; // don't block monitor if check fails
  }
}

/** 8. Zombie process count — Z-state processes visible to host */
function checkZombies() {
  try {
    // Count processes in zombie state (Z) visible from this host/container
    const out = execSync('ps --no-header -eo stat 2>/dev/null | grep -c "^Z" || echo 0', {
      encoding: 'utf8',
      shell: true,
      timeout: 5000,
    });
    const count = parseInt(out.trim(), 10);
    if (count >= 50) {
      return `⚠️ Zombie processes: ${count} — container restart needed (tell user)`;
    } else if (count > 0) {
      return `⚠️ Zombie processes: ${count} (low — monitoring)`;
    }
    return '✓ No zombie processes';
  } catch {
    return ' Zombie check unavailable';
  }
}
/**
 * 9b. Cron job health — detect overdue cron jobs from cron_jobs table.
 * Compares (now - last_run_at) against 2× the expected interval.
 */
async function checkCronHealth() {
  const lines = ['Cron job health:'];
  const actions = [];

  try {
    const rows = await getAll(
      `SELECT task_key, interval_value, interval_unit, last_run_at, enabled
       FROM ops.cron_jobs WHERE enabled = true`
    );

    if (rows.length === 0) {
      lines.push(' (no enabled cron jobs found)');
      return { lines, actions };
    }

    const UNIT_TO_MS = {
      seconds: 1_000,
      second: 1_000,
      minutes: 60_000,
      minute: 60_000,
      hours: 3_600_000,
      hour: 3_600_000,
      days: 86_400_000,
      day: 86_400_000,
    };

    const now = Date.now();
    let overdueCount = 0;

    for (const row of rows) {
      const multiplier = UNIT_TO_MS[row.interval_unit];
      if (!multiplier) continue; // unknown unit, skip

      const expectedMs = row.interval_value * multiplier;
      const threshold = expectedMs * 2;

      if (!row.last_run_at) {
        // Never ran — could be newly added, but flag it and count it in the overdue total
        lines.push(
          ` ⚠️ NEVER RAN: ${row.task_key} (expected every ${row.interval_value} ${row.interval_unit})`
        );
        overdueCount++;
        continue;
      }

      const lastRunMs = new Date(
        `${row.last_run_at}${row.last_run_at.toString().endsWith('Z') ? '' : 'Z'}`
      ).getTime();
      const elapsed = now - lastRunMs;

      if (elapsed > threshold) {
        const elapsedMin = Math.round(elapsed / 60_000);
        lines.push(
          ` ⚠️ OVERDUE: ${row.task_key} last ran ${elapsedMin}min ago (expected every ${row.interval_value} ${row.interval_unit})`
        );
        overdueCount++;
        actions.push(`cron_overdue_${row.task_key}`);
      }
    }

    if (overdueCount === 0) {
      lines.push(` ✓ All ${rows.length} cron jobs on schedule`);
    } else {
      lines.push(` ${overdueCount}/${rows.length} jobs overdue`);
    }
  } catch (err) {
    lines.push(` (error checking cron health: ${err.message})`);
  }

  return { lines, actions };
}
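// Illustrative ops.cron_jobs row consumed by checkCronHealth() above (column names come
// from its SELECT; the task_key and values are examples only):
//   task_key='pipeline-status-monitor', interval_value=15, interval_unit='minutes',
//   last_run_at='2025-01-15 03:45:00', enabled=true
//   → flagged OVERDUE once more than 30 minutes (2 × 15min) have elapsed since last_run_at.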
/**
 * When ENABLE_VISION=false, sites should never linger at 'prog_scored' — the scoring
 * stage promotes them directly to 'semantic_scored'. This catches edge cases where old
 * pipeline runs left sites stranded at 'prog_scored' before the auto-promote logic existed.
 * Runs as a Tier 1 safety net on each monitor run (every 15 minutes).
 */
async function promoteStrandedScoredSites() {
  if (process.env.ENABLE_VISION === 'true') return null;
  try {
    const result = await run(
      `UPDATE sites SET status='semantic_scored', updated_at=NOW()
       WHERE status='prog_scored' AND updated_at < NOW() - INTERVAL '1 hour'`
    );
    if (result.changes > 0) {
      return ` ⬆ Promoted ${result.changes} stranded prog_scored→semantic_scored (ENABLE_VISION=false)`;
    }
  } catch {
    /* non-fatal */
  }
  return null;
}

/**
 * Retry failing sites whose errors are now resolved/transient.
 *
 * The pipeline marks sites as status='failing' after 3 retries. Many of these
 * are retryable — they failed due to transient infrastructure issues (chromium
 * binary missing before NixOS fix, LLM 400 burst, timeouts) and have never been
 * given another chance.
 *
 * Strategy: reset a capped batch of retryable sites back to 'found' per run.
 * The cap prevents flooding the pipeline when tens of thousands are queued.
 *
 * Dead letter queue: sites that have been reset DEAD_LETTER_THRESHOLD (3) times
 * without recovering are promoted to status='dead_letter' instead of being reset
 * again. chronic_failure_count tracks how many reset cycles each site has used.
 *
 * @param {number} [batchLimit=500] Max sites to reset per run (default: 500)
 * @returns {string|null} Log message or null
 */
async function retryFailingSites(batchLimit = 500) {
  // Sites reset >= this many times without recovery are dead-lettered permanently
  const DEAD_LETTER_THRESHOLD = 3;

  // Shared retryable error filter used in both queries below
  const RETRYABLE_FILTER = `
    (
      error_message LIKE '%spawn /ho%'
      OR error_message LIKE '%browserType.launch%'
      OR error_message LIKE '%Target page, context or browser ha%'
      OR error_message LIKE '%browser.newContext: Target page%'
      OR error_message LIKE '%status code 400%'
      OR error_message LIKE '%Incomplete LLM response%'
      OR error_message LIKE '%Timeout 30000ms%'
      OR error_message LIKE '%Timed out after 120000%'
      OR error_message LIKE '%Country code is required%'
      OR error_message LIKE '%Failed to parse JSON response%'
      OR error_message LIKE '%page.content: Unable to retrieve%'
    )
    AND (recapture_at IS NULL OR recapture_at < NOW())
  `;

  try {
    const messages = [];

    // Step 1: Promote chronic failures to dead_letter.
    // Sites that have been reset >= DEAD_LETTER_THRESHOLD times without ever
    // progressing past 'failing' are permanently parked here.
    const deadLetterResult = await run(
      `UPDATE sites
       SET status = 'dead_letter',
           error_message = 'Chronic failure: reset ' || chronic_failure_count || ' times without recovery',
           updated_at = NOW()
       WHERE status = 'failing'
         AND chronic_failure_count >= ${DEAD_LETTER_THRESHOLD}
         AND ${RETRYABLE_FILTER}`
    );

    if (deadLetterResult.changes > 0) {
      messages.push(
        ` ☠ Dead-lettered ${deadLetterResult.changes} chronic failing sites (>= ${DEAD_LETTER_THRESHOLD} retry cycles)`
      );
    }

    // Step 2: Reset remaining retryable sites that haven't exhausted their cycles.
    // Increment chronic_failure_count so we know how many times we've tried each site.
    const resetResult = await run(
      `UPDATE sites
       SET status = 'found',
           error_message = NULL,
           retry_count = 0,
           recapture_at = NULL,
           chronic_failure_count = chronic_failure_count + 1,
           updated_at = NOW()
       WHERE id IN (
         SELECT id FROM sites
         WHERE status = 'failing'
           AND chronic_failure_count < ${DEAD_LETTER_THRESHOLD}
           AND ${RETRYABLE_FILTER}
         ORDER BY updated_at ASC
         LIMIT ${batchLimit}
       )`
    );

    if (resetResult.changes > 0) {
      messages.push(
        ` ♻ Reset ${resetResult.changes} retryable failing sites → found (batch limit: ${batchLimit})`
      );
    }

    return messages.length > 0 ? messages.join('\n') : null;
  } catch (err) {
    return ` ⚠ retryFailingSites error: ${err.message}`;
  }
}
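// Lifecycle sketch for a chronically failing site under the two steps above:
//   failing (count 0) → reset to 'found' (count 1) → fails again → reset (count 2)
//   → fails again → reset (count 3) → fails again → 'dead_letter' (count >= 3, parked permanently)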
/**
 * Show active rate limits from logs/rate-limits.json.
 * These are dynamic SKIP_STAGES set automatically when a circuit breaker
 * opens due to a rate limit (daily quota, 429, etc.).
 * Tuning signals are analysed by MonitorAgent.checkRateLimitPatterns() every 6 h.
 */
function rateLimitStatus() {
  const lines = ['Rate limits (dynamic stage skips):'];
  try {
    const rateLimitsPath = join(projectRoot, 'logs/rate-limits.json');
    if (!existsSync(rateLimitsPath)) {
      lines.push(' None active');
      return lines;
    }
    const data = JSON.parse(readFileSync(rateLimitsPath, 'utf8'));
    const now = Date.now();
    const active = Object.entries(data).filter(([, v]) => Number(v.resetAt) > now);
    if (active.length === 0) {
      lines.push(' None active');
    } else {
      for (const [api, info] of active) {
        const waitMin = Math.ceil((Number(info.resetAt) - now) / 60_000);
        const resetStr = new Date(Number(info.resetAt)).toISOString();
        lines.push(
          ` ⏸ ${api} → [${info.stages?.join(', ')}] until ${resetStr} (~${waitMin} min)`
        );
        lines.push(` reason: ${info.reason}`);
      }
    }
  } catch {
    lines.push(' ERR reading rate-limits.json');
  }
  return lines;
}
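// Illustrative shape of logs/rate-limits.json as read above (keys come from the code;
// the API name, stage name, and reason text are examples only):
//   {
//     "serp_api": { "resetAt": 1736928000000, "stages": ["serps"], "reason": "daily quota exhausted (429)" }
//   }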
/**
 * Main pipeline status monitor function.
 * @returns {{ summary: string, checks_run: number, actions: string[], duration_seconds: number }}
 */
export async function runPipelineStatusMonitor() {
  const startTime = Date.now();

  // Load previous snapshot for delta calculation
  let prevCounts = {};
  try {
    if (existsSync(snapshotFile)) {
      prevCounts = JSON.parse(readFileSync(snapshotFile, 'utf8'));
    }
  } catch {
    prevCounts = {};
  }

  const sections = [];
  let allActions = [];

  // Header
  sections.push(`\n--- ${nowAEDT()} ---`);

  // 1. Services
  const { lines: serviceLines, actions } = checkServices();
  allActions = actions;
  sections.push(...serviceLines);

  // 1b. Hung pipeline detection (log silent >60min = deadlocked browser)
  const { lines: hungLines, actions: hungActions } = checkPipelineHung();
  sections.push(...hungLines);
  allActions = [...allActions, ...hungActions];

  // 1c. browser_loop watchdog (detects hung browser_loop while api_loop is healthy)
  const { lines: browserLines, actions: browserActions } = await checkBrowserLoopHung();
  sections.push(...browserLines);
  allActions = [...allActions, ...browserActions];

  // 1d. Promote stranded prog_scored sites (ENABLE_VISION=false safety net)
  const promotedMsg = await promoteStrandedScoredSites();
  if (promotedMsg) {
    sections.push(promotedMsg);
    allActions.push(promotedMsg);
  }

  // 1e. Retry failing sites with transient/resolved errors (500 per run)
  const retryMsg = await retryFailingSites();
  if (retryMsg) {
    sections.push(retryMsg);
    allActions.push(retryMsg);
  }

  // 1f. Schema integrity — detect missing migrations
  const schemaMsg = await checkSchemaIntegrity();
  if (schemaMsg) {
    sections.push(schemaMsg);
    allActions.push(schemaMsg);
  }

  // 2. Recent errors
  sections.push('');
  sections.push(...recentErrors());

  // 3. Site status with delta
  sections.push('');
  const { lines: statusLines, newCounts } = await siteStatusDistribution(prevCounts);
  sections.push(...statusLines);

  // Save new snapshot
  try {
    writeFileSync(snapshotFile, JSON.stringify(newCounts, null, 2));
  } catch {
    /* non-fatal */
  }

  // 4. Agent tasks
  sections.push('');
  sections.push(...(await agentTaskStatus()));

  // 5. Cron errors
  sections.push('');
  sections.push(...cronErrors());

  // 6. Site progression
  sections.push('');
  sections.push(...(await siteProgression()));

  // 7. Stuck sites
  sections.push(await stuckSites());

  // 8. Rate limits (dynamic stage skips from rate-limit-scheduler)
  sections.push('');
  sections.push(...rateLimitStatus());

  // 9. Zombie processes
  sections.push(checkZombies());

  // 9b. Cron job health (overdue detection)
  sections.push('');
  const { lines: cronHealthLines, actions: cronHealthActions } = await checkCronHealth();
  sections.push(...cronHealthLines);
  allActions = [...allActions, ...cronHealthActions];

  // 10. LLM cost variance check
  try {
    const { checkBudgetVariance } = await import('../utils/llm-usage-tracker.js');
    const variances = checkBudgetVariance();
    if (variances.length > 0) {
      sections.push('');
      sections.push('⚠️ LLM COST ALERTS:');
      for (const v of variances) {
        if (v.type === 'cost_variance') {
          sections.push(
            ` ${v.stage}: avg $${v.avgCost.toFixed(4)}/call (budget: $${v.expectedCost.toFixed(4)}, max: $${v.maxCost.toFixed(4)}) — ${v.callCount} calls/hr`
          );
        } else if (v.type === 'model_mismatch') {
          sections.push(
            ` ${v.stage}: using ${v.actualModel} (expected: ${v.expectedModel}) — ${v.callCount} calls/hr`
          );
        }
      }
    }
  } catch {
    // Budget table may not exist yet — skip silently
  }

  // Append to status file
  try {
    appendFileSync(statusFile, `${sections.join('\n')}\n`);
  } catch {
    // If logs/ dir doesn't exist yet, create it and retry
    mkdirSync(join(projectRoot, 'logs'), { recursive: true });
    appendFileSync(statusFile, `${sections.join('\n')}\n`);
  }

  const duration = ((Date.now() - startTime) / 1000).toFixed(1);
  const summary =
    allActions.length > 0
      ? `Pipeline status check done in ${duration}s — actions: ${allActions.join(', ')}`
      : `Pipeline status check done in ${duration}s — all clear`;

  return {
    summary,
    checks_run: 10,
    actions: allActions,
    duration_seconds: parseFloat(duration),
  };
}

// CLI support
if (import.meta.url === `file://${process.argv[1]}`) {
  runPipelineStatusMonitor()
    .then(result => {
      console.log(result.summary);
      process.exit(0);
    })
    .catch(err => {
      console.error('Pipeline status monitor error:', err.message);
      process.exit(1);
    });
}
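// Manual run and quick review (the script path is illustrative — adjust to where this
// file actually lives in the repo):
//   node pipeline-status-monitor.js
//   tail -n 60 logs/pipeline-status.txt
//
// Each run appends one block to logs/pipeline-status.txt, roughly of the form
// (values illustrative):
//   --- 2025-01-15 14:30:05 AEDT(+11:00) ---
//   ✓ Pipeline running
//   ✓ Cron timer running
//   ✓ Pipeline log active (last write 3min ago via pipeline-2025-01-15.log)
//   ...
//   Stuck sites (>4h no update): 0
//   ✓ No zombie processes
//   Cron job health:
//    ✓ All 12 cron jobs on schedule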