#!/usr/bin/env node

/**
 * Database-Driven Cron Runner (v2)
 *
 * Reads job definitions from the cron_jobs table instead of a hardcoded TASKS object.
 * Maintains backward compatibility with existing handler functions.
 *
 * Migration path:
 * 1. Run migration: psql -d mmo < db/migrations/029-create-cron-jobs-table.sql
 * 2. Seed database: npm run cron:migrate
 * 3. Update systemd/crontab to run this file (it replaced the original hardcoded src/cron.js)
 */

/* eslint-disable require-await */

import { existsSync, readdirSync, unlinkSync, renameSync, statSync, mkdirSync, symlinkSync, createReadStream, createWriteStream, writeFileSync } from 'fs';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
import { pipeline } from 'stream/promises';
import { createGzip } from 'zlib';
import { execFile } from 'child_process';
import { syncEmailEvents } from './utils/sync-email-events.js';
import { syncUnsubscribes } from './utils/sync-unsubscribes.js';
import { pollInboundSMS } from './inbound/sms.js';
import { pollInboundEmails } from './inbound/email.js';
import { processAllReplies } from './inbound/processor.js';
import { pollPurchases } from './cron/poll-purchases.js';
import { pollFreeScans } from './cron/poll-free-scans.js';
import { processPendingPurchases } from './cron/process-purchases.js';
import { precomputeDashboard } from './cron/precompute-dashboard.js';
import { runProcessGuardian } from './cron/process-guardian.js';
import { runProcessReaper } from './cron/process-reaper.js';
import { runPipelineStatusMonitor } from './cron/pipeline-status-monitor.js';
import { classifyUnknownErrors } from './cron/classify-unknown-errors.js';
import { runCleanupTestDbs } from './cron/cleanup-test-dbs.js';
import { runAutoresponder } from './cron/autoresponder.js';
import { sendScanEmailSequence } from './cron/send-scan-email-sequence.js';
import { runCitationMonitor } from './cron/citation-monitor.js';
import { rotateLogs } from './utils/log-rotator.js';
import { createAgentTask } from './agents/utils/task-manager.js';
import { getOne, getAll, run } from './utils/db.js';
import Logger from './utils/logger.js';
import './utils/load-env.js';

const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const projectRoot = join(__dirname, '..');

const logger = new Logger('Cron');

/**
 * Check and clear stale locks.
 * Uses the cron_locks table (migrated from the config table in migration 038).
 */
async function checkAndClearStaleLock(lockKey, stageName) {
  const lock = await getOne('SELECT updated_at FROM ops.cron_locks WHERE lock_key = $1', [lockKey]);

  if (lock) {
    const lockAge = Date.now() - new Date(lock.updated_at).getTime();
    const STALE_THRESHOLD = 10 * 60 * 1000; // 10 minutes

    if (lockAge < STALE_THRESHOLD) {
      logger.warn(`${stageName} already running, skipping`);
      return false;
    } else {
      const ageMinutes = (lockAge / 1000 / 60).toFixed(1);
      logger.warn(
        `Stale lock detected for ${stageName} (${ageMinutes} min old), clearing and proceeding`
      );
      await run('DELETE FROM ops.cron_locks WHERE lock_key = $1', [lockKey]);
      return true;
    }
  }

  return true; // No lock, proceed
}
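
// Illustrative only: the shape of ops.cron_locks that the lock queries in this
// file assume. Column names are inferred from the SQL above and in
// executeCommand(), not copied from migration 038:
//
//   CREATE TABLE IF NOT EXISTS ops.cron_locks (
//     lock_key    text PRIMARY KEY,
//     description text,
//     updated_at  timestamptz NOT NULL DEFAULT now()
//   );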

// =============================================================================
// Backup helpers (DR-096: sync-safe DB backup via Syncthing)
// =============================================================================

/**
 * Atomically update a sync-snapshot symlink.
 * Creates new symlink under a temp name then renames over old one — no window
 * where the link is absent (avoids Syncthing propagating a deletion).
 */
function updateSyncSnapshot(backupDir, prefix, latestFile, ext = 'db.gz') {
  const snapshotDir = join(backupDir, 'sync-snapshot');
  mkdirSync(snapshotDir, { recursive: true });
  const linkName = `${prefix}-latest.${ext}`;
  const linkPath = join(snapshotDir, linkName);
  const tmpLink = `${linkPath}.new`;
  try { unlinkSync(tmpLink); } catch (_e) { /* cleanup */ }
  symlinkSync(join('..', latestFile), tmpLink);
  renameSync(tmpLink, linkPath);
}
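
// Resulting layout, as a sketch (prefix 'mmo' and the timestamp are illustrative),
// assuming sync-snapshot/ is the directory Syncthing actually shares:
//
//   backups/pg/dumps/mmo-4h-2026-03-25T14-00.dump.gz
//   backups/pg/dumps/sync-snapshot/mmo-latest.dump.gz -> ../mmo-4h-2026-03-25T14-00.dump.gz
//
// Peers then always see exactly one current file per prefix and never observe
// a delete/recreate cycle on the link itself.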

/**
 * Grandfathered retention: 4×4h + 4×daily + 4×weekly = 12 backups max.
 * Files must be named <prefix>-<tier>-<timestamp>.<ext> e.g. sites-4h-2026-03-25T14-00.db.gz
 * Rotation keeps the newest N files per tier, deletes the rest.
 */
function rotateBackups(backupDir, prefix, ext = 'db.gz') {
  const tiers = { '4h': 4, daily: 4, weekly: 4 };
  let deletedCount = 0;
  let deletedBytes = 0;

  for (const [tier, keep] of Object.entries(tiers)) {
    const pattern = new RegExp(`^${prefix}-${tier}-.*\\.${ext.replace(/\./g, '\\.')}$`);
    const files = readdirSync(backupDir)
      .filter(f => pattern.test(f))
      .sort()
      .reverse(); // newest first (ISO timestamps sort lexicographically)

    const toDelete = files.slice(keep);
    for (const f of toDelete) {
      const fp = join(backupDir, f);
      try {
        const { size } = statSync(fp);
        unlinkSync(fp);
        deletedCount++;
        deletedBytes += size;
        logger.info(`Deleted old backup: ${f}`);
      } catch (e) {
        logger.warn(`Failed to delete ${f}: ${e.message}`);
      }
    }
  }

  if (deletedCount > 0) {
    logger.info(`Rotation: deleted ${deletedCount} old backups (freed ${(deletedBytes / 1024 / 1024).toFixed(1)} MB)`);
  }
  return { deletedCount, deletedBytes };
}
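
// Worked example (hypothetical file list, prefix 'mmo', ext 'dump.gz'): each
// tier is matched by its own pattern, so a burst of 4h backups can never evict
// a daily or weekly. With five 4h dumps present, only the oldest is deleted:
//
//   mmo-4h-2026-03-25T14-00.dump.gz   kept (newest)
//   mmo-4h-2026-03-25T10-00.dump.gz   kept
//   mmo-4h-2026-03-25T06-00.dump.gz   kept
//   mmo-4h-2026-03-25T02-00.dump.gz   kept
//   mmo-4h-2026-03-24T22-00.dump.gz   deleted (5th newest in its tier)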

/**
 * Clean up orphaned WAL sidecar files on /store.
 * These were left behind by better-sqlite3 backups (legacy — no longer generated).
 */
function cleanupWalOrphans(backupDir) {
  for (const f of readdirSync(backupDir)) {
    if (f.endsWith('.db-shm') || f.endsWith('.db-wal') || f.endsWith('.db-journal')) {
      try {
        unlinkSync(join(backupDir, f));
        logger.info(`Cleaned up WAL orphan: ${f}`);
      } catch (_e) { /* cleanup */ }
    }
  }
}

/**
 * Determine which backup tier to assign based on existing backups.
 * Weekly takes precedence: returns 'weekly' if no weekly exists for this ISO
 * week, then 'daily' if no daily exists for today, and '4h' otherwise.
 */
function determineTier(backupDir, prefix, ext = 'db.gz') {
  const files = readdirSync(backupDir);
  const today = new Date().toISOString().split('T')[0];
  const isoWeek = getISOWeek(new Date());

  const dailyPattern = new RegExp(`^${prefix}-daily-${today}\\.${ext.replace(/\./g, '\\.')}$`);
  const weeklyPattern = new RegExp(`^${prefix}-weekly-${isoWeek}\\.${ext.replace(/\./g, '\\.')}$`);

  if (!files.some(f => weeklyPattern.test(f))) return 'weekly';
  if (!files.some(f => dailyPattern.test(f))) return 'daily';
  return '4h';
}
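
// Example: on the first run of a new ISO week, determineTier returns 'weekly'
// even if today's daily is also missing; the daily is then created on the next
// scheduled run, and every run after that falls through to '4h'.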

function getISOWeek(date) {
  const d = new Date(Date.UTC(date.getFullYear(), date.getMonth(), date.getDate()));
  const dayNum = d.getUTCDay() || 7;
  d.setUTCDate(d.getUTCDate() + 4 - dayNum);
  const yearStart = new Date(Date.UTC(d.getUTCFullYear(), 0, 1));
  const weekNum = Math.ceil(((d - yearStart) / 86400000 + 1) / 7);
  return `${d.getUTCFullYear()}-W${String(weekNum).padStart(2, '0')}`;
}
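
// Worked example: 2026-03-25 is a Wednesday, so dayNum = 3 and the date is
// shifted to Thursday 2026-03-26 (ISO weeks are anchored on their Thursday).
// That is 84 days after Jan 1, giving ceil((84 + 1) / 7) = 13 → '2026-W13'.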

/**
 * Format a backup filename: <prefix>-<tier>-<timestamp>.<ext>
 */
function backupFileName(prefix, tier, ext = 'db') {
  const now = new Date();
  let stamp;
  if (tier === '4h') {
    stamp = now.toISOString().replace(/[:.]/g, '-').slice(0, 16);
  } else if (tier === 'daily') {
    stamp = now.toISOString().split('T')[0];
  } else {
    stamp = getISOWeek(now);
  }
  return `${prefix}-${tier}-${stamp}.${ext}`;
}
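
// Example outputs for prefix 'mmo' on 2026-03-25 (times illustrative):
//   4h     → mmo-4h-2026-03-25T14-00.db
//   daily  → mmo-daily-2026-03-25.db
//   weekly → mmo-weekly-2026-W13.db
// Note the default ext is 'db'; callers pass 'dump.gz' or 'tar.gz' explicitly.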

/**
 * Gzip a file atomically: read → gzip → write .tmp → rename to .gz → delete original.
 * Temp file is in the same directory (same filesystem) so rename is atomic.
 */
async function gzipFile(srcPath) {
  const gzPath = `${srcPath}.gz`;
  const tmp = `${gzPath}.tmp`;
  try {
    await pipeline(createReadStream(srcPath), createGzip({ level: 6 }), createWriteStream(tmp));
    renameSync(tmp, gzPath);
  } catch (err) {
    try { unlinkSync(tmp); } catch (_e) { /* cleanup */ } // clean up .tmp on failure
    throw err;
  }
  unlinkSync(srcPath); // delete uncompressed original
  return gzPath;
}
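
// Usage sketch (hypothetical path): compresses in place and returns the new name.
//
//   const gz = await gzipFile('/backups/sites-4h-2026-03-25T14-00.db');
//   // gz === '/backups/sites-4h-2026-03-25T14-00.db.gz'; the .db original is gone.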

/**
 * Registry of handler functions
 * Maps task_key to handler function
 */
const HANDLERS = {
  // ===== REAL-TIME OPERATIONS =====
  syncEmailEvents: async () => {
    const result = await syncEmailEvents();
    return {
      summary: `Synced ${result?.synced || 0} email events`,
      details: result || {},
      metrics: {
        synced: result?.synced || 0,
        errors: result?.errors || 0,
      },
    };
  },

  syncUnsubscribes: async () => {
    const result = await syncUnsubscribes();
    return {
      summary: `Synced ${result?.synced || 0} unsubscribes`,
      details: result || {},
      metrics: {
        synced: result?.synced || 0,
        errors: result?.errors || 0,
      },
    };
  },

  pollInboundSMS: async () => {
    const result = await pollInboundSMS();
    return {
      summary: `Polled ${result?.processed || 0} inbound SMS messages`,
      details: result || {},
      metrics: {
        processed: result?.processed || 0,
        new_messages: result?.new_messages || 0,
      },
    };
  },

  pollInboundEmails: async () => {
    const result = await pollInboundEmails();
    return {
      summary: `Polled ${result?.processed || 0} inbound email messages, ${result?.stored || 0} stored`,
      details: result || {},
      metrics: {
        processed: result?.processed || 0,
        stored: result?.stored || 0,
        unmatched: result?.unmatched || 0,
      },
    };
  },

  sendPendingReplies: async () => {
    const result = await processAllReplies();
    const smsSent = result?.sms?.sent || 0;
    const emailSent = result?.email?.sent || 0;
    return {
      summary: `Sent ${smsSent + emailSent} pending replies (SMS: ${smsSent}, Email: ${emailSent})`,
      details: result || {},
      metrics: { sms_sent: smsSent, email_sent: emailSent },
    };
  },

  autoresponder: async () => {
    const result = await runAutoresponder();
    return result;
  },

  pollFreeScans: async () => {
    const result = await pollFreeScans();
    return {
      summary: `Polled ${result?.processed || 0} free scans, ${result?.inserted || 0} archived`,
      details: result || {},
      metrics: { processed: result?.processed || 0, inserted: result?.inserted || 0 },
    };
  },

  pollPurchases: async () => {
    const result = await pollPurchases();
    return {
      summary: `Polled ${result?.processed || 0} purchases, ${result?.successful || 0} ingested`,
      details: result || {},
      metrics: { processed: result?.processed || 0, successful: result?.successful || 0 },
    };
  },

  processPurchases: async () => {
    const result = await processPendingPurchases();
    return {
      summary: `Processed ${result?.processed || 0} purchases, ${result?.delivered || 0} delivered`,
      details: result || {},
      metrics: {
        processed: result?.processed || 0,
        delivered: result?.delivered || 0,
        failed: result?.failed || 0,
      },
    };
  },

  sendScanEmailSequence: async () => {
    const result = await sendScanEmailSequence();
    return {
      summary: `Scan sequence: ${result?.sent || 0} sent, ${result?.skipped || 0} skipped, ${result?.failed || 0} failed`,
      details: result || {},
      metrics: {
        checked: result?.checked || 0,
        sent:    result?.sent    || 0,
        skipped: result?.skipped || 0,
        failed:  result?.failed  || 0,
      },
    };
  },

  // ===== AEO / CITATION MONITORING =====
  citationMonitor: async () => {
    const result = await runCitationMonitor();
    return result;
  },

  // ===== MONITORING =====
  checkKeywords: async () => {
    const pending = await getOne('SELECT COUNT(*) as count FROM keywords WHERE status = $1', ['pending']);
    const active  = await getOne('SELECT COUNT(*) as count FROM keywords WHERE status = $1', ['active']);
    const total   = await getOne('SELECT COUNT(*) as count FROM keywords', []);

    logger.info(`Found ${pending.count} pending keywords`);

    return {
      summary: `${pending.count} pending keywords in queue`,
      details: {
        pending_count: pending.count,
        active_count: active.count,
        total_count: total.count,
      },
      metrics: {
        pending: pending.count,
        active: active.count,
        total: total.count,
      },
    };
  },

  precomputeDashboard: async () => {
    const result = await precomputeDashboard();
    return result;
  },

  // Tier 1: Process Guardian (direct function - works even if agents are broken)
  processGuardian: async () => {
    const result = await runProcessGuardian();
    return {
      summary: `Process guardian: ${result.checks_run} checks, ${result.ok} ok, ${result.warnings} warnings, ${result.critical} critical`,
      details: result,
      metrics: {
        checks_run: result.checks_run,
        ok: result.ok,
        warnings: result.warnings,
        critical: result.critical,
        duration_seconds: result.duration_seconds,
      },
    };
  },

  // Process Reaper - kills stale agent processes, logs zombie count + memory pressure
  processReaper: async () => {
    const result = await runProcessReaper();
    return {
      summary: `Process reaper: ${result.zombie_count} zombies, ${result.free_mem_mb}MB free, ${result.stale_processes_killed} stale killed`,
      details: result,
      metrics: {
        zombie_count: result.zombie_count,
        free_mem_mb: result.free_mem_mb,
        swap_pct: result.swap_pct,
        stale_processes_killed: result.stale_processes_killed,
        duration_seconds: result.duration_seconds,
      },
    };
  },

  // Cleanup leftover test DB files from tests/ and /tmp
  cleanupTestDbs: () => {
    const result = runCleanupTestDbs();
    return {
      summary: `Cleaned up ${result.deleted} stale test DB(s), freed ${result.freed_kb} KB`,
      details: result,
      metrics: {
        deleted: result.deleted,
        freed_kb: result.freed_kb,
      },
    };
  },

  // Pipeline Status Monitor (writes human-readable report to logs/pipeline-status.txt)
  pipelineStatusMonitor: async () => {
    const result = await runPipelineStatusMonitor();
    return {
      summary: result.summary,
      details: result,
      metrics: {
        checks_run: result.checks_run,
        duration_seconds: result.duration_seconds,
        actions_taken: result.actions.length,
      },
    };
  },

  // Tier 2: Pipeline Monitor (creates monitor agent tasks for pipeline checks)
  monitorPipeline: async () => {
    const taskTypes = [
      { task_type: 'check_pipeline_health', priority: 7 },
      { task_type: 'scan_logs', priority: 5 },
      { task_type: 'check_process_compliance', priority: 5 },
      { task_type: 'check_loops', priority: 6 },
      { task_type: 'check_blocked_tasks', priority: 6 },
    ];

    let created = 0;
    let skipped = 0;

    for (const taskDef of taskTypes) {
      try {
        const taskId = await createAgentTask({
          task_type: taskDef.task_type,
          assigned_to: 'monitor',
          created_by: 'cron',
          priority: taskDef.priority,
          context: { source: 'cron_tier2', recurring: true },
        });

        if (taskId) created++;
        else skipped++;
      } catch {
        // createAgentTask returns null for duplicates via findDuplicateTask
        skipped++;
      }
    }

    return {
      summary: `Pipeline monitor: ${created} tasks created, ${skipped} skipped (duplicates)`,
      details: { task_types: taskTypes.map(t => t.task_type), created, skipped },
      metrics: { created, skipped },
    };
  },

 459  
 460    // Tier 3: System Health (creates monitor agent tasks for heavier checks)
 461    monitorSystem: async () => {
 462      const taskTypes = [
 463        { task_type: 'check_agent_health', priority: 6 },
 464        { task_type: 'detect_anomaly', priority: 4 },
 465        { task_type: 'check_slo_compliance', priority: 7 },
 466      ];
 467  
 468      let created = 0;
 469      let skipped = 0;
 470  
 471      for (const taskDef of taskTypes) {
 472        try {
 473          const taskId = await createAgentTask({
 474            task_type: taskDef.task_type,
 475            assigned_to: 'monitor',
 476            created_by: 'cron',
 477            priority: taskDef.priority,
 478            context: { source: 'cron_tier3', recurring: true },
 479          });
 480  
 481          if (taskId) created++;
 482          else skipped++;
 483        } catch {
 484          skipped++;
 485        }
 486      }
 487  
 488      return {
 489        summary: `System health: ${created} tasks created, ${skipped} skipped (duplicates)`,
 490        details: { task_types: taskTypes.map(t => t.task_type), created, skipped },
 491        metrics: { created, skipped },
 492      };
 493    },
 494  
 495    // Classify unknown errors — Phase 2 LLM call delegated to claude-orchestrator.sh --type classify_errors
 496    // This cron job now runs only Phase 1 (retry reclassification) and Phase 3 (auto-apply proposals).
 497    classifyUnknownErrors: async () => {
 498      const result = await classifyUnknownErrors();
 499      return {
 500        summary: `Retry reclassified: ${result.sites_retried || 0} sites, ${result.outreaches_retried || 0} outreaches`,
 501        details: result,
 502        metrics: {
 503          sites_retried: result.sites_retried || 0,
 504          outreaches_retried: result.outreaches_retried || 0,
 505          patterns_applied: result.patterns_applied || 0,
 506        },
 507      };
 508    },
 509  
 510    // ===== MAINTENANCE =====
 511    databaseMaintenance: async () => {
 512      logger.info('Running database maintenance...');
 513  
 514      const stats        = await getOne('SELECT COUNT(*) as sites FROM sites', []);
 515      const outreaches   = await getOne("SELECT COUNT(*) as count FROM messages WHERE direction = 'outbound'", []);
 516      const conversations = await getOne("SELECT COUNT(*) as count FROM messages WHERE direction = 'inbound'", []);
 517      const keywords     = await getOne('SELECT COUNT(*) as count FROM keywords', []);
 518  
 519      // PostgreSQL does not have PRAGMA integrity_check. Report row counts only.
 520      const healthy = true;
 521  
 522      logger.success(
 523        `Database healthy: ${stats.sites} sites, ${outreaches.count} outreaches`
 524      );
 525  
 526      return {
 527        summary: `Database healthy: ${stats.sites} sites, ${outreaches.count} outreaches`,
 528        details: {
 529          integrity_status: 'ok',
 530          table_counts: {
 531            sites: stats.sites,
 532            outreaches: outreaches.count,
 533            conversations: conversations.count,
 534            keywords: keywords.count,
 535          },
 536          operations: ['Row counts collected (PostgreSQL — no PRAGMA optimize/integrity_check)'],
 537        },
 538        metrics: {
 539          sites: stats.sites,
 540          outreaches: outreaches.count,
 541          conversations: conversations.count,
 542          keywords: keywords.count,
 543          healthy: healthy ? 1 : 0,
 544        },
 545      };
 546    },
 547  
 548    vacuumDatabase: async () => {
 549      // PostgreSQL runs autovacuum automatically; manual VACUUM FULL requires a superuser
 550      // connection and exclusive lock — not appropriate for a cron job on a shared pool.
 551      // This handler is retained for scheduler compatibility but performs no action.
 552      logger.info('vacuumDatabase: skipped — PostgreSQL manages autovacuum automatically');
 553      return {
 554        summary: 'Vacuum skipped: PostgreSQL autovacuum handles this automatically',
 555        details: { note: 'Use pg_stat_user_tables to monitor bloat if needed.' },
 556        metrics: { saved_bytes: 0, saved_mb: 0, databases_vacuumed: 0 },
 557      };
 558    },
 559  
  walCheckpoint: async () => {
    // Repurposed: clean up old WAL archive files (>7 days) from the store volume.
    // PG's archive_command copies WAL to /run/media/jason/store/backups/pg/wal/.
    // Without cleanup, this grows indefinitely (~16MB per file).
    const walDir = '/run/media/jason/store/backups/pg/wal';
    const maxAgeDays = 7;
    const cutoff = Date.now() - maxAgeDays * 86400000;
    let deleted = 0;
    let freedBytes = 0;
    let errors = 0;

    try {
      if (!existsSync(walDir)) {
        logger.warn(`walCheckpoint: WAL archive dir not found: ${walDir}`);
        return { summary: 'WAL cleanup skipped: archive dir not found', details: { walDir }, metrics: { deleted: 0 } };
      }

      const files = readdirSync(walDir).filter(f => /^[0-9A-F]{24}$/.test(f));
      for (const f of files) {
        try {
          const fp = join(walDir, f);
          const { mtimeMs, size } = statSync(fp);
          if (mtimeMs < cutoff) {
            unlinkSync(fp);
            deleted++;
            freedBytes += size;
          }
        } catch (e) {
          errors++;
          if (errors <= 3) logger.warn(`walCheckpoint: failed to delete ${f}: ${e.message}`);
        }
      }

      const totalFiles = files.length;
      const freedMB = (freedBytes / 1024 / 1024).toFixed(1);
      logger.info(`walCheckpoint: deleted ${deleted}/${totalFiles} WAL archives older than ${maxAgeDays}d (freed ${freedMB} MB)${errors ? `, ${errors} errors` : ''}`);

      return {
        summary: `Cleaned ${deleted} WAL archives (${freedMB} MB freed)`,
        details: { walDir, totalFiles, deleted, errors, freedMB, maxAgeDays },
        metrics: { wal_archives_deleted: deleted, wal_freed_mb: parseFloat(freedMB), wal_total_files: totalFiles - deleted },
      };
    } catch (e) {
      logger.error(`walCheckpoint: ${e.message}`);
      return { summary: `WAL cleanup failed: ${e.message}`, details: {}, metrics: { deleted: 0 } };
    }
  },

  backupDatabase: async () => {
    // pg_dump the mmo database to the store volume with tiered retention.
    const dumpDir = '/run/media/jason/store/backups/pg/dumps';
    const prefix = 'mmo';

    try {
      mkdirSync(dumpDir, { recursive: true });
    } catch (e) {
      logger.error(`backupDatabase: cannot create dump dir ${dumpDir}: ${e.message}`);
      return { summary: `Backup failed: ${e.message}`, details: {}, metrics: { success: 0 } };
    }

    const tier = determineTier(dumpDir, prefix, 'dump.gz');
    const filename = backupFileName(prefix, tier, 'dump.gz');
    const filepath = join(dumpDir, filename);

    logger.info(`backupDatabase: starting pg_dump → ${filename} (${tier})`);
    const startMs = Date.now();

    try {
      // pg_dump writes a compressed custom-format archive directly to filepath
      await new Promise((resolve, reject) => {
        execFile('pg_dump', [
          '-h', '/run/postgresql',
          '-d', 'mmo',
          '--format=custom',
          '--no-owner',
          '--compress=6',
          '-f', filepath,
        ], { timeout: 300000 }, (err) => {
          if (err) reject(err);
          else resolve();
        });
      });

      const { size } = statSync(filepath);
      const sizeMB = (size / 1024 / 1024).toFixed(1);
      const durationSec = ((Date.now() - startMs) / 1000).toFixed(1);
      logger.info(`backupDatabase: ${filename} — ${sizeMB} MB in ${durationSec}s`);

      // Update sync-snapshot symlink
      updateSyncSnapshot(dumpDir, prefix, filename, 'dump.gz');

      // Rotate old backups (keep 4 per tier)
      const rotation = rotateBackups(dumpDir, prefix, 'dump.gz');

      // Clean up legacy SQLite WAL orphans in the old backup dir
      const oldBackupDir = join(projectRoot, 'db', 'backup');
      if (existsSync(oldBackupDir)) {
        cleanupWalOrphans(oldBackupDir);
      }

      return {
        summary: `pg_dump ${tier}: ${filename} (${sizeMB} MB, ${durationSec}s)`,
        details: { filename, sizeMB, durationSec, tier, rotation },
        metrics: { success: 1, size_mb: parseFloat(sizeMB), duration_sec: parseFloat(durationSec) },
      };
    } catch (e) {
      logger.error(`backupDatabase: pg_dump failed: ${e.message}`);
      // Clean up partial file
      try { unlinkSync(filepath); } catch (_) { /* */ }
      return { summary: `Backup failed: ${e.message}`, details: {}, metrics: { success: 0 } };
    }
  },

  analyzePerformance: async () => {
    logger.info('Analyzing database performance...');

    // Gather table row counts from PostgreSQL catalog
    const tableRows = await getAll(`
      SELECT relname AS table_name,
             reltuples::bigint AS rows
      FROM pg_class
      JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace
      WHERE pg_namespace.nspname = 'm333'
        AND pg_class.relkind = 'r'
      ORDER BY reltuples DESC
    `, []);

    const tableStats = tableRows.map(r => ({ table_name: r.table_name, rows: Number(r.rows) }));

    // Gather existing indexes from PostgreSQL catalog
    const indexRows = await getAll(`
      SELECT indexname AS name,
             tablename AS tbl_name,
             indexdef  AS sql
      FROM pg_indexes
      WHERE schemaname = 'm333'
    `, []);

    // Build set of indexed column combos per table for smarter detection
    const indexedColumns = {};
    for (const idx of indexRows) {
      if (!indexedColumns[idx.tbl_name]) indexedColumns[idx.tbl_name] = [];
      const colMatch = idx.sql && idx.sql.match(/\(([^)]+)\)/);
      if (colMatch) {
        const cols = colMatch[1]
          .split(',')
          .map(c => c.trim().replace(/ (ASC|DESC|COLLATE \w+)/gi, ''));
        indexedColumns[idx.tbl_name].push(cols);
      }
    }

    const recommendations = [];

    // Common query patterns to check via EXPLAIN
    const commonQueries = [
      {
        sql: "SELECT * FROM sites WHERE status = 'prog_scored' AND error_message IS NOT NULL LIMIT 1",
        table: 'sites',
        cols: ['status', 'error_message'],
        desc: 'Retry failed scoring (status + error_message filter)',
      },
      {
        sql: "SELECT * FROM messages WHERE direction = 'inbound' AND read_at IS NULL LIMIT 1",
        table: 'messages',
        cols: ['direction', 'read_at'],
        desc: 'Unread inbound messages',
      },
      {
        sql: "SELECT * FROM sites WHERE country_code = 'US' AND status = 'found' LIMIT 1",
        table: 'sites',
        cols: ['country_code', 'status'],
        desc: 'Country-filtered pipeline queries',
      },
      {
        sql: "SELECT * FROM messages WHERE contact_method = 'email' AND direction = 'outbound' AND delivery_status = 'sent' LIMIT 1",
        table: 'messages',
        cols: ['contact_method', 'direction', 'delivery_status'],
        desc: 'Channel-specific outreach queries',
      },
      {
        sql: "SELECT * FROM sites WHERE recapture_at IS NOT NULL AND recapture_at <= NOW() LIMIT 1",
        table: 'sites',
        cols: ['recapture_at'],
        desc: 'Sites scheduled for recapture',
      },
      {
        sql: "SELECT * FROM messages WHERE direction = 'inbound' AND read_at IS NULL ORDER BY created_at DESC LIMIT 1",
        table: 'messages',
        cols: ['direction', 'read_at', 'created_at'],
        desc: 'Recent unread inbound messages',
      },
      {
        sql: "SELECT * FROM tel.llm_usage WHERE stage = 'scoring' AND created_at >= NOW() - INTERVAL '7 days' LIMIT 1",
        table: 'llm_usage',
        cols: ['stage', 'created_at'],
        desc: 'Recent LLM usage by stage',
      },
      {
        sql: "SELECT * FROM ops.cron_job_logs WHERE job_name = 'test' AND status = 'failed' LIMIT 1",
        table: 'cron_job_logs',
        cols: ['job_name', 'status'],
        desc: 'Failed cron job lookup',
      },
    ];

    // Run EXPLAIN to detect sequential scans
    // SAFETY: sql comes from the hardcoded commonQueries array above — never user input
    for (const { sql, table, cols, desc } of commonQueries) {
      try {
        const planRows = await getAll(`EXPLAIN ${sql}`, []);
        const planText = planRows.map(r => Object.values(r)[0]).join('\n');
        const hasSeqScan = planText.includes('Seq Scan');

        if (hasSeqScan) {
          // Check if a useful composite index already covers these columns
          const tableIndexes = indexedColumns[table] || [];
          const alreadyCovered = tableIndexes.some(idxCols =>
            cols.every((c, i) => idxCols[i] && idxCols[i].toLowerCase() === c.toLowerCase())
          );

          if (!alreadyCovered) {
            const idxName = `idx_${table}_${cols.join('_')}`;
            const idxSql = `CREATE INDEX IF NOT EXISTS ${idxName} ON ${table}(${cols.join(', ')});`;
            recommendations.push({
              file: `${table}:${cols.join(',')}`,
              priority: 'medium',
              reason:
                `**Sequential scan detected** on \`${table}\` for: ${desc}\n\n` +
                `**Suggested fix:**\n\`\`\`sql\n${idxSql}\n\`\`\``,
            });
          }
        }
      } catch {
        // Query may reference missing tables; skip
      }
    }

    // Check for large tables with sparse indexing
    for (const stat of tableStats) {
      if (stat.rows > 5000) {
        const count = (indexedColumns[stat.table_name] || []).length;
        if (count < 2) {
          recommendations.push({
            file: `${stat.table_name}:sparse_indexes`,
            priority: 'medium',
            reason:
              `**Sparse indexing** on large table \`${stat.table_name}\` ` +
              `(${stat.rows.toLocaleString()} rows, only ${count} user-defined indexes).\n\n` +
              `Review query patterns and add indexes on frequently filtered/sorted columns.`,
          });
        }
      }
    }

    // Store new recommendations in human_review_queue (deduplicated)
    let newCount = 0;
    for (const rec of recommendations) {
      const existing = await getOne(
        "SELECT COUNT(*) as count FROM human_review_queue WHERE type = 'performance' AND file = $1 AND status = 'pending'",
        [rec.file]
      );
      if (Number(existing.count) === 0) {
        await run(
          "INSERT INTO human_review_queue (file, reason, type, priority) VALUES ($1, $2, 'performance', $3)",
          [rec.file, rec.reason, rec.priority]
        );
        newCount++;
      }
    }

    const summaryMsg =
      `Analyzed ${tableStats.length} tables, ${indexRows.length} indexes. ` +
      `Found ${recommendations.length} recommendations (${newCount} new).`;
    logger.success(summaryMsg);

    return {
      summary: summaryMsg,
      details: {
        table_stats: tableStats,
        index_count: indexRows.length,
        recommendations,
        new_recommendations: newCount,
      },
      metrics: {
        tables_analyzed: tableStats.length,
        index_count: indexRows.length,
        recommendations_found: recommendations.length,
        new_recommendations: newCount,
      },
    };
  },

  rotateLogs: async () => {
    const result = rotateLogs({ logDir: join(projectRoot, 'logs'), retentionDays: 3 });
    const summary = `Log rotation: deleted ${result.deleted} files (${(result.freedSpace / (1024 * 1024)).toFixed(2)} MB freed), kept ${result.kept}`;
    logger.success(summary);
    return {
      summary,
      details: result,
      metrics: {
        deleted: result.deleted,
        kept: result.kept,
        freed_mb: result.freedSpace / (1024 * 1024),
      },
    };
  },

  checkRateLimits: async () => {
    const checks = {
      zenrows: process.env.ZENROWS_API_KEY ? 'configured' : 'missing',
      openrouter: process.env.OPENROUTER_API_KEY ? 'configured' : 'missing',
      resend: process.env.RESEND_API_KEY ? 'configured' : 'missing',
      twilio: process.env.TWILIO_ACCOUNT_SID ? 'configured' : 'missing',
    };

    const configuredCount = Object.values(checks).filter(v => v === 'configured').length;
    const missingCount = Object.values(checks).filter(v => v === 'missing').length;

    logger.info('Rate limit health check:', checks);

    return {
      summary: `API keys: ${configuredCount} configured, ${missingCount} missing`,
      details: {
        api_status: checks,
        configured_count: configuredCount,
        missing_count: missingCount,
        note: 'Full rate limit checks require API calls - not yet implemented',
      },
      metrics: {
        configured: configuredCount,
        missing: missingCount,
      },
    };
  },

  purgeSiteStatusHistory: async () => {
    // Keep only the last 5 status entries per active site.
    // For ignored sites, keep only 1 (they're terminal — history is irrelevant).
    // This prevents site_status from growing without bound (observed: 2.3M rows = ~400MB).
    const beforeRow = await getOne('SELECT COUNT(*) as n FROM site_status', []);
    const beforeCount = Number(beforeRow.n);

    await run(`
      DELETE FROM site_status
      WHERE id NOT IN (
        SELECT id FROM site_status s2
        WHERE s2.site_id = site_status.site_id
        ORDER BY s2.id DESC
        LIMIT 5
      )
    `, []);

    // For ignored sites, keep only 1 entry (the most recent)
    await run(`
      DELETE FROM site_status
      WHERE id NOT IN (
        SELECT MAX(ss.id) FROM site_status ss
        JOIN sites s ON ss.site_id = s.id
        WHERE s.status = 'ignored'
        GROUP BY ss.site_id
      )
      AND site_id IN (SELECT id FROM sites WHERE status = 'ignored')
    `, []);

    const afterRow = await getOne('SELECT COUNT(*) as n FROM site_status', []);
    const afterCount = Number(afterRow.n);
    const deleted = beforeCount - afterCount;
    const summary = `Purged ${deleted.toLocaleString()} old site_status rows (${beforeCount.toLocaleString()} → ${afterCount.toLocaleString()})`;
    logger.success(summary);
    return { summary, metrics: { rows_deleted: deleted, rows_remaining: afterCount } };
  },

  diskCleanup: async () => {
    // rmSync is the only fs helper used here that isn't already imported at the top
    const { rmSync } = await import('fs');

    const results = [];
    let totalFreedBytes = 0;

    const removeDir = dir => {
      const full = join(projectRoot, dir);
      if (existsSync(full)) {
        // Directory sizes are not tracked: statSync on a directory reports the
        // inode size, not the contents, so freed-space accounting counts files only.
        rmSync(full, { recursive: true, force: true });
        results.push({ path: dir, action: 'removed_dir' });
      }
    };

    // 1. Coverage & test artifact directories
    for (const dir of ['coverage', '.coverage-backup-testrunner']) {
      removeDir(dir);
    }

    // Any .nyc_output* dirs at project root
    const rootEntries = readdirSync(projectRoot);
    for (const entry of rootEntries) {
      if (entry.startsWith('.nyc_output')) {
        removeDir(entry);
      }
    }

    // 2. Test DBs in db/ (keep test-e2e.db)
    const dbDir = join(projectRoot, 'db');
    const testDbs = readdirSync(dbDir).filter(
      f => f.startsWith('test-') && f.endsWith('.db') && f !== 'test-e2e.db'
    );
    for (const f of testDbs) {
      const full = join(dbDir, f);
      const { size } = statSync(full);
      rmSync(full, { force: true });
      totalFreedBytes += size;
      results.push({
        path: `db/${f}`,
        action: 'removed_file',
        size_mb: (size / 1024 / 1024).toFixed(2),
      });
    }

    // 3. Old files in backups/ (root-level, not db/backup, which the backup jobs manage)
    const backupsDir = join(projectRoot, 'backups');
    if (existsSync(backupsDir)) {
      const cutoff = Date.now() - 3 * 24 * 60 * 60 * 1000;
      const walk = dir => {
        for (const entry of readdirSync(dir, { withFileTypes: true })) {
          const full = join(dir, entry.name);
          if (entry.isDirectory()) {
            walk(full);
          } else {
            const { mtimeMs, size } = statSync(full);
            if (mtimeMs < cutoff) {
              rmSync(full, { force: true });
              totalFreedBytes += size;
              results.push({
                path: `backups/${entry.name}`,
                action: 'removed_file',
                size_mb: (size / 1024 / 1024).toFixed(2),
              });
            }
          }
        }
      };
      walk(backupsDir);
    }

    const freedMB = (totalFreedBytes / 1024 / 1024).toFixed(2);
    const summary = `Disk cleanup: ${results.length} actions, ${freedMB} MB freed`;
    logger.success(summary);
    return {
      summary,
      details: results,
      metrics: { actions: results.length, freed_mb: parseFloat(freedMB) },
    };
  },

  technicalDebtReview: async () => {
    const { readFileSync } = await import('fs');
    const { join } = await import('path');

    try {
      const todoPath = join(projectRoot, 'docs/TODO.md');
      const todoContent = readFileSync(todoPath, 'utf-8');
      const incompleteTasks = (todoContent.match(/- \[ \]/g) || []).length;
      const completedTasks = (todoContent.match(/- \[x\]/gi) || []).length;
      const totalTasks = incompleteTasks + completedTasks;
      const completionRate = totalTasks > 0 ? ((completedTasks / totalTasks) * 100).toFixed(1) : 0;

      logger.info(
        `Technical debt: ${incompleteTasks} incomplete tasks, ${completedTasks} completed`
      );

      return {
        summary: `TODO.md: ${incompleteTasks} incomplete, ${completedTasks} completed (${completionRate}% done)`,
        details: {
          incomplete_tasks: incompleteTasks,
          completed_tasks: completedTasks,
          total_tasks: totalTasks,
          completion_rate: parseFloat(completionRate),
          file_path: todoPath,
        },
        metrics: {
          incomplete: incompleteTasks,
          completed: completedTasks,
          completion_rate: parseFloat(completionRate),
        },
      };
    } catch (error) {
      logger.warn('Could not read TODO.md');
      return {
        summary: 'Could not read TODO.md',
        details: { error: error.message },
        metrics: { success: 0 },
      };
    }
  },

  // NOTE: claudeCodeDocCheck, deepCodeAnalysis, updateDependencies, and sageAutofix
  // have been removed as they are all included in unifiedAutofix.
  // Use `npm run autofix` to run all maintenance tasks together.

  unifiedAutofix: async () => {
    logger.info('Running unified auto-fix (all maintenance tasks)...');

    try {
      const { execSync } = await import('child_process');
      execSync('node scripts/unified-autofix.js', {
        encoding: 'utf8',
        stdio: 'inherit',
        cwd: process.cwd(),
      });

      // Get branch summary
      const summaryPath = join(projectRoot, 'scripts/autofix-branch.js');
      let commitCount = 0;

      try {
        // Import and call getAutofixSummary
        const { getAutofixSummary } = await import(summaryPath);
        const summary = getAutofixSummary();
        commitCount = summary.commitCount || 0;
      } catch {
        // Fallback if summary not available
        commitCount = 1;
      }

      return {
        summary: `Unified auto-fix completed - ${commitCount} commits in autofix branch`,
        details: {
          automated: true,
          branch: 'autofix',
          tasks_run: [
            'Prettier formatting',
            'ESLint auto-fix',
            'Security audit',
            'Dependency updates',
            'Sage AI fixes',
            'Documentation check',
            'Test generation',
          ],
          note: 'Review autofix branch and merge if satisfied. Run: git diff main autofix',
        },
        metrics: {
          success: 1,
          commits: commitCount,
        },
      };
    } catch (error) {
      logger.error(`Unified auto-fix failed: ${error.message}`);
      return {
        summary: 'Unified auto-fix failed',
        details: {
          error: error.message,
          note: 'Check logs for details. May need claude CLI in PATH or OPENROUTER_API_KEY in .env',
        },
        metrics: {
          success: 0,
          failed: 1,
        },
      };
    }
  },

  // ===== BACKUP: ops + telemetry (DR-096) =====
  // DEPRECATED: was a SQLite db.backup() call. Now a no-op stub kept so the cron runner
  // can still schedule/reference the key. Real backups use pg_dump on the NixOS host.
  backupOpsAndTelemetry: async () => {
    logger.warn('backupOpsAndTelemetry: skipped — use pg_dump for PostgreSQL backups');
    return {
      summary: 'Backup skipped: use pg_dump for PostgreSQL (ops + tel schemas)',
      details: { note: 'Configure pg_dump on the NixOS host. All schemas are in the mmo database.' },
      metrics: { success: 0, skipped: 1 },
    };
  },

  // ===== BACKUP: 2step.db (DR-096) =====
  // DEPRECATED: was a SQLite file copy. Now a no-op stub kept so the cron runner
  // can still schedule/reference the key. Real backups use pg_dump on the NixOS host.
  backup2StepDb: async () => {
    logger.warn('backup2StepDb: skipped — 2Step uses PostgreSQL; use pg_dump on the host');
    return {
      summary: 'Backup skipped: 2Step uses PostgreSQL',
      details: { note: 'Configure pg_dump for the 2Step schema on the NixOS host.' },
      metrics: { success: 0, skipped: 1 },
    };
  },

  // ===== BACKUP: AdManager SQLite DB =====
  backupAdManagerDb: async () => {
    const srcPath = join(projectRoot, '..', 'AdManager', 'db', 'admanager.db');
    const backupDir = '/run/media/jason/store/backups/pg/dumps'; // co-locate with pg_dump backups
    if (!existsSync(srcPath)) {
      logger.warn(`backupAdManagerDb: source not found: ${srcPath}`);
      return { summary: 'Skipped: admanager.db not found', metrics: { success: 0 } };
    }
    try {
      mkdirSync(backupDir, { recursive: true });
      const tier = determineTier(backupDir, 'admanager', 'db.gz');
      // Use backupFileName so daily/weekly stamps match determineTier's patterns;
      // an inline 4h-style timestamp would make every backup register as a new weekly.
      const filename = backupFileName('admanager', tier, 'db.gz');
      const dest = join(backupDir, filename);

      // Stream copy with gzip
      const src = createReadStream(srcPath);
      const gz = createGzip({ level: 6 });
      const out = createWriteStream(dest);
      await pipeline(src, gz, out);

      const { size } = statSync(dest);
      const sizeMB = (size / 1024 / 1024).toFixed(2);
      logger.info(`backupAdManagerDb: ${filename} (${sizeMB} MB)`);

      updateSyncSnapshot(backupDir, 'admanager', filename, 'db.gz');
      rotateBackups(backupDir, 'admanager', 'db.gz');

      return {
        summary: `AdManager backup ${tier}: ${filename} (${sizeMB} MB)`,
        details: { filename, sizeMB, tier },
        metrics: { success: 1, size_mb: parseFloat(sizeMB) },
      };
    } catch (e) {
      logger.error(`backupAdManagerDb: ${e.message}`);
      return { summary: `Backup failed: ${e.message}`, metrics: { success: 0 } };
    }
  },

  // ===== BACKUP: data/scores/ + data/contacts/ tarballs (DR-096) =====
  backupDataDirs: async () => {
    const backupDir = join(projectRoot, 'db', 'backup');
    if (!existsSync(backupDir)) {
      return { summary: 'Skipped: backup dir not accessible', metrics: { success: 0, skipped: 1 } };
    }

    const { promisify } = await import('util');
    const execFileAsync = promisify(execFile);
    const results = [];

    for (const [prefix, srcDir] of [
      ['scores', join(projectRoot, 'data', 'scores')],
      ['contacts', join(projectRoot, 'data', 'contacts')],
    ]) {
      if (!existsSync(srcDir)) {
        logger.warn(`${prefix} dir not found: ${srcDir}`);
        continue;
      }

      const tier = determineTier(backupDir, prefix, 'tar.gz');
      const tarFile = backupFileName(prefix, tier, 'tar.gz');
      const tarPath = join(backupDir, tarFile);
      const tmpPath = `${tarPath}.tmp`;

      logger.info(`Archiving ${prefix}/ → ${tarFile} (tier: ${tier})...`);
      try {
        await execFileAsync('tar', ['czf', tmpPath, '-C', dirname(srcDir), prefix], { timeout: 15 * 60 * 1000 });
        renameSync(tmpPath, tarPath);
      } catch (err) {
        try { unlinkSync(tmpPath); } catch (_e) { /* cleanup */ }
        logger.error(`Failed to archive ${prefix}: ${err.message}`);
        continue;
      }

      const tarSize = statSync(tarPath).size;
      if (tarSize < 1024) {
        try { unlinkSync(tarPath); } catch (_e) { /* cleanup */ }
        logger.error(`${prefix} tarball suspiciously small (${tarSize} bytes), deleted`);
        continue;
      }

      updateSyncSnapshot(backupDir, prefix, tarFile, 'tar.gz');
      rotateBackups(backupDir, prefix, 'tar.gz');

      logger.success(`${prefix}: ${(tarSize / 1024 / 1024).toFixed(1)} MB (${tier})`);
      results.push({ prefix, tier, size_mb: tarSize / 1024 / 1024 });
    }

    return {
      summary: `Archived ${results.map(r => r.prefix).join(', ')}`,
      details: { archives: results },
      metrics: { success: 1, dirs_archived: results.length },
    };
  },
};
1245  
1246  /**
1247   * Execute a command-based job
1248   */
1249  async function executeCommand(command, lockKey = null, timeoutSeconds = null) {
1250    try {
1251      // Set lock if needed
1252      if (lockKey) {
1253        const canRun = await checkAndClearStaleLock(lockKey, command);
1254        if (!canRun) {
1255          return {
1256            summary: 'Skipped - already running',
1257            details: { skipped: true, reason: 'Singleton check failed' },
1258            metrics: { skipped: 1 },
1259          };
1260        }
1261  
1262        await run(
1263          `INSERT INTO ops.cron_locks (lock_key, description, updated_at)
1264           VALUES ($1, $2, NOW())
1265           ON CONFLICT (lock_key) DO UPDATE SET
1266             description = EXCLUDED.description,
1267             updated_at  = EXCLUDED.updated_at`,
1268          [lockKey, `${command} running lock`]
1269        );
1270      }
1271  
1272      const { spawn } = await import('child_process');
1273      const startTime = Date.now();
1274      // Default timeout: 30 minutes. Prevents one hung cron command from blocking all others.
1275      const timeoutMs = (timeoutSeconds || 1800) * 1000;
1276  
1277      // Security: only allow commands with known-safe prefixes (P1 finding)
1278      const ALLOWED_COMMAND_PREFIXES = ['node scripts/', 'node src/', 'npm run'];
1279      if (!ALLOWED_COMMAND_PREFIXES.some(p => command.startsWith(p))) {
1280        throw new Error(`Blocked: command '${command}' not in allowlist (allowed: ${ALLOWED_COMMAND_PREFIXES.join(', ')})`);
1281      }
1282  
1283      return new Promise((resolve, reject) => {
1284        // Run all cron commands with lowest CPU/IO priority to prevent system slowdown
1285        const [cmd, ...args] = command.split(' ');
1286        const proc = spawn('nice', ['-n', '19', 'ionice', '-c', '3', cmd, ...args], {
1287          cwd: projectRoot,
1288          stdio: 'pipe', // Capture stdout/stderr
1289          env: process.env, // Pass environment to child process (needed for PATH)
1290        });
1291  
1292        // Kill the process if it exceeds the timeout
1293        const killTimer = setTimeout(() => {
1294          proc.kill('SIGTERM');
1295          setTimeout(() => proc.kill('SIGKILL'), 5000); // Force kill after 5s
1296          reject(new Error(`Command timed out after ${timeoutMs / 1000}s: ${command}`));
1297        }, timeoutMs);
1298  
1299        // Capture stdout and stderr
1300        let stdout = '';
1301        let stderr = '';
1302  
1303        proc.stdout.on('data', data => {
1304          const output = data.toString();
1305          stdout += output;
1306          process.stdout.write(output); // Also write to console for real-time viewing
1307        });
1308  
1309        proc.stderr.on('data', data => {
1310          const output = data.toString();
1311          stderr += output;
1312          process.stderr.write(output); // Also write to console for real-time viewing
1313        });
1314  
1315        // Use 'exit' not 'close': 'close' waits for all stdio streams to close, but
1316        // background processes spawned by the command (e.g. `nohup npm run dashboard &`)
1317        // can inherit the pipe FD and keep it open indefinitely, causing 'close' to never fire.
1318        // 'exit' fires as soon as the direct child process exits, regardless of stdio state.
1319        proc.on('exit', code => {
1320          // Destroy stdio streams to prevent the pipe from staying open
1321          proc.stdout.destroy();
1322          proc.stderr.destroy();
1323  
1324          clearTimeout(killTimer);
1325  
1326          // Release lock asynchronously — fire and forget; errors are logged but don't block resolution
1327          if (lockKey) {
1328            run('DELETE FROM ops.cron_locks WHERE lock_key = $1', [lockKey]).catch(error => {
1329              logger.warn(`Failed to release lock for ${lockKey}: ${error.message}`);
1330            });
1331          }
1332  
1333          const duration = ((Date.now() - startTime) / 1000).toFixed(2);
1334          const output = { stdout: stdout.trim(), stderr: stderr.trim() };
1335  
1336          if (code === 0) {
1337            resolve({
1338              summary: `Command completed in ${duration}s`,
1339              details: {
1340                duration_seconds: parseFloat(duration),
1341                exit_code: code,
1342                command,
1343                output,
1344              },
1345              metrics: {
1346                duration_seconds: parseFloat(duration),
1347                exit_code: 0,
1348              },
1349            });
1350          } else {
1351            // Include captured output in error
1352            const errorLines = stderr.split('\n').filter(line => line.trim());
1353            const errorSummary =
1354              errorLines.length > 0
1355                ? errorLines[errorLines.length - 1]
1356                : `Command exited with code ${code}`;
1357  
1358            const error = new Error(errorSummary);
1359            error.details = {
1360              exit_code: code,
1361              command,
1362              stdout: stdout.trim(),
1363              stderr: stderr.trim(),
1364            };
1365            reject(error);
1366          }
1367        });
1368      });
1369    } catch (error) {
1370      // Ensure lock is released on error
1371      if (lockKey) {
1372        run('DELETE FROM ops.cron_locks WHERE lock_key = $1', [lockKey]).catch(dbErr => {
1373          logger.warn(`Failed to release lock for ${lockKey} after error: ${dbErr.message}`);
1374        });
1375      }
1376      throw error;
1377    }
1378  }
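
      // Illustrative call, mirroring how runCron() invokes this for command-type jobs
      // below (the command, lock key, and 480s timeout are hypothetical example values):
      //   await executeCommand('npm run scoring', 'cron_scoring_running', 480);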
1379  
1380  /**
1381   * Load jobs from database, sorted by interval (shortest first).
1382   * This ensures critical high-frequency jobs (processGuardian: 1min, monitorPipeline: 5min)
1383   * always run before low-frequency maintenance jobs (diskCleanup: daily, rotateLogs: 25h).
1384   * Without this ordering, a single slow daily job can consume the systemd TimeoutStartSec
1385   * (10min) window before the critical monitoring jobs ever get a chance to run.
1386   */
1387  async function loadJobs() {
1388    const jobs = await getAll('SELECT * FROM ops.cron_jobs WHERE enabled = true', []);
1389    // Sort by interval in minutes ascending (1-min jobs first, weekly jobs last)
1390    jobs.sort(
1391      (a, b) =>
1392        intervalToMinutes(a.interval_value, a.interval_unit) -
1393        intervalToMinutes(b.interval_value, b.interval_unit)
1394    );
1395    return jobs;
1396  }
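
      // Illustrative ops.cron_jobs row, limited to the columns this file reads
      // (all values hypothetical):
      //   { task_key: 'processGuardian', name: 'Process Guardian', enabled: true,
      //     interval_value: 1, interval_unit: 'minutes', handler_type: 'function',
      //     handler_value: null, timeout_seconds: 120, last_run_at: <Date>, critical: true }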
1397  
1398  /**
1399   * Convert interval to minutes
1400   */
1401  function intervalToMinutes(value, unit) {
1402    switch (unit) {
1403      case 'minutes':
1404        return value;
1405      case 'hours':
1406        return value * 60;
1407      case 'days':
1408        return value * 24 * 60;
1409      case 'weeks':
1410        return value * 7 * 24 * 60;
1411      default:
1412        throw new Error(`Unknown interval unit: ${unit}`);
1413    }
1414  }
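
      // Examples: intervalToMinutes(90, 'minutes') === 90; intervalToMinutes(2, 'hours') === 120;
      // intervalToMinutes(1, 'weeks') === 10080.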
1415  
1416  /**
1417   * Check if job should run based on last_run_at and interval
1418   */
1419  function shouldRun(job) {
1420    if (!job.last_run_at) return true; // Never run before
1421  
1422    const now = new Date();
1423    // PostgreSQL returns timestamps as JS Date objects via the pg driver.
1424    // Ensure we handle both Date objects and ISO strings safely.
1425    const lastRun = job.last_run_at instanceof Date
1426      ? job.last_run_at
1427      : new Date(job.last_run_at.endsWith('Z') ? job.last_run_at : `${job.last_run_at}Z`);
1428    const minutesSinceLastRun = (now - lastRun) / 1000 / 60;
1429    const intervalMinutes = intervalToMinutes(job.interval_value, job.interval_unit);
1430  
1431    return minutesSinceLastRun >= intervalMinutes;
1432  }
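
      // Example: a job with interval_value = 30, interval_unit = 'minutes' becomes eligible
      // once last_run_at is at least 30 minutes old. Naive 'YYYY-MM-DD HH:MM:SS' strings get
      // a 'Z' appended so they parse as UTC rather than server-local time.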
1433  
1434  /**
1435   * Update job's last_run_at timestamp
1436   */
1437  async function updateLastRun(taskKey) {
1438    await run('UPDATE ops.cron_jobs SET last_run_at = NOW() WHERE task_key = $1', [taskKey]);
1439  }
1440  
1441  /**
1442   * Create summary from result
1443   */
1444  function generateSummary(result) {
1445    if (!result) return 'Task completed';
1446    if (result.summary) return result.summary;
1447    if (result.manual) return 'Manual task - reminder sent';
1448  
1449    const parts = [];
1450    if (result.processed !== undefined) parts.push(`Processed: ${result.processed}`);
1451    if (result.items_processed !== undefined) parts.push(`Processed: ${result.items_processed}`);
1452    if (result.sites !== undefined) parts.push(`Sites: ${result.sites}`);
1453    if (result.outreaches !== undefined) parts.push(`Outreaches: ${result.outreaches}`);
1454    if (result.success !== undefined) parts.push(`Success: ${result.success}`);
1455  
1456    return parts.length > 0 ? parts.join(', ') : 'Task completed successfully';
1457  }
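
      // Example: generateSummary({ processed: 12, sites: 3 }) -> 'Processed: 12, Sites: 3';
      // a null/undefined result yields 'Task completed'.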
1458  
1459  /**
1460   * Log task start to database. RETURNING id supplies the new row's id.
1461   */
1462  async function logTaskStart(taskName) {
1463    const row = await getOne(
1464      `INSERT INTO ops.cron_job_logs (job_name, started_at, status, items_processed, items_failed)
1465       VALUES ($1, NOW(), 'running', 0, 0) RETURNING id`,
1466      [taskName]
1467    );
1468    return row.id;
1469  }
1470  
1471  /**
1472   * Update task log on completion
1473   */
1474  async function logTaskComplete(logId, result) {
1475    const summary = generateSummary(result);
1476    const fullLog = JSON.stringify(result, null, 2);
1477  
1478    let itemsProcessed = 1;
1479    if (result?.metrics?.processed !== undefined) {
1480      itemsProcessed = result.metrics.processed;
1481    } else if (result?.items_processed !== undefined) {
1482      itemsProcessed = result.items_processed;
1483    } else if (result?.processed !== undefined) {
1484      itemsProcessed = result.processed;
1485    }
1486  
1487    await run(
1488      `UPDATE ops.cron_job_logs
1489       SET finished_at     = NOW(),
1490           status          = 'success',
1491           summary         = $1,
1492           full_log        = $2,
1493           items_processed = $3
1494       WHERE id = $4`,
1495      [summary, fullLog, itemsProcessed, logId]
1496    );
1497  }
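
      // Precedence example: for result = { metrics: { processed: 5 }, processed: 9 },
      // items_processed is recorded as 5; metrics.processed wins over the top-level fields.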
1498  
1499  /**
1500   * Update task log on failure
1501   */
1502  async function logTaskFailed(logId, error) {
1503    // Extract error name/type for summary
1504    const errorName = error.name || 'Error';
1505    const summary = `${errorName}: ${error.message}`;
1506  
1507    // Build full error details with same structure as success entries
1508    const fullLog = JSON.stringify(
1509      {
1510        summary,
1511        error: error.message,
1512        details: error.details || {
1513          stack: error.stack,
1514        },
1515        metrics: {
1516          success: 0,
1517          failed: 1,
1518        },
1519      },
1520      null,
1521      2
1522    );
1523  
1524    await run(
1525      `UPDATE ops.cron_job_logs
1526       SET finished_at   = NOW(),
1527           status        = 'failed',
1528           summary       = $1,
1529           full_log      = $2,
1530           error_message = $3,
1531           items_failed  = 1
1532       WHERE id = $4`,
1533      [summary, fullLog, error.message, logId]
1534    );
1535  }
1536  
1537  /**
1538   * Run all eligible jobs from database
1539   */
1540  async function runCron() {
1541    logger.success('Cron started (database-driven v2)');
1542  
1543    const results = {
1544      ran: [],
1545      skipped: [],
1546      failed: [],
1547    };
1548  
1549    // Check circuit breaker - global kill switch for all cron jobs
1550    const circuitBreaker = await getOne(
1551      "SELECT value FROM ops.settings WHERE key = $1",
1552      ['cron_circuit_breaker_enabled']
1553    );
1554    if (circuitBreaker?.value === 'false') {
1555      logger.warn(
1556        'Circuit breaker engaged (cron_circuit_breaker_enabled = false) - skipping all jobs; the systemd timer keeps firing but does no work'
1557      );
1558      return;
1559    }
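
        // To pause all jobs without touching systemd, flip the setting (illustrative SQL;
        // assumes the ops.settings row already exists):
        //   UPDATE ops.settings SET value = 'false' WHERE key = 'cron_circuit_breaker_enabled';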
1560  
1561    // Global singleton lock - prevent multiple cron instances from running simultaneously
1562    const GLOBAL_LOCK_KEY = 'cron_runner_global_lock';
1563    const canRun = await checkAndClearStaleLock(GLOBAL_LOCK_KEY, 'Cron Runner');
1564  
1565    if (!canRun) {
1566      logger.warn('Another cron instance is already running, exiting');
1567      return;
1568    }
1569  
1570    // Set global lock (uses cron_locks table)
1571    await run(
1572      `INSERT INTO ops.cron_locks (lock_key, description, updated_at)
1573       VALUES ($1, $2, NOW())
1574       ON CONFLICT (lock_key) DO UPDATE SET
1575         description = EXCLUDED.description,
1576         updated_at  = EXCLUDED.updated_at`,
1577      [GLOBAL_LOCK_KEY, 'Global cron runner lock (singleton)']
1578    );
1579  
1580    try {
1581      const jobs = await loadJobs();
1582  
1583      if (jobs.length === 0) {
1584        logger.warn('No enabled jobs found in database. Run npm run cron:migrate to seed jobs.');
1585        return;
1586      }
1587  
1588      for (const job of jobs) {
1589        // Check if job should run
1590        if (!shouldRun(job)) {
1591          const lastRun = job.last_run_at instanceof Date
1592            ? job.last_run_at
1593            : new Date(job.last_run_at.endsWith('Z') ? job.last_run_at : `${job.last_run_at}Z`);
1594          const nextRun = new Date(
1595            lastRun.getTime() +
1596              intervalToMinutes(job.interval_value, job.interval_unit) * 60 * 1000
1597          );
1598          logger.info(`Skipping ${job.name} (next run: ${nextRun.toLocaleString()})`);
1599          results.skipped.push(job.name);
1600          continue;
1601        }
1602  
1603        // Execute job
1604        let logId;
1605        try {
1606          logger.info(`Running: ${job.name}`);
1607          const start = Date.now();
1608  
1609          // Log task start
1610          logId = await logTaskStart(job.name);
1611  
1612          // Update last_run_at BEFORE executing the job.
1613          // This prevents systemd SIGKILL (at TimeoutStartSec=10min) from leaving last_run_at
1614          // un-updated, which would cause the job to re-run on every subsequent cron cycle.
1615          // Jobs that need to retry sooner can explicitly clear last_run_at in their handler.
1616          await updateLastRun(job.task_key);
1617  
1618          // Execute based on handler type
1619          let result;
1620          if (job.handler_type === 'function') {
1621            // Call the handler with a timeout so a hung job (slow DB query, stuck LLM call)
1622            // cannot block the cron cycle. Default: 8min (below systemd's 10min TimeoutStartSec).
1623            // Note: Promise.race only abandons the await; the losing handler keeps running.
1624            const handler = HANDLERS[job.task_key];
1625            if (!handler) {
1626              throw new Error(`Handler function not found: ${job.task_key}`);
1627            }
1628            const fnTimeoutMs = (job.timeout_seconds || 480) * 1000;
1629            result = await Promise.race([
1630              handler(),
1631              new Promise((_, reject) =>
1632                setTimeout(
1633                  () =>
1634                    reject(
1635                      new Error(
1636                        `Function handler timed out after ${fnTimeoutMs / 1000}s: ${job.task_key}`
1637                      )
1638                    ),
1639                  fnTimeoutMs
1640                )
1641              ),
1642            ]);
1643          } else if (job.handler_type === 'command') {
1644            // Execute command with lock for pipeline stages
1645            const isPipelineStage = ['scoring', 'rescoring', 'enrich', 'proposals'].some(stage =>
1646              job.handler_value.includes(stage)
1647            );
1648            const lockKey = isPipelineStage ? `cron_${job.task_key}_running` : null;
1649            // Use job's timeout_seconds if set; otherwise default to 8 min (below systemd's 10min limit)
1650            const cmdTimeout = job.timeout_seconds || 480;
1651            result = await executeCommand(job.handler_value, lockKey, cmdTimeout);
1652          } else {
1653            throw new Error(`Unknown handler type: ${job.handler_type}`);
1654          }
1655  
1656          const duration = ((Date.now() - start) / 1000).toFixed(2);
1657  
1658          // Log task completion
1659          await logTaskComplete(logId, result);
1660  
1661          // Update last_run_at (second write — confirms actual completion)
1662          await updateLastRun(job.task_key);
1663  
1664          logger.success(`${job.name} completed in ${duration}s`);
1665          results.ran.push({ name: job.name, duration, result });
1666        } catch (error) {
1667          logger.error(`${job.name} failed:`, error);
1668  
1669          // Log task failure
1670          if (logId) {
1671            await logTaskFailed(logId, error);
1672          }
1673  
1674          // Update last_run_at even on failure to prevent immediate retries
1675          await updateLastRun(job.task_key);
1676  
1677          results.failed.push({
1678            name: job.name,
1679            error: error.message,
1680            critical: Boolean(job.critical ?? true),
1681          });
1682        }
1683      }
1684    } finally {
1685      // Release global lock
1686      try {
1687        await run('DELETE FROM ops.cron_locks WHERE lock_key = $1', [GLOBAL_LOCK_KEY]);
1688      } catch (error) {
1689        logger.warn(`Failed to release global cron lock: ${error.message}`);
1690      }
1691    }
1692  
1693    // Summary
1694    console.log('\n=== Cron Summary ===');
1695    console.log(`Ran: ${results.ran.length} tasks`);
1696    console.log(`Skipped: ${results.skipped.length} tasks`);
1697    console.log(`Failed: ${results.failed.length} tasks`);
1698  
1699    if (results.ran.length > 0) {
1700      console.log('\nCompleted:');
1701      results.ran.forEach(r => console.log(`  ${r.name} (${r.duration}s)`));
1702    }
1703  
1704    if (results.failed.length > 0) {
1705      console.log('\nFailed:');
1706      results.failed.forEach(f => console.log(`  ${f.name}: ${f.error}`));
1707    }
1708  
1709    console.log('====================\n');
1710  
1711    // Exit with error only if critical tasks failed
1712    const criticalFailures = results.failed.filter(f => f.critical);
1713    if (criticalFailures.length > 0) {
1714      console.log(`\n${criticalFailures.length} critical task(s) failed - exiting with error\n`);
1715      process.exit(1);
1716    } else if (results.failed.length > 0) {
1717      console.log(
1718        `\n${results.failed.length} non-critical task(s) failed - continuing normally\n`
1719      );
1720    }
1721  }
1722  
1723  // CLI functionality
1724  if (import.meta.url === `file://${process.argv[1]}`) {
1725    runCron()
1726      .then(() => {
1727        logger.success('Cron completed successfully');
1728        process.exit(0);
1729      })
1730      .catch(error => {
1731        logger.error('Cron failed:', error);
1732        process.exit(1);
1733      });
1734  }
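
      // Typical scheduling (illustrative; the header's migration notes point systemd or
      // crontab at this file, and all paths here are placeholders):
      //   * * * * * cd /path/to/mmo && node src/cron.js >> logs/cron.log 2>&1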
1735  
1736  export default { runCron, HANDLERS };