process-guardian.js
/**
 * Process Guardian - Tier 1 System Health Monitor
 *
 * Runs every 1 minute via cron. Performs fast, lightweight checks to keep
 * services alive. This is a direct cron function (NOT an agent task) so it
 * works even when the agent system itself is broken.
 *
 * Checks:
 * 1. Pipeline service health - restart if stopped
 * 2. Clearance cycle completion - restart pipeline after clearance
 * 3. Circuit breaker status - alert on breaker-open errors
 * 4. Rate limit auto-clear - expire stale rate limits even when pipeline is dead
 * 5. Browser loop hung - restart pipeline if Playwright hung >45min while API loop active
 * 6. Dead cron timer - restart the cron timer if it will never fire again
 *
 * Writes human-readable summary to /tmp/watchdog-status.txt
 */

import { getOne, run } from '../utils/db.js';
import { execSync } from 'child_process';
import { writeFileSync } from 'fs';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
import Logger from '../utils/logger.js';
import { getSkipStages, getRateLimitStatus } from '../utils/rate-limit-scheduler.js';
import '../utils/load-env.js';

const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const projectRoot = join(__dirname, '../..');

const logger = new Logger('ProcessGuardian');

/**
 * Log a health check result to system_health table
 */
async function logHealth(checkType, status, details, actionTaken = null) {
  await run(
    `INSERT INTO tel.system_health (check_type, status, details, action_taken)
     VALUES ($1, $2, $3, $4)`,
    [checkType, status, JSON.stringify(details), actionTaken]
  );
}

/**
 * Check 1: Pipeline Service Health
 * Restarts the pipeline systemd service if it is not active.
 */
async function checkPipelineService() {
  let serviceStatus = 'unknown';
  let status = 'ok';
  let actionTaken = null;

  try {
    const output = execSync('systemctl --user is-active 333method-pipeline', {
      encoding: 'utf8',
      timeout: 10000,
    }).trim();
    serviceStatus = output;
  } catch (err) {
    serviceStatus = (err.stdout || '').trim() || 'inactive';
  }

  if (serviceStatus !== 'active') {
    logger.warn(`Pipeline service is ${serviceStatus} - attempting restart`);
    status = 'warning';
    // Cooldown: skip restart if we just issued one in the last 3 minutes.
    // With --no-block, the service briefly shows as 'deactivating' after a restart
    // is issued, so Check 1 would otherwise fire again next minute.
    const recentServiceRestart = await getOne(
      `SELECT created_at FROM tel.system_health
       WHERE check_type = 'pipeline_service'
         AND action_taken = 'restarted_pipeline'
         AND created_at > NOW() - INTERVAL '3 minutes'
       LIMIT 1`
    );
    if (recentServiceRestart) {
      logger.info(
        `Pipeline service ${serviceStatus} but restart cooldown active (last restart <3min ago) — waiting`
      );
      actionTaken = `cooldown_skip_restart_${serviceStatus}`;
    } else {
      // Check if pipeline has been non-active for >10min (stuck crash loop — code error, not transient)
      const stuckSince = await getOne(
        `SELECT MIN(created_at) as first_fail FROM tel.system_health
         WHERE check_type = 'pipeline_service'
           AND action_taken = 'restarted_pipeline'
           AND created_at > NOW() - INTERVAL '30 minutes'`
      );
      const minutesStuck = stuckSince?.first_fail
        ? (Date.now() - new Date(stuckSince.first_fail).getTime()) / 60000
        : 0;

      if (minutesStuck > 10) {
        // Pipeline has been in crash loop >10min — capture startup error and escalate
        let startupError = null;
        try {
          startupError = execSync(
            'journalctl --user -u 333method-pipeline.service -n 20 --no-pager -o cat 2>/dev/null || true',
            { encoding: 'utf8', timeout: 5000 }
          ).trim();
        } catch {
          // journalctl not available in this environment
        }

        // Create triage task directly in agent_tasks table
        try {
          await run(
            `INSERT INTO tel.agent_tasks (task_type, assigned_to, priority, status, context_json, created_at, updated_at)
             VALUES ('classify_error', 'triage', 10, 'pending', $1, NOW(), NOW())`,
            [
              JSON.stringify({
                error_type: 'pipeline_startup_crash',
                error_message: `Pipeline service stuck in crash loop for ${Math.round(minutesStuck)}min`,
                stack_trace: startupError?.substring(0, 500) || null,
                stage: 'startup',
                severity: 'critical',
              }),
            ]
          );
          logger.warn(
            `Pipeline crash loop escalated to triage (${Math.round(minutesStuck)}min stuck)`
          );
        } catch (insertErr) {
          logger.error('Failed to create triage task for pipeline crash loop', insertErr);
        }
        status = 'critical';
        actionTaken = `crash_loop_escalated_${Math.round(minutesStuck)}min`;
      }

      try {
        execSync('systemctl --user restart --no-block 333method-pipeline', {
          encoding: 'utf8',
          timeout: 15000,
        });
        actionTaken = actionTaken || 'restarted_pipeline';
        logger.success('Pipeline service restarted successfully');
      } catch (restartErr) {
        status = 'critical';
        actionTaken = `restart_failed: ${restartErr.message}`;
        logger.error('Failed to restart pipeline service', restartErr);
      }
    }
  } else {
    logger.info('Pipeline service is active');
  }

  await logHealth('pipeline_service', status, { service_status: serviceStatus }, actionTaken);
  return { check: 'pipeline_service', status, serviceStatus, actionTaken };
}

/**
 * Check 2: Clearance Cycle Completion Detection
 * If clearance was running last check but is now gone, restart pipeline.
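 *
 * The "was running" flag comes from the previous guardian run's tel.system_health
 * row for this check, so the completion transition can only fire after at least
 * one earlier run has recorded clearance_running=true.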
 */
async function checkClearanceCycle() {
  let clearanceRunning = false;
  let actionTaken = null;
  let status = 'ok';

  try {
    const output = execSync('/run/current-system/sw/bin/ps aux || ps aux', {
      encoding: 'utf8',
      timeout: 10000,
      shell: '/bin/sh',
    });
    clearanceRunning = output.includes('/tmp/run-clearance-cycle.sh');
  } catch {
    clearanceRunning = false;
  }

  const lastRow = await getOne(
    `SELECT details FROM tel.system_health
     WHERE check_type = 'clearance_cycle'
     ORDER BY created_at DESC LIMIT 1`
  );

  let wasRunning = false;
  if (lastRow) {
    try {
      // details may be a JSON string (text column) or an already-parsed object (jsonb)
      const lastDetails =
        typeof lastRow.details === 'string'
          ? JSON.parse(lastRow.details || '{}')
          : lastRow.details || {};
      wasRunning = lastDetails.clearance_running === true;
    } catch {
      wasRunning = false;
    }
  }

  if (wasRunning && !clearanceRunning) {
    logger.info('Clearance cycles completed - restarting pipeline service');
    try {
      execSync('systemctl --user restart --no-block 333method-pipeline', {
        encoding: 'utf8',
        timeout: 15000,
      });
      actionTaken = 'restarted_pipeline_after_clearance';
      logger.success('Pipeline restarted after clearance cycle completion');
    } catch (restartErr) {
      status = 'warning';
      actionTaken = `clearance_restart_failed: ${restartErr.message}`;
      logger.error('Failed to restart pipeline after clearance cycle', restartErr);
    }
  }

  const details = { clearance_running: clearanceRunning, was_running: wasRunning };
  await logHealth('clearance_cycle', status, details, actionTaken);
  logger.info(`Clearance cycle: ${clearanceRunning ? 'running' : 'not running'}`);
  return { check: 'clearance_cycle', status, clearanceRunning, actionTaken };
}

/**
 * Check 3: Circuit Breaker Status
 * Alerts if >3 'Breaker is open' errors in agent_tasks within the last hour.
 */
async function checkCircuitBreaker() {
  const row = await getOne(
    `SELECT COUNT(*) as count
     FROM tel.agent_tasks
     WHERE created_at >= NOW() - INTERVAL '1 hour'
       AND (
         context_json LIKE '%Breaker is open%'
         OR result_json LIKE '%Breaker is open%'
         OR error_message LIKE '%Breaker is open%'
       )`
  );

  // COUNT(*) comes back from Postgres as a string; normalize to a number
  const count = Number(row?.count || 0);
  const CIRCUIT_BREAKER_THRESHOLD = 3;

  const status = count > CIRCUIT_BREAKER_THRESHOLD ? 'critical' : 'ok';
  const details = {
    breaker_open_errors_last_hour: count,
    threshold: CIRCUIT_BREAKER_THRESHOLD,
  };

  if (status === 'critical') {
    logger.warn(
      `Circuit breaker firing: ${count} 'Breaker is open' errors in last hour (threshold: ${CIRCUIT_BREAKER_THRESHOLD})`
    );
  } else {
    logger.info(`Circuit breaker: ${count} open errors in last hour`);
  }

  await logHealth('circuit_breaker', status, details);
  return { check: 'circuit_breaker', status, ...details };
}

/**
 * Check 4: Rate Limit Auto-Clear
 * Calls getSkipStages() which has the side effect of clearing expired rate limits
 * from logs/rate-limits.json. This ensures stale rate limits are cleaned up even
 * when the pipeline service is dead and not calling getSkipStages() itself.
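 *
 * The entries returned by getRateLimitStatus() are consumed below as
 * { api, waitMinutes } objects, e.g. (illustrative values only):
 *   { api: 'serpapi', waitMinutes: 12 }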
 */
function cleanExpiredRateLimits() {
  let status = 'ok';
  let activeRateLimits = 0;

  try {
    // getSkipStages() removes expired entries as a side effect
    const skippedStages = getSkipStages();
    activeRateLimits = skippedStages.size;

    if (activeRateLimits > 0) {
      const details = getRateLimitStatus();
      const apis = details.map(d => `${d.api} (${d.waitMinutes}min left)`).join(', ');
      logger.info(`Active rate limits: ${apis}`);
    } else {
      logger.info('Rate limits: none active');
    }
  } catch (err) {
    status = 'warning';
    logger.warn(`Rate limit cleanup failed: ${err.message}`);
  }

  return { check: 'rate_limit_cleanup', status, activeRateLimits };
}

/**
 * Check 5: Browser Loop Hung Detection
 *
 * The pipeline has three loops: api_loop (SERPs), browser_loop (Assets, Enrich), and
 * outreach_loop (Outreach, Replies). If browser_loop's last_browser_loop_at is stale
 * while api_loop is recent, Playwright has hung on a page navigation or screenshot
 * and will never self-recover.
 *
 * Threshold: browser_loop not updated in 45min while api_loop is considered active
 * (pipeline service reported active, or last_api_loop_at under 60min old).
 * Action: restart the pipeline service to clear the hung browser context.
 */
async function checkBrowserLoopHung() {
  let status = 'ok';
  let actionTaken = null;
  let browserAgeMin = null;
  let apiAgeMin = null;

  try {
    const row = await getOne(
      `SELECT
         ROUND(EXTRACT(EPOCH FROM (NOW() - last_browser_loop_at)) / 60) as browser_age_min,
         ROUND(EXTRACT(EPOCH FROM (NOW() - last_api_loop_at)) / 60) as api_age_min
       FROM ops.pipeline_control LIMIT 1`
    );

    if (!row || !Number.isFinite(Number(row.browser_age_min)) || !Number.isFinite(Number(row.api_age_min))) {
      // No data yet — pipeline may not have started
      await logHealth('browser_loop_hung', 'ok', { reason: 'no_pipeline_control_data' }, null);
      return {
        check: 'browser_loop_hung',
        status: 'ok',
        browserAgeMin: null,
        apiAgeMin: null,
        actionTaken: null,
      };
    }

    browserAgeMin = Number(row.browser_age_min);
    apiAgeMin = Number(row.api_age_min);

    // ENABLE_VISION=false means browser_loop intentionally never runs (HTML-only mode).
    // Check 5 must be skipped to avoid recurring false-positive restarts.
    if (process.env.ENABLE_VISION === 'false') {
      logger.info(
        'browser_loop check skipped — ENABLE_VISION=false (HTML-only mode, no browser_loop expected)'
      );
      await logHealth(
        'browser_loop_hung',
        'ok',
        { reason: 'vision_disabled', browser_age_min: browserAgeMin, api_age_min: apiAgeMin },
        null
      );
      return {
        check: 'browser_loop_hung',
        status: 'ok',
        browserAgeMin,
        apiAgeMin,
        actionTaken: 'skipped_vision_disabled',
      };
    }

    const BROWSER_HUNG_THRESHOLD_MIN = 45; // 30 was too tight when CPU gate is active
    // api_loop cycles can take 30-80 min when proposals/scoring is slow, so
    // last_api_loop_at goes stale mid-cycle even while the pipeline runs.
    // Use the pipeline service active state as the primary "api_loop alive" signal.
    // Fall back to the timestamp check (60 min) as a secondary guard.
    const API_ACTIVE_THRESHOLD_MIN = 60;

    let pipelineServiceActive = false;
    try {
      const serviceStatus = execSync('systemctl --user is-active 333method-pipeline', {
        encoding: 'utf8',
        timeout: 5000,
      }).trim();
      pipelineServiceActive = serviceStatus === 'active';
    } catch {
      // systemctl failed or returned non-zero (inactive) — fall back to timestamp
    }

    const browserHung = browserAgeMin > BROWSER_HUNG_THRESHOLD_MIN;
    const apiActive = pipelineServiceActive || apiAgeMin < API_ACTIVE_THRESHOLD_MIN;

    if (browserHung && apiActive) {
      // Cooldown: don't restart again within 20 minutes of last browser_loop-hung restart.
      const RESTART_COOLDOWN_MIN = 20;
      const recentRestart = await getOne(
        `SELECT created_at FROM tel.system_health
         WHERE check_type = 'browser_loop_hung'
           AND action_taken LIKE 'restarted_pipeline_browser_hung_%'
           AND created_at > NOW() - INTERVAL '${RESTART_COOLDOWN_MIN} minutes'
         LIMIT 1`
      );

      if (recentRestart) {
        logger.info(
          `browser_loop hung (${browserAgeMin}min) but restart cooldown active (last restart <${RESTART_COOLDOWN_MIN}min ago) — skipping`
        );
        actionTaken = `cooldown_skip_browser_hung_${browserAgeMin}min`;
      } else {
        status = 'warning';
        logger.warn(
          `browser_loop hung: stale ${browserAgeMin}min while api_loop active ${apiAgeMin}min — restarting pipeline`
        );
        try {
          execSync('systemctl --user restart --no-block 333method-pipeline', {
            encoding: 'utf8',
            timeout: 15000,
          });
          actionTaken = `restarted_pipeline_browser_hung_${browserAgeMin}min`;
          logger.success('Pipeline restarted to clear hung browser_loop');
        } catch (restartErr) {
          status = 'critical';
          actionTaken = `browser_hung_restart_failed: ${restartErr.message}`;
          logger.error('Failed to restart pipeline after browser_loop hang', restartErr);
        }
      }
    } else {
      logger.info(`browser_loop: ${browserAgeMin}min ago, api_loop: ${apiAgeMin}min ago — ok`);
    }
  } catch (err) {
    logger.warn(`Browser loop check failed: ${err.message}`);
  }

  await logHealth(
    'browser_loop_hung',
    status,
    { browser_age_min: browserAgeMin, api_age_min: apiAgeMin },
    actionTaken
  );
  return { check: 'browser_loop_hung', status, browserAgeMin, apiAgeMin, actionTaken };
}

/**
 * Check 6: Dead Cron Timer Detection
 *
 * When a cron service run hangs (e.g., a function handler blocks), systemd leaves the
 * service in "activating" state. The timer uses OnUnitInactiveSec so it waits for the
 * service to become inactive before scheduling the next run. A hung service means
 * NextElapseUSecMonotonic=infinity — the timer will never fire again.
 *
 * Fix: detect an infinite next elapse time (shown as `Trigger: n/a` in list-timers)
 * and restart the timer. A 5-minute cooldown prevents rapid oscillation.
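 *
 * For context, a timer driven by OnUnitInactiveSec looks roughly like this
 * (illustrative sketch; the actual mmo-cron.timer unit may differ):
 *
 *   [Timer]
 *   OnUnitInactiveSec=60
 *
 * Because the next run is scheduled relative to when the service last became
 * inactive, a service stuck in "activating" never re-arms the timer.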
 */
async function checkCronTimer() {
  let status = 'ok';
  let actionTaken = null;

  try {
    const output = execSync(
      'systemctl --user show mmo-cron.timer --property=NextElapseUSecMonotonic',
      { encoding: 'utf8', timeout: 5000 }
    ).trim();

    if (output.includes('infinity')) {
      const COOLDOWN_MIN = 5;
      const recentRestart = await getOne(
        `SELECT created_at FROM tel.system_health
         WHERE check_type = 'cron_timer_dead'
           AND action_taken LIKE 'restarted_cron_timer%'
           AND created_at > NOW() - INTERVAL '${COOLDOWN_MIN} minutes'
         LIMIT 1`
      );

      if (recentRestart) {
        logger.info('Cron timer dead but cooldown active — skipping restart');
        actionTaken = 'cooldown_skip_cron_timer';
      } else {
        status = 'warning';
        logger.warn('Cron timer has no next trigger (infinity) — restarting timer');
        try {
          execSync('systemctl --user restart --no-block mmo-cron.timer', {
            encoding: 'utf8',
            timeout: 5000,
          });
          actionTaken = 'restarted_cron_timer';
          logger.success('Cron timer restarted — next cycle will be scheduled on completion');
        } catch (err) {
          status = 'critical';
          actionTaken = `cron_timer_restart_failed: ${err.message}`;
          logger.error('Failed to restart cron timer', err);
        }
      }
    } else {
      logger.info('Cron timer: next trigger scheduled ok');
    }
  } catch (err) {
    logger.warn(`Cron timer check failed: ${err.message}`);
  }

  await logHealth('cron_timer_dead', status, {}, actionTaken);
  return { check: 'cron_timer_dead', status, actionTaken };
}

/**
 * Clean up old system_health records (keep last 7 days)
 */
async function cleanupOldRecords() {
  const result = await run(
    `DELETE FROM tel.system_health WHERE created_at < NOW() - INTERVAL '7 days'`
  );
  if (result.changes > 0) {
    logger.info(`Cleaned up ${result.changes} old system_health records`);
  }
}

/**
 * Write human-readable summary to /tmp/watchdog-status.txt
 */
function writeStatusFile(results) {
  const timestamp = new Date().toISOString().slice(0, 16).replace('T', ' ');

  const pipelineResult = results.find(r => r.check === 'pipeline_service');
  const circuitResult = results.find(r => r.check === 'circuit_breaker');

  const pipelineOk = pipelineResult?.serviceStatus === 'active';
  const pipelineIcon = pipelineOk ? '✅ running' : '❌ stopped';

  const cbErrors = circuitResult?.breaker_open_errors_last_hour || 0;
  const cbIcon = cbErrors > 3 ? `❌ ${cbErrors} errors` : '✅ ok';

  const rateLimitResult = results.find(r => r.check === 'rate_limit_cleanup');
  const rlCount = rateLimitResult?.activeRateLimits || 0;
  const rlIcon = rlCount > 0 ? `⏳ ${rlCount} active` : '✅ none';

  const actionLines = results.filter(r => r.actionTaken).map(r => r.actionTaken);
  const actionsStr = actionLines.length > 0 ? actionLines.join(', ') : 'none';

  const issues = results.filter(r => r.status !== 'ok').map(r => `${r.check}: ${r.status}`);

  const lines = [
    `=== Process Guardian: ${timestamp} ===`,
    `Pipeline: ${pipelineIcon} | Circuit breaker: ${cbIcon} | Rate limits: ${rlIcon}`,
    `Actions taken: ${actionsStr}`,
  ];

  if (issues.length > 0) {
    lines.push(`Issues: ${issues.join(', ')}`);
  }

  lines.push('===');

  writeFileSync('/tmp/watchdog-status.txt', `${lines.join('\n')}\n`);
}

/**
 * Main process guardian function - runs all health checks.
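 *
 * Example summary shape (values illustrative):
 *   { ran_at: '2025-01-01T00:00:00.000Z', duration_seconds: 1.2, checks_run: 6,
 *     ok: 6, warnings: 0, critical: 0, results: [...] }
 *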
 * @returns {Object} Summary of all check results
 */
export async function runProcessGuardian() {
  const startTime = Date.now();
  logger.info('Process guardian starting health checks');

  const results = [];

  results.push(await checkPipelineService());
  results.push(await checkClearanceCycle());
  results.push(await checkCircuitBreaker());
  results.push(cleanExpiredRateLimits());
  results.push(await checkBrowserLoopHung());
  results.push(await checkCronTimer());

  // Housekeeping (only every ~10 runs to keep Tier 1 fast)
  if (Math.random() < 0.1) {
    await cleanupOldRecords();
  }

  writeStatusFile(results);

  const duration = ((Date.now() - startTime) / 1000).toFixed(1);
  const criticalCount = results.filter(r => r.status === 'critical').length;
  const warningCount = results.filter(r => r.status === 'warning').length;
  const okCount = results.filter(r => r.status === 'ok').length;

  const summary = {
    ran_at: new Date().toISOString(),
    duration_seconds: parseFloat(duration),
    checks_run: results.length,
    ok: okCount,
    warnings: warningCount,
    critical: criticalCount,
    results,
  };

  if (criticalCount > 0) {
    logger.warn(
      `Process guardian complete in ${duration}s: ${criticalCount} critical, ${warningCount} warnings`
    );
  } else {
    logger.success(`Process guardian complete in ${duration}s: all ${okCount} checks passed`);
  }

  return summary;
}

// CLI support
if (import.meta.url === `file://${process.argv[1]}`) {
  runProcessGuardian()
    .then(result => {
      console.log(JSON.stringify(result, null, 2));
      process.exit(0);
    })
    .catch(err => {
      console.error('Process guardian error:', err.message);
      process.exit(1);
    });
}
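// Manual run (path shown is illustrative; adjust to wherever this file lives in the repo):
//   node src/cron/process-guardian.js
// Prints the JSON summary from runProcessGuardian() and exits 0 on success, 1 on failure.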