/ src / cron / process-guardian.js
process-guardian.js
  1  /**
  2   * Process Guardian - Tier 1 System Health Monitor
  3   *
  4   * Runs every 1 minute via cron. Performs fast, lightweight checks to keep
  5   * services alive. This is a direct cron function (NOT an agent task) so it
  6   * works even when the agent system itself is broken.
  7   *
  8   * Checks:
  9   * 1. Pipeline service health - restart if stopped
 10   * 2. Clearance cycle completion - restart pipeline after clearance
 11   * 3. Circuit breaker status - alert on breaker-open errors
 12   * 4. Rate limit auto-clear - expire stale rate limits even when pipeline is dead
 13   * 5. Browser loop hung - restart pipeline if Playwright hung >30min while API loop active
 14   *
 15   * Writes human-readable summary to /tmp/watchdog-status.txt
 16   */
 17  
 18  import { getOne, run } from './../utils/db.js';
 19  import { execSync } from 'child_process';
 20  import { writeFileSync } from 'fs';
 21  import { join, dirname } from 'path';
 22  import { fileURLToPath } from 'url';
 23  import Logger from '../utils/logger.js';
 24  import { getSkipStages, getRateLimitStatus } from '../utils/rate-limit-scheduler.js';
 25  import '../utils/load-env.js';
 26  
 27  const __filename = fileURLToPath(import.meta.url);
 28  const __dirname = dirname(__filename);
 29  const projectRoot = join(__dirname, '../..');
 30  
 31  const logger = new Logger('ProcessGuardian');
 32  
 33  /**
 34   * Log a health check result to system_health table
 35   */
 36  async function logHealth(checkType, status, details, actionTaken = null) {
 37    await run(
 38      `INSERT INTO tel.system_health (check_type, status, details, action_taken)
 39       VALUES ($1, $2, $3, $4)`,
 40      [checkType, status, JSON.stringify(details), actionTaken]
 41    );
 42  }
 43  
 44  /**
 45   * Check 1: Pipeline Service Health
 46   * Restarts the pipeline systemd service if it is not active.
 47   */
 48  async function checkPipelineService() {
 49    let serviceStatus = 'unknown';
 50    let status = 'ok';
 51    let actionTaken = null;
 52  
 53    try {
 54      const output = execSync('systemctl --user is-active 333method-pipeline', {
 55        encoding: 'utf8',
 56        timeout: 10000,
 57      }).trim();
 58      serviceStatus = output;
 59    } catch (err) {
 60      serviceStatus = (err.stdout || '').trim() || 'inactive';
 61    }
 62  
 63    if (serviceStatus !== 'active') {
 64      logger.warn(`Pipeline service is ${serviceStatus} - attempting restart`);
 65      status = 'warning';
 66      // Cooldown: skip restart if we just issued one in the last 3 minutes.
 67      // With --no-block, the service briefly shows as 'deactivating' after a restart
 68      // is issued, so Check 1 would otherwise fire again next minute.
 69      const recentServiceRestart = await getOne(
 70        `SELECT created_at FROM tel.system_health
 71         WHERE check_type = 'pipeline_service'
 72           AND action_taken = 'restarted_pipeline'
 73           AND created_at > NOW() - INTERVAL '3 minutes'
 74         LIMIT 1`
 75      );
 76      if (recentServiceRestart) {
 77        logger.info(
 78          `Pipeline service ${serviceStatus} but restart cooldown active (last restart <3min ago) — waiting`
 79        );
 80        actionTaken = `cooldown_skip_restart_${serviceStatus}`;
 81      } else {
 82        // Check if pipeline has been non-active for >10min (stuck crash loop — code error, not transient)
 83        const stuckSince = await getOne(
 84          `SELECT MIN(created_at) as first_fail FROM tel.system_health
 85           WHERE check_type = 'pipeline_service'
 86             AND action_taken = 'restarted_pipeline'
 87             AND created_at > NOW() - INTERVAL '30 minutes'`
 88        );
 89        const minutesStuck = stuckSince?.first_fail
 90          ? (Date.now() - new Date(stuckSince.first_fail).getTime()) / 60000
 91          : 0;
 92  
 93        if (minutesStuck > 10) {
 94          // Pipeline has been in crash loop >10min — capture startup error and escalate
 95          let startupError = null;
 96          try {
 97            startupError = execSync(
 98              'journalctl --user -u 333method-pipeline.service -n 20 --no-pager -o cat 2>/dev/null || true',
 99              { encoding: 'utf8', timeout: 5000 }
100            ).trim();
101          } catch {
102            // journalctl not available in this environment
103          }
104  
105          // Create triage task directly in agent_tasks table
106          try {
107            await run(
108              `INSERT INTO tel.agent_tasks (task_type, assigned_to, priority, status, context_json, created_at, updated_at)
109               VALUES ('classify_error', 'triage', 10, 'pending', $1, NOW(), NOW())`,
110              [
111                JSON.stringify({
112                  error_type: 'pipeline_startup_crash',
113                  error_message: `Pipeline service stuck in crash loop for ${Math.round(minutesStuck)}min`,
114                  stack_trace: startupError?.substring(0, 500) || null,
115                  stage: 'startup',
116                  severity: 'critical',
117                }),
118              ]
119            );
120            logger.warn(
121              `Pipeline crash loop escalated to triage (${Math.round(minutesStuck)}min stuck)`
122            );
123          } catch (insertErr) {
124            logger.error('Failed to create triage task for pipeline crash loop', insertErr);
125          }
126          status = 'critical';
127          actionTaken = `crash_loop_escalated_${Math.round(minutesStuck)}min`;
128        }
129  
130        try {
131          execSync('systemctl --user restart --no-block 333method-pipeline', {
132            encoding: 'utf8',
133            timeout: 15000,
134          });
135          actionTaken = actionTaken || 'restarted_pipeline';
136          logger.success('Pipeline service restarted successfully');
137        } catch (restartErr) {
138          status = 'critical';
139          actionTaken = `restart_failed: ${restartErr.message}`;
140          logger.error('Failed to restart pipeline service', restartErr);
141        }
142      }
143    } else {
144      logger.info('Pipeline service is active');
145    }
146  
147    await logHealth('pipeline_service', status, { service_status: serviceStatus }, actionTaken);
148    return { check: 'pipeline_service', status, serviceStatus, actionTaken };
149  }
150  
151  /**
152   * Check 2: Clearance Cycle Completion Detection
153   * If clearance was running last check but is now gone, restart pipeline.
154   */
155  async function checkClearanceCycle() {
156    let clearanceRunning = false;
157    let actionTaken = null;
158    let status = 'ok';
159  
160    try {
161      const output = execSync('/run/current-system/sw/bin/ps aux || ps aux', {
162        encoding: 'utf8',
163        timeout: 10000,
164        shell: '/bin/sh',
165      });
166      clearanceRunning = output.includes('/tmp/run-clearance-cycle.sh');
167    } catch {
168      clearanceRunning = false;
169    }
170  
171    const lastRow = await getOne(
172      `SELECT details FROM tel.system_health
173       WHERE check_type = 'clearance_cycle'
174       ORDER BY created_at DESC LIMIT 1`
175    );
176  
177    let wasRunning = false;
178    if (lastRow) {
179      try {
180        const lastDetails = JSON.parse(lastRow.details || '{}');
181        wasRunning = lastDetails.clearance_running === true;
182      } catch {
183        wasRunning = false;
184      }
185    }
186  
187    if (wasRunning && !clearanceRunning) {
188      logger.info('Clearance cycles completed - restarting pipeline service');
189      try {
190        execSync('systemctl --user restart --no-block 333method-pipeline', {
191          encoding: 'utf8',
192          timeout: 15000,
193        });
194        actionTaken = 'restarted_pipeline_after_clearance';
195        logger.success('Pipeline restarted after clearance cycle completion');
196      } catch (restartErr) {
197        status = 'warning';
198        actionTaken = `clearance_restart_failed: ${restartErr.message}`;
199        logger.error('Failed to restart pipeline after clearance cycle', restartErr);
200      }
201    }
202  
203    const details = { clearance_running: clearanceRunning, was_running: wasRunning };
204    await logHealth('clearance_cycle', status, details, actionTaken);
205    logger.info(`Clearance cycle: ${clearanceRunning ? 'running' : 'not running'}`);
206    return { check: 'clearance_cycle', status, clearanceRunning, actionTaken };
207  }
208  
209  /**
210   * Check 3: Circuit Breaker Status
211   * Alerts if >3 'Breaker is open' errors in agent_tasks within the last hour.
212   */
213  async function checkCircuitBreaker() {
214    const row = await getOne(
215      `SELECT COUNT(*) as count
216       FROM tel.agent_tasks
217       WHERE created_at >= NOW() - INTERVAL '1 hour'
218         AND (
219           context_json LIKE '%Breaker is open%'
220           OR result_json LIKE '%Breaker is open%'
221           OR error_message LIKE '%Breaker is open%'
222         )`
223    );
224  
225    const count = row?.count || 0;
226    const CIRCUIT_BREAKER_THRESHOLD = 3;
227  
228    const status = count > CIRCUIT_BREAKER_THRESHOLD ? 'critical' : 'ok';
229    const details = {
230      breaker_open_errors_last_hour: count,
231      threshold: CIRCUIT_BREAKER_THRESHOLD,
232    };
233  
234    if (status === 'critical') {
235      logger.warn(
236        `Circuit breaker firing: ${count} 'Breaker is open' errors in last hour (threshold: ${CIRCUIT_BREAKER_THRESHOLD})`
237      );
238    } else {
239      logger.info(`Circuit breaker: ${count} open errors in last hour`);
240    }
241  
242    await logHealth('circuit_breaker', status, details);
243    return { check: 'circuit_breaker', status, ...details };
244  }
245  
246  /**
247   * Check 4: Rate Limit Auto-Clear
248   * Calls getSkipStages() which has the side effect of clearing expired rate limits
249   * from logs/rate-limits.json. This ensures stale rate limits are cleaned up even
250   * when the pipeline service is dead and not calling getSkipStages() itself.
251   */
252  function cleanExpiredRateLimits() {
253    let status = 'ok';
254    let activeRateLimits = 0;
255  
256    try {
257      // getSkipStages() removes expired entries as a side effect
258      const skippedStages = getSkipStages();
259      activeRateLimits = skippedStages.size;
260  
261      if (activeRateLimits > 0) {
262        const details = getRateLimitStatus();
263        const apis = details.map(d => `${d.api} (${d.waitMinutes}min left)`).join(', ');
264        logger.info(`Active rate limits: ${apis}`);
265      } else {
266        logger.info('Rate limits: none active');
267      }
268    } catch (err) {
269      status = 'warning';
270      logger.warn(`Rate limit cleanup failed: ${err.message}`);
271    }
272  
273    return { check: 'rate_limit_cleanup', status, activeRateLimits };
274  }
275  
276  /**
277   * Check 5: Browser Loop Hung Detection
278   *
279   * The pipeline has three loops: api_loop (SERPs), browser_loop (Assets, Enrich), and
280   * outreach_loop (Outreach, Replies). If browser_loop's last_browser_loop_at is stale
281   * while api_loop is recent, Playwright has hung on a page navigation or screenshot
282   * and will never self-recover.
283   *
284   * Threshold: browser_loop not updated in 30min while api_loop is active (<15min old).
285   * Action: restart the pipeline service to clear the hung browser context.
286   */
287  async function checkBrowserLoopHung() {
288    let status = 'ok';
289    let actionTaken = null;
290    let browserAgeMin = null;
291    let apiAgeMin = null;
292  
293    try {
294      const row = await getOne(
295        `SELECT
296           ROUND(EXTRACT(EPOCH FROM (NOW() - last_browser_loop_at)) / 60) as browser_age_min,
297           ROUND(EXTRACT(EPOCH FROM (NOW() - last_api_loop_at)) / 60) as api_age_min
298         FROM ops.pipeline_control LIMIT 1`
299      );
300  
301      if (!row || !Number.isFinite(Number(row.browser_age_min)) || !Number.isFinite(Number(row.api_age_min))) {
302        // No data yet — pipeline may not have started
303        await logHealth('browser_loop_hung', 'ok', { reason: 'no_pipeline_control_data' }, null);
304        return {
305          check: 'browser_loop_hung',
306          status: 'ok',
307          browserAgeMin: null,
308          apiAgeMin: null,
309          actionTaken: null,
310        };
311      }
312  
313      browserAgeMin = Number(row.browser_age_min);
314      apiAgeMin = Number(row.api_age_min);
315  
316      // ENABLE_VISION=false means browser_loop intentionally never runs (HTML-only mode).
317      // Check 5 must be skipped to avoid false-positive restarts every 30 min.
318      if (process.env.ENABLE_VISION === 'false') {
319        logger.info(
320          'browser_loop check skipped — ENABLE_VISION=false (HTML-only mode, no browser_loop expected)'
321        );
322        await logHealth(
323          'browser_loop_hung',
324          'ok',
325          { reason: 'vision_disabled', browser_age_min: browserAgeMin, api_age_min: apiAgeMin },
326          null
327        );
328        return {
329          check: 'browser_loop_hung',
330          status: 'ok',
331          browserAgeMin,
332          apiAgeMin,
333          actionTaken: 'skipped_vision_disabled',
334        };
335      }
336  
337      const BROWSER_HUNG_THRESHOLD_MIN = 45; // 30 was too tight when CPU gate is active
338      // api_loop cycles can take 30-80 min when proposals/scoring is slow, so
339      // last_api_loop_at goes stale mid-cycle even while the pipeline runs.
340      // Use the pipeline service active state as the primary "api_loop alive" signal.
341      // Fall back to the timestamp check (60 min) as a secondary guard.
342      const API_ACTIVE_THRESHOLD_MIN = 60;
343  
344      let pipelineServiceActive = false;
345      try {
346        const serviceStatus = execSync('systemctl --user is-active 333method-pipeline', {
347          encoding: 'utf8',
348          timeout: 5000,
349        }).trim();
350        pipelineServiceActive = serviceStatus === 'active';
351      } catch {
352        // systemctl failed or returned non-zero (inactive) — fall back to timestamp
353      }
354  
355      const browserHung = browserAgeMin > BROWSER_HUNG_THRESHOLD_MIN;
356      const apiActive = pipelineServiceActive || apiAgeMin < API_ACTIVE_THRESHOLD_MIN;
357  
358      if (browserHung && apiActive) {
359        // Cooldown: don't restart again within 20 minutes of last browser_loop-hung restart.
360        const RESTART_COOLDOWN_MIN = 20;
361        const recentRestart = await getOne(
362          `SELECT created_at FROM tel.system_health
363           WHERE check_type = 'browser_loop_hung'
364             AND action_taken LIKE 'restarted_pipeline_browser_hung_%'
365             AND created_at > NOW() - INTERVAL '${RESTART_COOLDOWN_MIN} minutes'
366           LIMIT 1`
367        );
368  
369        if (recentRestart) {
370          logger.info(
371            `browser_loop hung (${browserAgeMin}min) but restart cooldown active (last restart <${RESTART_COOLDOWN_MIN}min ago) — skipping`
372          );
373          actionTaken = `cooldown_skip_browser_hung_${browserAgeMin}min`;
374        } else {
375          status = 'warning';
376          logger.warn(
377            `browser_loop hung: stale ${browserAgeMin}min while api_loop active ${apiAgeMin}min — restarting pipeline`
378          );
379          try {
380            execSync('systemctl --user restart --no-block 333method-pipeline', {
381              encoding: 'utf8',
382              timeout: 15000,
383            });
384            actionTaken = `restarted_pipeline_browser_hung_${browserAgeMin}min`;
385            logger.success('Pipeline restarted to clear hung browser_loop');
386          } catch (restartErr) {
387            status = 'critical';
388            actionTaken = `browser_hung_restart_failed: ${restartErr.message}`;
389            logger.error('Failed to restart pipeline after browser_loop hang', restartErr);
390          }
391        }
392      } else {
393        logger.info(`browser_loop: ${browserAgeMin}min ago, api_loop: ${apiAgeMin}min ago — ok`);
394      }
395    } catch (err) {
396      logger.warn(`Browser loop check failed: ${err.message}`);
397    }
398  
399    await logHealth(
400      'browser_loop_hung',
401      status,
402      { browser_age_min: browserAgeMin, api_age_min: apiAgeMin },
403      actionTaken
404    );
405    return { check: 'browser_loop_hung', status, browserAgeMin, apiAgeMin, actionTaken };
406  }
407  
408  /**
409   * Check 6: Dead Cron Timer Detection
410   *
411   * When a cron service run hangs (e.g., a function handler blocks), systemd leaves the
412   * service in "activating" state. The timer uses OnUnitInactiveSec so it waits for the
413   * service to become inactive before scheduling the next run. A hung service means
414   * NextElapseUSecMonotonic=infinity — the timer will never fire again.
415   *
416   * Fix: detect `Trigger: n/a` (infinity elapse) and restart the timer.
417   * 10-min cooldown prevents rapid oscillation.
418   */
419  async function checkCronTimer() {
420    let status = 'ok';
421    let actionTaken = null;
422  
423    try {
424      const output = execSync(
425        'systemctl --user show mmo-cron.timer --property=NextElapseUSecMonotonic',
426        { encoding: 'utf8', timeout: 5000 }
427      ).trim();
428  
429      if (output.includes('infinity')) {
430        const COOLDOWN_MIN = 5;
431        const recentRestart = await getOne(
432          `SELECT created_at FROM tel.system_health
433           WHERE check_type = 'cron_timer_dead'
434             AND action_taken LIKE 'restarted_cron_timer%'
435             AND created_at > NOW() - INTERVAL '${COOLDOWN_MIN} minutes'
436           LIMIT 1`
437        );
438  
439        if (recentRestart) {
440          logger.info('Cron timer dead but cooldown active — skipping restart');
441          actionTaken = 'cooldown_skip_cron_timer';
442        } else {
443          status = 'warning';
444          logger.warn('Cron timer has no next trigger (infinity) — restarting timer');
445          try {
446            execSync('systemctl --user restart --no-block mmo-cron.timer', {
447              encoding: 'utf8',
448              timeout: 5000,
449            });
450            actionTaken = 'restarted_cron_timer';
451            logger.success('Cron timer restarted — next cycle will be scheduled on completion');
452          } catch (err) {
453            status = 'critical';
454            actionTaken = `cron_timer_restart_failed: ${err.message}`;
455            logger.error('Failed to restart cron timer', err);
456          }
457        }
458      } else {
459        logger.info('Cron timer: next trigger scheduled ok');
460      }
461    } catch (err) {
462      logger.warn(`Cron timer check failed: ${err.message}`);
463    }
464  
465    await logHealth('cron_timer_dead', status, {}, actionTaken);
466    return { check: 'cron_timer_dead', status, actionTaken };
467  }
468  
469  /**
470   * Clean up old system_health records (keep last 7 days)
471   */
472  async function cleanupOldRecords() {
473    const result = await run(
474      `DELETE FROM tel.system_health WHERE created_at < NOW() - INTERVAL '7 days'`
475    );
476    if (result.changes > 0) {
477      logger.info(`Cleaned up ${result.changes} old system_health records`);
478    }
479  }
480  
481  /**
482   * Write human-readable summary to /tmp/watchdog-status.txt
483   */
484  function writeStatusFile(results) {
485    const timestamp = new Date().toISOString().slice(0, 16).replace('T', ' ');
486  
487    const pipelineResult = results.find(r => r.check === 'pipeline_service');
488    const circuitResult = results.find(r => r.check === 'circuit_breaker');
489  
490    const pipelineOk = pipelineResult?.serviceStatus === 'active';
491    const pipelineIcon = pipelineOk ? '✅ running' : '❌ stopped';
492  
493    const cbErrors = circuitResult?.breaker_open_errors_last_hour || 0;
494    const cbIcon = cbErrors > 3 ? `❌ ${cbErrors} errors` : '✅ ok';
495  
496    const rateLimitResult = results.find(r => r.check === 'rate_limit_cleanup');
497    const rlCount = rateLimitResult?.activeRateLimits || 0;
498    const rlIcon = rlCount > 0 ? `⏳ ${rlCount} active` : '✅ none';
499  
500    const actionLines = results.filter(r => r.actionTaken).map(r => r.actionTaken);
501    const actionsStr = actionLines.length > 0 ? actionLines.join(', ') : 'none';
502  
503    const issues = results.filter(r => r.status !== 'ok').map(r => `${r.check}: ${r.status}`);
504  
505    const lines = [
506      `=== Process Guardian: ${timestamp} ===`,
507      `Pipeline: ${pipelineIcon} | Circuit breaker: ${cbIcon} | Rate limits: ${rlIcon}`,
508      `Actions taken: ${actionsStr}`,
509    ];
510  
511    if (issues.length > 0) {
512      lines.push(`Issues: ${issues.join(', ')}`);
513    }
514  
515    lines.push('===');
516  
517    writeFileSync('/tmp/watchdog-status.txt', `${lines.join('\n')}\n`);
518  }
519  
520  /**
521   * Main process guardian function - runs all health checks.
522   * @returns {Object} Summary of all check results
523   */
524  export async function runProcessGuardian() {
525    const startTime = Date.now();
526    logger.info('Process guardian starting health checks');
527  
528    const results = [];
529  
530    results.push(await checkPipelineService());
531    results.push(await checkClearanceCycle());
532    results.push(await checkCircuitBreaker());
533    results.push(cleanExpiredRateLimits());
534    results.push(await checkBrowserLoopHung());
535    results.push(await checkCronTimer());
536  
537    // Housekeeping (only every ~10 runs to keep Tier 1 fast)
538    if (Math.random() < 0.1) {
539      await cleanupOldRecords();
540    }
541  
542    writeStatusFile(results);
543  
544    const duration = ((Date.now() - startTime) / 1000).toFixed(1);
545    const criticalCount = results.filter(r => r.status === 'critical').length;
546    const warningCount = results.filter(r => r.status === 'warning').length;
547    const okCount = results.filter(r => r.status === 'ok').length;
548  
549    const summary = {
550      ran_at: new Date().toISOString(),
551      duration_seconds: parseFloat(duration),
552      checks_run: results.length,
553      ok: okCount,
554      warnings: warningCount,
555      critical: criticalCount,
556      results,
557    };
558  
559    if (criticalCount > 0) {
560      logger.warn(
561        `Process guardian complete in ${duration}s: ${criticalCount} critical, ${warningCount} warnings`
562      );
563    } else {
564      logger.success(`Process guardian complete in ${duration}s: all ${okCount} checks passed`);
565    }
566  
567    return summary;
568  }
569  
570  // CLI support
571  if (import.meta.url === `file://${process.argv[1]}`) {
572    runProcessGuardian()
573      .then(result => {
574        console.log(JSON.stringify(result, null, 2));
575        process.exit(0);
576      })
577      .catch(err => {
578        console.error('Process guardian error:', err.message);
579        process.exit(1);
580      });
581  }