cron.js
#!/usr/bin/env node

/**
 * Database-Driven Cron Runner (v2)
 *
 * Reads job definitions from cron_jobs table instead of hardcoded TASKS object.
 * Maintains backward compatibility with existing handler functions.
 *
 * Migration path:
 * 1. Run migration: psql -d mmo < db/migrations/029-create-cron-jobs-table.sql
 * 2. Seed database: npm run cron:migrate
 * 3. Update systemd/crontab to use this file instead of src/cron.js
 */

/* eslint-disable require-await */

import { existsSync, readdirSync, unlinkSync, renameSync, statSync, mkdirSync, symlinkSync, createReadStream, createWriteStream, writeFileSync } from 'fs';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
import { pipeline } from 'stream/promises';
import { createGzip } from 'zlib';
import { execFile } from 'child_process';
import { syncEmailEvents } from './utils/sync-email-events.js';
import { syncUnsubscribes } from './utils/sync-unsubscribes.js';
import { pollInboundSMS } from './inbound/sms.js';
import { pollInboundEmails } from './inbound/email.js';
import { processAllReplies } from './inbound/processor.js';
import { pollPurchases } from './cron/poll-purchases.js';
import { pollFreeScans } from './cron/poll-free-scans.js';
import { processPendingPurchases } from './cron/process-purchases.js';
import { precomputeDashboard } from './cron/precompute-dashboard.js';
import { runProcessGuardian } from './cron/process-guardian.js';
import { runProcessReaper } from './cron/process-reaper.js';
import { runPipelineStatusMonitor } from './cron/pipeline-status-monitor.js';
import { classifyUnknownErrors } from './cron/classify-unknown-errors.js';
import { runCleanupTestDbs } from './cron/cleanup-test-dbs.js';
import { runAutoresponder } from './cron/autoresponder.js';
import { sendScanEmailSequence } from './cron/send-scan-email-sequence.js';
import { runCitationMonitor } from './cron/citation-monitor.js';
import { rotateLogs } from './utils/log-rotator.js';
import { createAgentTask } from './agents/utils/task-manager.js';
import { getOne, getAll, run } from './utils/db.js';
import Logger from './utils/logger.js';
import './utils/load-env.js';

const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const projectRoot = join(__dirname, '..');

const logger = new Logger('Cron');

/**
 * Decide whether a stage may run, clearing its lock row when that row is stale.
 * Uses cron_locks table (migrated from config table in migration 038).
 *
 * A lock row younger than 10 minutes means the stage is treated as still
 * running. An older row is considered stale and is deleted. This function only
 * inspects and removes rows; it never inserts one.
 *
 * @param {string} lockKey - Value matched against `ops.cron_locks.lock_key`.
 * @param {string} stageName - Human-readable stage name, used only in log messages.
 * @returns {Promise<boolean>} `false` when a fresh lock exists (caller should skip),
 *   `true` when there is no lock or a stale lock was cleared.
 */
async function checkAndClearStaleLock(lockKey, stageName) {
  const STALE_THRESHOLD = 10 * 60 * 1000; // 10 minutes

  const existingLock = await getOne('SELECT updated_at FROM ops.cron_locks WHERE lock_key = $1', [lockKey]);
  if (!existingLock) {
    return true; // No lock, proceed
  }

  const ageMs = Date.now() - new Date(existingLock.updated_at).getTime();
  if (ageMs < STALE_THRESHOLD) {
    logger.warn(`${stageName} already running, skipping`);
    return false;
  }

  // Lock is older than the threshold: report its age, drop it, and let the caller run.
  const ageMinutes = (ageMs / 1000 / 60).toFixed(1);
  logger.warn(
    `Stale lock detected for ${stageName} (${ageMinutes} min old), clearing and proceeding`
  );
  await run('DELETE FROM ops.cron_locks WHERE lock_key = $1', [lockKey]);
  return true;
}

// =============================================================================
// Backup helpers (DR-096: sync-safe DB backup via Syncthing)
// =============================================================================

/**
 * Atomically update a sync-snapshot symlink.
 *
 * Creates the new symlink under a temp name, then renames it over the old one,
 * so there is no window where the link is absent (avoids Syncthing propagating
 * a deletion).
 *
 * @param {string} backupDir - Directory holding the backups; the link is placed in `<backupDir>/sync-snapshot`.
 * @param {string} prefix - Backup name prefix; the link is named `<prefix>-latest.<ext>`.
 * @param {string} latestFile - Name of the backup file inside `backupDir` to point at.
 * @param {string} [ext='db.gz'] - Extension used in the link name.
 */
function updateSyncSnapshot(backupDir, prefix, latestFile, ext = 'db.gz') {
  const snapshotDir = join(backupDir, 'sync-snapshot');
  mkdirSync(snapshotDir, { recursive: true });

  const linkPath = join(snapshotDir, `${prefix}-latest.${ext}`);
  const tmpLink = `${linkPath}.new`;

  // Remove any temp link left by an interrupted run; a missing one is the normal case.
  try { unlinkSync(tmpLink); } catch (_e) { /* cleanup */ }

  // Target is relative ("../<file>") because the link sits one level below backupDir.
  symlinkSync(join('..', latestFile), tmpLink);
  renameSync(tmpLink, linkPath);
}

/**
 * Grandfathered retention: 4×4h + 4×daily + 4×weekly = 12 backups max.
 * Files must be named <prefix>-<tier>-<timestamp>.<ext> e.g. sites-4h-2026-03-25T14-00.db.gz
 * Rotation keeps the newest N files per tier, deletes the rest.
 *
 * `prefix` and `ext` are matched literally: regex metacharacters in them are
 * escaped, so a prefix such as `a.b` cannot match (and cause deletion around)
 * files belonging to a different prefix such as `aXb`.
 *
 * @param {string} backupDir - Directory to scan (not recursive).
 * @param {string} prefix - Backup name prefix.
 * @param {string} [ext='db.gz'] - Backup file extension, without the leading dot.
 * @returns {{deletedCount: number, deletedBytes: number}} Number of files removed and their total size.
 */
function rotateBackups(backupDir, prefix, ext = 'db.gz') {
  const tiers = { '4h': 4, daily: 4, weekly: 4 };
  const escapeRegExp = s => s.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  const escapedPrefix = escapeRegExp(prefix);
  const escapedExt = escapeRegExp(ext);

  // One directory read serves every tier: tier patterns are disjoint, so a
  // deletion in one tier never changes what another tier matches.
  const entries = readdirSync(backupDir);

  let deletedCount = 0;
  let deletedBytes = 0;

  for (const [tier, keep] of Object.entries(tiers)) {
    const pattern = new RegExp(`^${escapedPrefix}-${tier}-.*\\.${escapedExt}$`);
    const newestFirst = entries
      .filter(f => pattern.test(f))
      .sort()
      .reverse(); // newest first (ISO timestamps sort lexicographically)

    for (const f of newestFirst.slice(keep)) {
      const fp = join(backupDir, f);
      try {
        const { size } = statSync(fp);
        unlinkSync(fp);
        deletedCount++;
        deletedBytes += size;
        logger.info(`Deleted old backup: ${f}`);
      } catch (e) {
        // Best-effort: one undeletable file must not stop the rest of the rotation.
        logger.warn(`Failed to delete ${f}: ${e.message}`);
      }
    }
  }

  if (deletedCount > 0) {
    logger.info(`Rotation: deleted ${deletedCount} old backups (freed ${(deletedBytes / 1024 / 1024).toFixed(1)} MB)`);
  }
  return { deletedCount, deletedBytes };
}

/**
 * Clean up orphaned WAL sidecar files on /store.
 * These were left behind by better-sqlite3 backups (legacy — no longer generated).
 *
 * Deletes every entry directly inside `backupDir` whose name ends in
 * `.db-shm`, `.db-wal` or `.db-journal`. Deletion failures are ignored.
 *
 * @param {string} backupDir - Directory to scan (not recursive).
 */
function cleanupWalOrphans(backupDir) {
  const orphanSuffixes = ['.db-shm', '.db-wal', '.db-journal'];
  for (const f of readdirSync(backupDir)) {
    if (!orphanSuffixes.some(suffix => f.endsWith(suffix))) continue;
    try {
      unlinkSync(join(backupDir, f));
      logger.info(`Cleaned up WAL orphan: ${f}`);
    } catch (_e) { /* cleanup */ }
  }
}

/**
 * Determine which backup tier to assign based on existing backups.
 *
 * Checked in this order:
 *   1. 'weekly' if no weekly backup exists for the current ISO week,
 *   2. 'daily'  if no daily backup exists for today's (UTC) date,
 *   3. '4h'     otherwise.
 *
 * `prefix` and `ext` are matched literally (regex metacharacters are escaped).
 *
 * @param {string} backupDir - Directory containing existing backups; must exist.
 * @param {string} prefix - Backup name prefix.
 * @param {string} [ext='db.gz'] - Backup file extension, without the leading dot.
 * @returns {'weekly'|'daily'|'4h'} Tier for the next backup.
 */
function determineTier(backupDir, prefix, ext = 'db.gz') {
  const escapeRegExp = s => s.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  const files = readdirSync(backupDir);

  // One clock reading for both stamps so they cannot straddle a date change.
  const now = new Date();
  const today = now.toISOString().split('T')[0];
  const isoWeek = getISOWeek(now);

  const head = `^${escapeRegExp(prefix)}`;
  const tail = `\\.${escapeRegExp(ext)}$`;
  const dailyPattern = new RegExp(`${head}-daily-${today}${tail}`);
  const weeklyPattern = new RegExp(`${head}-weekly-${isoWeek}${tail}`);

  if (!files.some(f => weeklyPattern.test(f))) return 'weekly';
  if (!files.some(f => dailyPattern.test(f))) return 'daily';
  return '4h';
}

/**
 * Format a date as an ISO-8601 week label, e.g. `2026-W13`.
 *
 * The calendar day is taken in UTC so the result agrees with the UTC-based
 * daily/4h stamps derived from `toISOString()` and does not depend on the
 * host's time zone.
 *
 * @param {Date} date - Instant to classify.
 * @returns {string} `<ISO week-numbering year>-W<two-digit week>`.
 */
function getISOWeek(date) {
  const d = new Date(Date.UTC(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate()));
  // ISO weeks run Monday(1)..Sunday(7); JS reports Sunday as 0.
  const dayNum = d.getUTCDay() || 7;
  // Move to the Thursday of this week: its year is the ISO week-numbering year.
  d.setUTCDate(d.getUTCDate() + 4 - dayNum);
  const yearStart = new Date(Date.UTC(d.getUTCFullYear(), 0, 1));
  const weekNum = Math.ceil(((d - yearStart) / 86400000 + 1) / 7);
  return `${d.getUTCFullYear()}-W${String(weekNum).padStart(2, '0')}`;
}

/**
 * Format a backup filename: <prefix>-<tier>-<timestamp>.<ext>
 *
 * Timestamp by tier (all based on the current time):
 *   - '4h'    → `YYYY-MM-DDTHH-MM` (UTC)
 *   - 'daily' → `YYYY-MM-DD` (UTC)
 *   - any other tier → ISO week label from getISOWeek, e.g. `2026-W13`
 *
 * @param {string} prefix - Backup name prefix.
 * @param {string} tier - Tier name ('4h', 'daily', 'weekly').
 * @param {string} [ext='db'] - File extension, without the leading dot.
 * @returns {string} The file name (no directory part).
 */
function backupFileName(prefix, tier, ext = 'db') {
  const now = new Date();
  let stamp;
  if (tier === '4h') {
    // "2026-03-25T14:00:00.000Z" → "2026-03-25T14-00"
    stamp = now.toISOString().replace(/[:.]/g, '-').slice(0, 16);
  } else if (tier === 'daily') {
    stamp = now.toISOString().split('T')[0];
  } else {
    stamp = getISOWeek(now);
  }
  return `${prefix}-${tier}-${stamp}.${ext}`;
}
`${prefix}-${tier}-${stamp}.${ext}`; 193 } 194 195 /** 196 * Gzip a file atomically: read → gzip → write .tmp → rename to .gz → delete original. 197 * Temp file is in the same directory (same filesystem) so rename is atomic. 198 */ 199 async function gzipFile(srcPath) { 200 const gzPath = `${srcPath }.gz`; 201 const tmp = `${gzPath }.tmp`; 202 try { 203 await pipeline(createReadStream(srcPath), createGzip({ level: 6 }), createWriteStream(tmp)); 204 renameSync(tmp, gzPath); 205 } catch (err) { 206 try { unlinkSync(tmp); } catch (_e) { /* cleanup */ } // clean up .tmp on failure 207 throw err; 208 } 209 unlinkSync(srcPath); // delete uncompressed original 210 return gzPath; 211 } 212 213 /** 214 * Registry of handler functions 215 * Maps task_key to handler function 216 */ 217 const HANDLERS = { 218 // ===== REAL-TIME OPERATIONS ===== 219 syncEmailEvents: async () => { 220 const result = await syncEmailEvents(); 221 return { 222 summary: `Synced ${result?.synced || 0} email events`, 223 details: result || {}, 224 metrics: { 225 synced: result?.synced || 0, 226 errors: result?.errors || 0, 227 }, 228 }; 229 }, 230 231 syncUnsubscribes: async () => { 232 const result = await syncUnsubscribes(); 233 return { 234 summary: `Synced ${result?.synced || 0} unsubscribes`, 235 details: result || {}, 236 metrics: { 237 synced: result?.synced || 0, 238 errors: result?.errors || 0, 239 }, 240 }; 241 }, 242 243 pollInboundSMS: async () => { 244 const result = await pollInboundSMS(); 245 return { 246 summary: `Polled ${result?.processed || 0} inbound SMS messages`, 247 details: result || {}, 248 metrics: { 249 processed: result?.processed || 0, 250 new_messages: result?.new_messages || 0, 251 }, 252 }; 253 }, 254 255 pollInboundEmails: async () => { 256 const result = await pollInboundEmails(); 257 return { 258 summary: `Polled ${result?.processed || 0} inbound email messages, ${result?.stored || 0} stored`, 259 details: result || {}, 260 metrics: { 261 processed: result?.processed 
|| 0, 262 stored: result?.stored || 0, 263 unmatched: result?.unmatched || 0, 264 }, 265 }; 266 }, 267 268 sendPendingReplies: async () => { 269 const result = await processAllReplies(); 270 const smsSent = result?.sms?.sent || 0; 271 const emailSent = result?.email?.sent || 0; 272 return { 273 summary: `Sent ${smsSent + emailSent} pending replies (SMS: ${smsSent}, Email: ${emailSent})`, 274 details: result || {}, 275 metrics: { sms_sent: smsSent, email_sent: emailSent }, 276 }; 277 }, 278 279 autoresponder: async () => { 280 const result = await runAutoresponder(); 281 return result; 282 }, 283 284 pollFreeScans: async () => { 285 const result = await pollFreeScans(); 286 return { 287 summary: `Polled ${result?.processed || 0} free scans, ${result?.inserted || 0} archived`, 288 details: result || {}, 289 metrics: { processed: result?.processed || 0, inserted: result?.inserted || 0 }, 290 }; 291 }, 292 293 pollPurchases: async () => { 294 const result = await pollPurchases(); 295 return { 296 summary: `Polled ${result?.processed || 0} purchases, ${result?.successful || 0} ingested`, 297 details: result || {}, 298 metrics: { processed: result?.processed || 0, successful: result?.successful || 0 }, 299 }; 300 }, 301 302 processPurchases: async () => { 303 const result = await processPendingPurchases(); 304 return { 305 summary: `Processed ${result?.processed || 0} purchases, ${result?.delivered || 0} delivered`, 306 details: result || {}, 307 metrics: { 308 processed: result?.processed || 0, 309 delivered: result?.delivered || 0, 310 failed: result?.failed || 0, 311 }, 312 }; 313 }, 314 315 sendScanEmailSequence: async () => { 316 const result = await sendScanEmailSequence(); 317 return { 318 summary: `Scan sequence: ${result?.sent || 0} sent, ${result?.skipped || 0} skipped, ${result?.failed || 0} failed`, 319 details: result || {}, 320 metrics: { 321 checked: result?.checked || 0, 322 sent: result?.sent || 0, 323 skipped: result?.skipped || 0, 324 failed: 
result?.failed || 0, 325 }, 326 }; 327 }, 328 329 // ===== AEO / CITATION MONITORING ===== 330 citationMonitor: async () => { 331 const result = await runCitationMonitor(); 332 return result; 333 }, 334 335 // ===== MONITORING ===== 336 checkKeywords: async () => { 337 const pending = await getOne('SELECT COUNT(*) as count FROM keywords WHERE status = $1', ['pending']); 338 const active = await getOne('SELECT COUNT(*) as count FROM keywords WHERE status = $1', ['active']); 339 const total = await getOne('SELECT COUNT(*) as count FROM keywords', []); 340 341 logger.info(`Found ${pending.count} pending keywords`); 342 343 return { 344 summary: `${pending.count} pending keywords in queue`, 345 details: { 346 pending_count: pending.count, 347 active_count: active.count, 348 total_count: total.count, 349 }, 350 metrics: { 351 pending: pending.count, 352 active: active.count, 353 total: total.count, 354 }, 355 }; 356 }, 357 358 precomputeDashboard: async () => { 359 const result = await precomputeDashboard(); 360 return result; 361 }, 362 363 // Tier 1: Process Guardian (direct function - works even if agents are broken) 364 processGuardian: async () => { 365 const result = await runProcessGuardian(); 366 return { 367 summary: `Process guardian: ${result.checks_run} checks, ${result.ok} ok, ${result.warnings} warnings, ${result.critical} critical`, 368 details: result, 369 metrics: { 370 checks_run: result.checks_run, 371 ok: result.ok, 372 warnings: result.warnings, 373 critical: result.critical, 374 duration_seconds: result.duration_seconds, 375 }, 376 }; 377 }, 378 379 // Process Reaper - kills stale agent processes, logs zombie count + memory pressure 380 processReaper: async () => { 381 const result = await runProcessReaper(); 382 return { 383 summary: `Process reaper: ${result.zombie_count} zombies, ${result.free_mem_mb}MB free, ${result.stale_processes_killed} stale killed`, 384 details: result, 385 metrics: { 386 zombie_count: result.zombie_count, 387 
free_mem_mb: result.free_mem_mb, 388 swap_pct: result.swap_pct, 389 stale_processes_killed: result.stale_processes_killed, 390 duration_seconds: result.duration_seconds, 391 }, 392 }; 393 }, 394 395 // Cleanup leftover test DB files from tests/ and /tmp 396 cleanupTestDbs: () => { 397 const result = runCleanupTestDbs(); 398 return { 399 summary: `Cleaned up ${result.deleted} stale test DB(s), freed ${result.freed_kb} KB`, 400 details: result, 401 metrics: { 402 deleted: result.deleted, 403 freed_kb: result.freed_kb, 404 }, 405 }; 406 }, 407 408 // Pipeline Status Monitor (writes human-readable report to logs/pipeline-status.txt) 409 pipelineStatusMonitor: async () => { 410 const result = await runPipelineStatusMonitor(); 411 return { 412 summary: result.summary, 413 details: result, 414 metrics: { 415 checks_run: result.checks_run, 416 duration_seconds: result.duration_seconds, 417 actions_taken: result.actions.length, 418 }, 419 }; 420 }, 421 422 // Tier 2: Pipeline Monitor (creates monitor agent tasks for pipeline checks) 423 monitorPipeline: async () => { 424 const taskTypes = [ 425 { task_type: 'check_pipeline_health', priority: 7 }, 426 { task_type: 'scan_logs', priority: 5 }, 427 { task_type: 'check_process_compliance', priority: 5 }, 428 { task_type: 'check_loops', priority: 6 }, 429 { task_type: 'check_blocked_tasks', priority: 6 }, 430 ]; 431 432 let created = 0; 433 let skipped = 0; 434 435 for (const taskDef of taskTypes) { 436 try { 437 const taskId = await createAgentTask({ 438 task_type: taskDef.task_type, 439 assigned_to: 'monitor', 440 created_by: 'cron', 441 priority: taskDef.priority, 442 context: { source: 'cron_tier2', recurring: true }, 443 }); 444 445 if (taskId) created++; 446 else skipped++; 447 } catch { 448 // createAgentTask returns null for duplicates via findDuplicateTask 449 skipped++; 450 } 451 } 452 453 return { 454 summary: `Pipeline monitor: ${created} tasks created, ${skipped} skipped (duplicates)`, 455 details: { task_types: 
taskTypes.map(t => t.task_type), created, skipped }, 456 metrics: { created, skipped }, 457 }; 458 }, 459 460 // Tier 3: System Health (creates monitor agent tasks for heavier checks) 461 monitorSystem: async () => { 462 const taskTypes = [ 463 { task_type: 'check_agent_health', priority: 6 }, 464 { task_type: 'detect_anomaly', priority: 4 }, 465 { task_type: 'check_slo_compliance', priority: 7 }, 466 ]; 467 468 let created = 0; 469 let skipped = 0; 470 471 for (const taskDef of taskTypes) { 472 try { 473 const taskId = await createAgentTask({ 474 task_type: taskDef.task_type, 475 assigned_to: 'monitor', 476 created_by: 'cron', 477 priority: taskDef.priority, 478 context: { source: 'cron_tier3', recurring: true }, 479 }); 480 481 if (taskId) created++; 482 else skipped++; 483 } catch { 484 skipped++; 485 } 486 } 487 488 return { 489 summary: `System health: ${created} tasks created, ${skipped} skipped (duplicates)`, 490 details: { task_types: taskTypes.map(t => t.task_type), created, skipped }, 491 metrics: { created, skipped }, 492 }; 493 }, 494 495 // Classify unknown errors — Phase 2 LLM call delegated to claude-orchestrator.sh --type classify_errors 496 // This cron job now runs only Phase 1 (retry reclassification) and Phase 3 (auto-apply proposals). 
497 classifyUnknownErrors: async () => { 498 const result = await classifyUnknownErrors(); 499 return { 500 summary: `Retry reclassified: ${result.sites_retried || 0} sites, ${result.outreaches_retried || 0} outreaches`, 501 details: result, 502 metrics: { 503 sites_retried: result.sites_retried || 0, 504 outreaches_retried: result.outreaches_retried || 0, 505 patterns_applied: result.patterns_applied || 0, 506 }, 507 }; 508 }, 509 510 // ===== MAINTENANCE ===== 511 databaseMaintenance: async () => { 512 logger.info('Running database maintenance...'); 513 514 const stats = await getOne('SELECT COUNT(*) as sites FROM sites', []); 515 const outreaches = await getOne("SELECT COUNT(*) as count FROM messages WHERE direction = 'outbound'", []); 516 const conversations = await getOne("SELECT COUNT(*) as count FROM messages WHERE direction = 'inbound'", []); 517 const keywords = await getOne('SELECT COUNT(*) as count FROM keywords', []); 518 519 // PostgreSQL does not have PRAGMA integrity_check. Report row counts only. 520 const healthy = true; 521 522 logger.success( 523 `Database healthy: ${stats.sites} sites, ${outreaches.count} outreaches` 524 ); 525 526 return { 527 summary: `Database healthy: ${stats.sites} sites, ${outreaches.count} outreaches`, 528 details: { 529 integrity_status: 'ok', 530 table_counts: { 531 sites: stats.sites, 532 outreaches: outreaches.count, 533 conversations: conversations.count, 534 keywords: keywords.count, 535 }, 536 operations: ['Row counts collected (PostgreSQL — no PRAGMA optimize/integrity_check)'], 537 }, 538 metrics: { 539 sites: stats.sites, 540 outreaches: outreaches.count, 541 conversations: conversations.count, 542 keywords: keywords.count, 543 healthy: healthy ? 1 : 0, 544 }, 545 }; 546 }, 547 548 vacuumDatabase: async () => { 549 // PostgreSQL runs autovacuum automatically; manual VACUUM FULL requires a superuser 550 // connection and exclusive lock — not appropriate for a cron job on a shared pool. 
551 // This handler is retained for scheduler compatibility but performs no action. 552 logger.info('vacuumDatabase: skipped — PostgreSQL manages autovacuum automatically'); 553 return { 554 summary: 'Vacuum skipped: PostgreSQL autovacuum handles this automatically', 555 details: { note: 'Use pg_stat_user_tables to monitor bloat if needed.' }, 556 metrics: { saved_bytes: 0, saved_mb: 0, databases_vacuumed: 0 }, 557 }; 558 }, 559 560 walCheckpoint: async () => { 561 // Repurposed: clean up old WAL archive files (>7 days) from the store volume. 562 // PG's archive_command copies WAL to /run/media/jason/store/backups/pg/wal/. 563 // Without cleanup, this grows indefinitely (~16MB per file). 564 const walDir = '/run/media/jason/store/backups/pg/wal'; 565 const maxAgeDays = 7; 566 const cutoff = Date.now() - maxAgeDays * 86400000; 567 let deleted = 0; 568 let freedBytes = 0; 569 let errors = 0; 570 571 try { 572 if (!existsSync(walDir)) { 573 logger.warn(`walCheckpoint: WAL archive dir not found: ${walDir}`); 574 return { summary: 'WAL cleanup skipped: archive dir not found', details: { walDir }, metrics: { deleted: 0 } }; 575 } 576 577 const files = readdirSync(walDir).filter(f => /^[0-9A-F]{24}$/.test(f)); 578 for (const f of files) { 579 try { 580 const fp = join(walDir, f); 581 const { mtimeMs, size } = statSync(fp); 582 if (mtimeMs < cutoff) { 583 unlinkSync(fp); 584 deleted++; 585 freedBytes += size; 586 } 587 } catch (e) { 588 errors++; 589 if (errors <= 3) logger.warn(`walCheckpoint: failed to delete ${f}: ${e.message}`); 590 } 591 } 592 593 const totalFiles = files.length; 594 const freedMB = (freedBytes / 1024 / 1024).toFixed(1); 595 logger.info(`walCheckpoint: deleted ${deleted}/${totalFiles} WAL archives older than ${maxAgeDays}d (freed ${freedMB} MB)${errors ? 
`, ${errors} errors` : ''}`); 596 597 return { 598 summary: `Cleaned ${deleted} WAL archives (${freedMB} MB freed)`, 599 details: { walDir, totalFiles, deleted, errors, freedMB, maxAgeDays }, 600 metrics: { wal_archives_deleted: deleted, wal_freed_mb: parseFloat(freedMB), wal_total_files: totalFiles - deleted }, 601 }; 602 } catch (e) { 603 logger.error(`walCheckpoint: ${e.message}`); 604 return { summary: `WAL cleanup failed: ${e.message}`, details: {}, metrics: { deleted: 0 } }; 605 } 606 }, 607 608 backupDatabase: async () => { 609 // pg_dump the mmo database to the store volume with tiered retention. 610 const dumpDir = '/run/media/jason/store/backups/pg/dumps'; 611 const prefix = 'mmo'; 612 613 try { 614 mkdirSync(dumpDir, { recursive: true }); 615 } catch (e) { 616 logger.error(`backupDatabase: cannot create dump dir ${dumpDir}: ${e.message}`); 617 return { summary: `Backup failed: ${e.message}`, details: {}, metrics: { success: 0 } }; 618 } 619 620 const tier = determineTier(dumpDir, prefix, 'dump.gz'); 621 const filename = backupFileName(prefix, tier, 'dump.gz'); 622 const filepath = join(dumpDir, filename); 623 624 logger.info(`backupDatabase: starting pg_dump → ${filename} (${tier})`); 625 const startMs = Date.now(); 626 627 try { 628 // pg_dump → gzip → file 629 await new Promise((resolve, reject) => { 630 const dump = execFile('pg_dump', [ 631 '-h', '/run/postgresql', 632 '-d', 'mmo', 633 '--format=custom', 634 '--no-owner', 635 '--compress=6', 636 '-f', filepath, 637 ], { timeout: 300000 }, (err) => { 638 if (err) reject(err); 639 else resolve(); 640 }); 641 }); 642 643 const { size } = statSync(filepath); 644 const sizeMB = (size / 1024 / 1024).toFixed(1); 645 const durationSec = ((Date.now() - startMs) / 1000).toFixed(1); 646 logger.info(`backupDatabase: ${filename} — ${sizeMB} MB in ${durationSec}s`); 647 648 // Update sync-snapshot symlink 649 updateSyncSnapshot(dumpDir, prefix, filename, 'dump.gz'); 650 651 // Rotate old backups (keep 4 per tier) 
652 const rotation = rotateBackups(dumpDir, prefix, 'dump.gz'); 653 654 // Clean up legacy SQLite WAL orphans in the old backup dir 655 const oldBackupDir = join(projectRoot, 'db', 'backup'); 656 if (existsSync(oldBackupDir)) { 657 cleanupWalOrphans(oldBackupDir); 658 } 659 660 return { 661 summary: `pg_dump ${tier}: ${filename} (${sizeMB} MB, ${durationSec}s)`, 662 details: { filename, sizeMB, durationSec, tier, rotation }, 663 metrics: { success: 1, size_mb: parseFloat(sizeMB), duration_sec: parseFloat(durationSec) }, 664 }; 665 } catch (e) { 666 logger.error(`backupDatabase: pg_dump failed: ${e.message}`); 667 // Clean up partial file 668 try { unlinkSync(filepath); } catch (_) { /* */ } 669 return { summary: `Backup failed: ${e.message}`, details: {}, metrics: { success: 0 } }; 670 } 671 }, 672 673 analyzePerformance: async () => { 674 logger.info('Analyzing database performance...'); 675 676 // Gather table row counts from PostgreSQL catalog 677 const tableRows = await getAll(` 678 SELECT relname AS table_name, 679 reltuples::bigint AS rows 680 FROM pg_class 681 JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace 682 WHERE pg_namespace.nspname = 'm333' 683 AND pg_class.relkind = 'r' 684 ORDER BY reltuples DESC 685 `, []); 686 687 const tableStats = tableRows.map(r => ({ table_name: r.table_name, rows: Number(r.rows) })); 688 689 // Gather existing indexes from PostgreSQL catalog 690 const indexRows = await getAll(` 691 SELECT indexname AS name, 692 tablename AS tbl_name, 693 indexdef AS sql 694 FROM pg_indexes 695 WHERE schemaname = 'm333' 696 `, []); 697 698 const indexNames = new Set(indexRows.map(i => i.name)); 699 700 // Build set of indexed column combos per table for smarter detection 701 const indexedColumns = {}; 702 for (const idx of indexRows) { 703 if (!indexedColumns[idx.tbl_name]) indexedColumns[idx.tbl_name] = []; 704 const colMatch = idx.sql && idx.sql.match(/\(([^)]+)\)/); 705 if (colMatch) { 706 const cols = colMatch[1] 707 
.split(',') 708 .map(c => c.trim().replace(/ (ASC|DESC|COLLATE \w+)/gi, '')); 709 indexedColumns[idx.tbl_name].push(cols); 710 } 711 } 712 713 const recommendations = []; 714 715 // Common query patterns to check via EXPLAIN 716 const commonQueries = [ 717 { 718 sql: "SELECT * FROM sites WHERE status = 'prog_scored' AND error_message IS NOT NULL LIMIT 1", 719 table: 'sites', 720 cols: ['status', 'error_message'], 721 desc: 'Retry failed scoring (status + error_message filter)', 722 }, 723 { 724 sql: "SELECT * FROM messages WHERE direction = 'inbound' AND read_at IS NULL LIMIT 1", 725 table: 'messages', 726 cols: ['direction', 'read_at'], 727 desc: 'Unread inbound messages', 728 }, 729 { 730 sql: "SELECT * FROM sites WHERE country_code = 'US' AND status = 'found' LIMIT 1", 731 table: 'sites', 732 cols: ['country_code', 'status'], 733 desc: 'Country-filtered pipeline queries', 734 }, 735 { 736 sql: "SELECT * FROM messages WHERE contact_method = 'email' AND direction = 'outbound' AND delivery_status = 'sent' LIMIT 1", 737 table: 'messages', 738 cols: ['contact_method', 'direction', 'delivery_status'], 739 desc: 'Channel-specific outreach queries', 740 }, 741 { 742 sql: "SELECT * FROM sites WHERE recapture_at IS NOT NULL AND recapture_at <= NOW() LIMIT 1", 743 table: 'sites', 744 cols: ['recapture_at'], 745 desc: 'Sites scheduled for recapture', 746 }, 747 { 748 sql: "SELECT * FROM messages WHERE direction = 'inbound' AND read_at IS NULL ORDER BY created_at DESC LIMIT 1", 749 table: 'messages', 750 cols: ['direction', 'read_at', 'created_at'], 751 desc: 'Recent unread inbound messages', 752 }, 753 { 754 sql: "SELECT * FROM tel.llm_usage WHERE stage = 'scoring' AND created_at >= NOW() - INTERVAL '7 days' LIMIT 1", 755 table: 'llm_usage', 756 cols: ['stage', 'created_at'], 757 desc: 'Recent LLM usage by stage', 758 }, 759 { 760 sql: "SELECT * FROM ops.cron_job_logs WHERE job_name = 'test' AND status = 'failed' LIMIT 1", 761 table: 'cron_job_logs', 762 cols: ['job_name', 
'status'], 763 desc: 'Failed cron job lookup', 764 }, 765 ]; 766 767 // Run EXPLAIN to detect sequential scans 768 // SAFETY: sql comes from the hardcoded commonQueries array above — never user input 769 for (const { sql, table, cols, desc } of commonQueries) { 770 try { 771 const planRows = await getAll(`EXPLAIN ${sql}`, []); 772 const planText = planRows.map(r => Object.values(r)[0]).join('\n'); 773 const hasSeqScan = planText.includes('Seq Scan'); 774 775 if (hasSeqScan) { 776 // Check if a useful composite index already covers these columns 777 const tableIndexes = indexedColumns[table] || []; 778 const alreadyCovered = tableIndexes.some(idxCols => 779 cols.every((c, i) => idxCols[i] && idxCols[i].toLowerCase() === c.toLowerCase()) 780 ); 781 782 if (!alreadyCovered) { 783 const idxName = `idx_${table}_${cols.join('_')}`; 784 const idxSql = `CREATE INDEX IF NOT EXISTS ${idxName} ON ${table}(${cols.join(', ')});`; 785 recommendations.push({ 786 file: `${table}:${cols.join(',')}`, 787 priority: 'medium', 788 reason: 789 `**Sequential scan detected** on \`${table}\` for: ${desc}\n\n` + 790 `**Suggested fix:**\n\`\`\`sql\n${idxSql}\n\`\`\``, 791 }); 792 } 793 } 794 } catch { 795 // Query may reference missing tables; skip 796 } 797 } 798 799 // Check for large tables with sparse indexing 800 for (const stat of tableStats) { 801 if (stat.rows > 5000) { 802 const count = (indexedColumns[stat.table_name] || []).length; 803 if (count < 2) { 804 recommendations.push({ 805 file: `${stat.table_name}:sparse_indexes`, 806 priority: 'medium', 807 reason: 808 `**Sparse indexing** on large table \`${stat.table_name}\` ` + 809 `(${stat.rows.toLocaleString()} rows, only ${count} user-defined indexes).\n\n` + 810 `Review query patterns and add indexes on frequently filtered/sorted columns.`, 811 }); 812 } 813 } 814 } 815 816 // Store new recommendations in human_review_queue (deduplicated) 817 let newCount = 0; 818 for (const rec of recommendations) { 819 const existing = await 
getOne( 820 "SELECT COUNT(*) as count FROM human_review_queue WHERE type = 'performance' AND file = $1 AND status = 'pending'", 821 [rec.file] 822 ); 823 if (Number(existing.count) === 0) { 824 await run( 825 "INSERT INTO human_review_queue (file, reason, type, priority) VALUES ($1, $2, 'performance', $3)", 826 [rec.file, rec.reason, rec.priority] 827 ); 828 newCount++; 829 } 830 } 831 832 const summaryMsg = 833 `Analyzed ${tableStats.length} tables, ${indexRows.length} indexes. ` + 834 `Found ${recommendations.length} recommendations (${newCount} new).`; 835 logger.success(summaryMsg); 836 837 return { 838 summary: summaryMsg, 839 details: { 840 table_stats: tableStats, 841 index_count: indexRows.length, 842 recommendations, 843 new_recommendations: newCount, 844 }, 845 metrics: { 846 tables_analyzed: tableStats.length, 847 index_count: indexRows.length, 848 recommendations_found: recommendations.length, 849 new_recommendations: newCount, 850 }, 851 }; 852 }, 853 854 rotateLogs: async () => { 855 const result = rotateLogs({ logDir: join(projectRoot, 'logs'), retentionDays: 3 }); 856 const summary = `Log rotation: deleted ${result.deleted} files (${(result.freedSpace / (1024 * 1024)).toFixed(2)} MB freed), kept ${result.kept}`; 857 logger.success(summary); 858 return { 859 summary, 860 details: result, 861 metrics: { 862 deleted: result.deleted, 863 kept: result.kept, 864 freed_mb: result.freedSpace / (1024 * 1024), 865 }, 866 }; 867 }, 868 869 checkRateLimits: async () => { 870 const checks = { 871 zenrows: process.env.ZENROWS_API_KEY ? 'configured' : 'missing', 872 openrouter: process.env.OPENROUTER_API_KEY ? 'configured' : 'missing', 873 resend: process.env.RESEND_API_KEY ? 'configured' : 'missing', 874 twilio: process.env.TWILIO_ACCOUNT_SID ? 
'configured' : 'missing', 875 }; 876 877 const configuredCount = Object.values(checks).filter(v => v === 'configured').length; 878 const missingCount = Object.values(checks).filter(v => v === 'missing').length; 879 880 logger.info('Rate limit health check:', checks); 881 882 return { 883 summary: `API keys: ${configuredCount} configured, ${missingCount} missing`, 884 details: { 885 api_status: checks, 886 configured_count: configuredCount, 887 missing_count: missingCount, 888 note: 'Full rate limit checks require API calls - not yet implemented', 889 }, 890 metrics: { 891 configured: configuredCount, 892 missing: missingCount, 893 }, 894 }; 895 }, 896 897 purgeSiteStatusHistory: async () => { 898 // Keep only the last 5 status entries per active site. 899 // For ignored sites, keep only 1 (they're terminal — history is irrelevant). 900 // This prevents site_status from growing unboundedly (observed: 2.3M rows = ~400MB). 901 const beforeRow = await getOne('SELECT COUNT(*) as n FROM site_status', []); 902 const beforeCount = Number(beforeRow.n); 903 904 await run(` 905 DELETE FROM site_status 906 WHERE id NOT IN ( 907 SELECT id FROM site_status s2 908 WHERE s2.site_id = site_status.site_id 909 ORDER BY s2.id DESC 910 LIMIT 5 911 ) 912 `, []); 913 914 // For ignored sites, keep only 1 entry (the most recent) 915 await run(` 916 DELETE FROM site_status 917 WHERE id NOT IN ( 918 SELECT MAX(ss.id) FROM site_status ss 919 JOIN sites s ON ss.site_id = s.id 920 WHERE s.status = 'ignored' 921 GROUP BY ss.site_id 922 ) 923 AND site_id IN (SELECT id FROM sites WHERE status = 'ignored') 924 `, []); 925 926 const afterRow = await getOne('SELECT COUNT(*) as n FROM site_status', []); 927 const afterCount = Number(afterRow.n); 928 const deleted = beforeCount - afterCount; 929 const summary = `Purged ${deleted.toLocaleString()} old site_status rows (${beforeCount.toLocaleString()} → ${afterCount.toLocaleString()})`; 930 logger.success(summary); 931 return { summary, metrics: { 
rows_deleted: deleted, rows_remaining: afterCount } }; 932 }, 933 934 diskCleanup: async () => { 935 const { rmSync, readdirSync, statSync, existsSync } = await import('fs'); 936 const { join: pathJoin } = await import('path'); 937 938 const results = []; 939 let totalFreedBytes = 0; 940 941 const removeDir = dir => { 942 const full = pathJoin(projectRoot, dir); 943 if (existsSync(full)) { 944 const before = statSync(full).size || 0; 945 rmSync(full, { recursive: true, force: true }); 946 results.push({ path: dir, action: 'removed_dir' }); 947 // Approximate: use 0 since rmSync doesn't return size; caller tracks separately 948 return before; 949 } 950 return 0; 951 }; 952 953 // 1. Coverage & test artifact directories 954 for (const dir of ['coverage', '.coverage-backup-testrunner']) { 955 removeDir(dir); 956 } 957 958 // Any .nyc_output* dirs at project root 959 const rootEntries = readdirSync(projectRoot); 960 for (const entry of rootEntries) { 961 if (entry.startsWith('.nyc_output')) { 962 removeDir(entry); 963 } 964 } 965 966 // 2. Test DBs in db/ (keep test-e2e.db) 967 const dbDir = pathJoin(projectRoot, 'db'); 968 const testDbs = readdirSync(dbDir).filter( 969 f => f.startsWith('test-') && f.endsWith('.db') && f !== 'test-e2e.db' 970 ); 971 for (const f of testDbs) { 972 const full = pathJoin(dbDir, f); 973 const { size } = statSync(full); 974 rmSync(full, { force: true }); 975 totalFreedBytes += size; 976 results.push({ 977 path: `db/${f}`, 978 action: 'removed_file', 979 size_mb: (size / 1024 / 1024).toFixed(2), 980 }); 981 } 982 983 // 3. 
Old files in backups/ (root-level, not db/backup which vacuumDatabase manages) 984 const backupsDir = pathJoin(projectRoot, 'backups'); 985 if (existsSync(backupsDir)) { 986 const cutoff = Date.now() - 3 * 24 * 60 * 60 * 1000; 987 const walk = dir => { 988 for (const entry of readdirSync(dir, { withFileTypes: true })) { 989 const full = pathJoin(dir, entry.name); 990 if (entry.isDirectory()) { 991 walk(full); 992 } else { 993 const { mtimeMs, size } = statSync(full); 994 if (mtimeMs < cutoff) { 995 rmSync(full, { force: true }); 996 totalFreedBytes += size; 997 results.push({ 998 path: `backups/${entry.name}`, 999 action: 'removed_file', 1000 size_mb: (size / 1024 / 1024).toFixed(2), 1001 }); 1002 } 1003 } 1004 } 1005 }; 1006 walk(backupsDir); 1007 } 1008 1009 const freedMB = (totalFreedBytes / 1024 / 1024).toFixed(2); 1010 const summary = `Disk cleanup: ${results.length} actions, ${freedMB} MB freed`; 1011 logger.success(summary); 1012 return { 1013 summary, 1014 details: results, 1015 metrics: { actions: results.length, freed_mb: parseFloat(freedMB) }, 1016 }; 1017 }, 1018 1019 technicalDebtReview: async () => { 1020 const { readFileSync } = await import('fs'); 1021 const { join } = await import('path'); 1022 1023 try { 1024 const todoPath = join(projectRoot, 'docs/TODO.md'); 1025 const todoContent = readFileSync(todoPath, 'utf-8'); 1026 const incompleteTasks = (todoContent.match(/- \[ \]/g) || []).length; 1027 const completedTasks = (todoContent.match(/- \[x\]/gi) || []).length; 1028 const totalTasks = incompleteTasks + completedTasks; 1029 const completionRate = totalTasks > 0 ? 
((completedTasks / totalTasks) * 100).toFixed(1) : 0; 1030 1031 logger.info( 1032 `Technical debt: ${incompleteTasks} incomplete tasks, ${completedTasks} completed` 1033 ); 1034 1035 return { 1036 summary: `TODO.md: ${incompleteTasks} incomplete, ${completedTasks} completed (${completionRate}% done)`, 1037 details: { 1038 incomplete_tasks: incompleteTasks, 1039 completed_tasks: completedTasks, 1040 total_tasks: totalTasks, 1041 completion_rate: parseFloat(completionRate), 1042 file_path: todoPath, 1043 }, 1044 metrics: { 1045 incomplete: incompleteTasks, 1046 completed: completedTasks, 1047 completion_rate: parseFloat(completionRate), 1048 }, 1049 }; 1050 } catch (error) { 1051 logger.warn('Could not read TODO.md'); 1052 return { 1053 summary: 'Could not read TODO.md', 1054 details: { error: error.message }, 1055 metrics: { success: 0 }, 1056 }; 1057 } 1058 }, 1059 1060 // NOTE: claudeCodeDocCheck, deepCodeAnalysis, updateDependencies, and sageAutofix 1061 // have been removed as they are all included in unifiedAutofix. 1062 // Use `npm run autofix` to run all maintenance tasks together. 
1063 1064 unifiedAutofix: async () => { 1065 logger.info('Running unified auto-fix (all maintenance tasks)...'); 1066 1067 try { 1068 const { execSync } = await import('child_process'); 1069 execSync('node scripts/unified-autofix.js', { 1070 encoding: 'utf8', 1071 stdio: 'inherit', 1072 cwd: process.cwd(), 1073 }); 1074 1075 // Get branch summary 1076 const summaryPath = join(projectRoot, 'scripts/autofix-branch.js'); 1077 let commitCount = 0; 1078 1079 try { 1080 // Import and call getAutofixSummary 1081 const { getAutofixSummary } = await import(summaryPath); 1082 const summary = getAutofixSummary(); 1083 commitCount = summary.commitCount || 0; 1084 } catch { 1085 // Fallback if summary not available 1086 commitCount = 1; 1087 } 1088 1089 return { 1090 summary: `Unified auto-fix completed - ${commitCount} commits in autofix branch`, 1091 details: { 1092 automated: true, 1093 branch: 'autofix', 1094 tasks_run: [ 1095 'Prettier formatting', 1096 'ESLint auto-fix', 1097 'Security audit', 1098 'Dependency updates', 1099 'Sage AI fixes', 1100 'Documentation check', 1101 'Test generation', 1102 ], 1103 note: 'Review autofix branch and merge if satisfied. Run: git diff main autofix', 1104 }, 1105 metrics: { 1106 success: 1, 1107 commits: commitCount, 1108 }, 1109 }; 1110 } catch (error) { 1111 logger.error(`Unified auto-fix failed: ${error.message}`); 1112 return { 1113 summary: 'Unified auto-fix failed', 1114 details: { 1115 error: error.message, 1116 note: 'Check logs for details. May need claude CLI in PATH or OPENROUTER_API_KEY in .env', 1117 }, 1118 metrics: { 1119 success: 0, 1120 failed: 1, 1121 }, 1122 }; 1123 } 1124 }, 1125 1126 // ===== BACKUP: ops + telemetry (DR-096) ===== 1127 // DEPRECATED: was a SQLite db.backup() call. Now a no-op stub kept so the cron runner 1128 // can still schedule/reference the key. Real backups use pg_dump on the NixOS host. 
1129 backupOpsAndTelemetry: async () => { 1130 logger.warn('backupOpsAndTelemetry: skipped — use pg_dump for PostgreSQL backups'); 1131 return { 1132 summary: 'Backup skipped: use pg_dump for PostgreSQL (ops + tel schemas)', 1133 details: { note: 'Configure pg_dump on the NixOS host. All schemas are in the mmo database.' }, 1134 metrics: { success: 0, skipped: 1 }, 1135 }; 1136 }, 1137 1138 // ===== BACKUP: 2step.db (DR-096) ===== 1139 // DEPRECATED: was a SQLite file copy. Now a no-op stub kept so the cron runner 1140 // can still schedule/reference the key. Real backups use pg_dump on the NixOS host. 1141 backup2StepDb: async () => { 1142 logger.warn('backup2StepDb: skipped — 2Step uses PostgreSQL; use pg_dump on the host'); 1143 return { 1144 summary: 'Backup skipped: 2Step uses PostgreSQL', 1145 details: { note: 'Configure pg_dump for the 2Step schema on the NixOS host.' }, 1146 metrics: { success: 0, skipped: 1 }, 1147 }; 1148 }, 1149 1150 // ===== BACKUP: AdManager SQLite DB ===== 1151 backupAdManagerDb: async () => { 1152 const srcPath = join(projectRoot, '..', 'AdManager', 'db', 'admanager.db'); 1153 const backupDir = '/run/media/jason/store/backups/pg/dumps'; // co-locate with pg_dump backups 1154 if (!existsSync(srcPath)) { 1155 logger.warn(`backupAdManagerDb: source not found: ${ srcPath}`); 1156 return { summary: 'Skipped: admanager.db not found', metrics: { success: 0 } }; 1157 } 1158 try { 1159 mkdirSync(backupDir, { recursive: true }); 1160 const tier = determineTier(backupDir, 'admanager', 'db.gz'); 1161 const ts = new Date().toISOString().replace(/:/g, '-').slice(0, 16); 1162 const filename = `admanager-${tier}-${ts}.db.gz`; 1163 const dest = join(backupDir, filename); 1164 1165 // Stream copy with gzip 1166 const src = createReadStream(srcPath); 1167 const gz = createGzip({ level: 6 }); 1168 const out = createWriteStream(dest); 1169 await pipeline(src, gz, out); 1170 1171 const { size } = statSync(dest); 1172 const sizeMB = (size / 1024 / 
1024).toFixed(2); 1173 logger.info(`backupAdManagerDb: ${filename} (${sizeMB} MB)`); 1174 1175 updateSyncSnapshot(backupDir, 'admanager', filename, 'db.gz'); 1176 rotateBackups(backupDir, 'admanager', 'db.gz'); 1177 1178 return { 1179 summary: `AdManager backup ${tier}: ${filename} (${sizeMB} MB)`, 1180 details: { filename, sizeMB, tier }, 1181 metrics: { success: 1, size_mb: parseFloat(sizeMB) }, 1182 }; 1183 } catch (e) { 1184 logger.error(`backupAdManagerDb: ${e.message}`); 1185 return { summary: `Backup failed: ${e.message}`, metrics: { success: 0 } }; 1186 } 1187 }, 1188 1189 // ===== BACKUP: data/scores/ + data/contacts/ tarballs (DR-096) ===== 1190 backupDataDirs: async () => { 1191 const backupDir = join(projectRoot, 'db', 'backup'); 1192 if (!existsSync(backupDir)) { 1193 return { summary: 'Skipped: backup dir not accessible', metrics: { success: 0, skipped: 1 } }; 1194 } 1195 1196 const { promisify } = await import('util'); 1197 const execFileAsync = promisify(execFile); 1198 const results = []; 1199 1200 for (const [prefix, srcDir] of [ 1201 ['scores', join(projectRoot, 'data', 'scores')], 1202 ['contacts', join(projectRoot, 'data', 'contacts')], 1203 ]) { 1204 if (!existsSync(srcDir)) { 1205 logger.warn(`${prefix} dir not found: ${srcDir}`); 1206 continue; 1207 } 1208 1209 const tier = determineTier(backupDir, prefix, 'tar.gz'); 1210 const tarFile = backupFileName(prefix, tier, 'tar.gz'); 1211 const tarPath = join(backupDir, tarFile); 1212 const tmpPath = `${tarPath }.tmp`; 1213 1214 logger.info(`Archiving ${prefix}/ → ${tarFile} (tier: ${tier})...`); 1215 try { 1216 await execFileAsync('tar', ['czf', tmpPath, '-C', dirname(srcDir), prefix], { timeout: 15 * 60 * 1000 }); 1217 renameSync(tmpPath, tarPath); 1218 } catch (err) { 1219 try { unlinkSync(tmpPath); } catch (_e) { /* cleanup */ } 1220 logger.error(`Failed to archive ${prefix}: ${err.message}`); 1221 continue; 1222 } 1223 1224 const tarSize = statSync(tarPath).size; 1225 if (tarSize < 1024) { 
/**
 * Execute a command-based job as a low-priority child process.
 *
 * The command must start with one of the allowlisted prefixes; it is split on
 * whitespace and run through `nice -n 19 ionice -c 3` from the project root.
 * stdout/stderr are echoed to the console and captured for the job log.
 *
 * @param {string} command - Command line, e.g. `npm run something`.
 * @param {string|null} [lockKey] - When set, a row in ops.cron_locks guards against
 *   concurrent runs; a fresh existing lock makes this call return a "skipped" result.
 * @param {number|null} [timeoutSeconds] - Kill the child after this long (default 1800).
 * @returns {Promise<{summary: string, details: object, metrics: object}>}
 * @throws {Error} When the command is not allowlisted, cannot be started, times out,
 *   or exits non-zero (`error.details` then carries exit code and captured output).
 */
async function executeCommand(command, lockKey = null, timeoutSeconds = null) {
  // Security: only allow commands with known-safe prefixes (P1 finding)
  const ALLOWED_COMMAND_PREFIXES = ['node scripts/', 'node src/', 'npm run'];

  // True only while THIS call owns the lock row, so error paths never delete a
  // lock that belongs to another running instance.
  let lockHeld = false;
  const releaseLock = suffix => {
    if (!lockHeld) return;
    lockHeld = false;
    // Fire and forget; errors are logged but don't block the caller.
    run('DELETE FROM ops.cron_locks WHERE lock_key = $1', [lockKey]).catch(dbErr => {
      logger.warn(`Failed to release lock for ${lockKey}${suffix}: ${dbErr.message}`);
    });
  };

  try {
    // Validate before touching the lock table: a blocked command never takes a lock.
    if (!ALLOWED_COMMAND_PREFIXES.some(p => command.startsWith(p))) {
      throw new Error(`Blocked: command '${command}' not in allowlist (allowed: ${ALLOWED_COMMAND_PREFIXES.join(', ')})`);
    }

    if (lockKey) {
      const canRun = await checkAndClearStaleLock(lockKey, command);
      if (!canRun) {
        return {
          summary: 'Skipped - already running',
          details: { skipped: true, reason: 'Singleton check failed' },
          metrics: { skipped: 1 },
        };
      }

      await run(
        `INSERT INTO ops.cron_locks (lock_key, description, updated_at)
         VALUES ($1, $2, NOW())
         ON CONFLICT (lock_key) DO UPDATE SET
           description = EXCLUDED.description,
           updated_at = EXCLUDED.updated_at`,
        [lockKey, `${command} running lock`]
      );
      lockHeld = true;
    }

    const { spawn } = await import('child_process');
    const startTime = Date.now();
    // Default timeout: 30 minutes. Prevents one hung cron command from blocking all others.
    const timeoutMs = (timeoutSeconds || 1800) * 1000;
    // Split on any whitespace run so doubled spaces don't become empty arguments.
    const [cmd, ...args] = command.split(/\s+/).filter(Boolean);

    return new Promise((resolve, reject) => {
      let stdout = '';
      let stderr = '';
      let forceKillTimer = null;

      let proc;
      try {
        // Run all cron commands with lowest CPU/IO priority to prevent system slowdown
        proc = spawn('nice', ['-n', '19', 'ionice', '-c', '3', cmd, ...args], {
          cwd: projectRoot,
          stdio: 'pipe', // Capture stdout/stderr
          env: process.env, // Pass environment to child process (needed for PATH)
        });
      } catch (spawnError) {
        releaseLock(' after error');
        reject(spawnError);
        return;
      }

      // Kill the process if it exceeds the timeout. The lock stays until the child
      // has really exited (released in the 'exit' handler below).
      const killTimer = setTimeout(() => {
        proc.kill('SIGTERM');
        forceKillTimer = setTimeout(() => proc.kill('SIGKILL'), 5000); // Force kill after 5s
        reject(new Error(`Command timed out after ${timeoutMs / 1000}s: ${command}`));
      }, timeoutMs);

      // Without an 'error' listener a failed spawn (e.g. `nice` missing) is thrown
      // as an uncaught exception and the promise would never settle.
      proc.on('error', procError => {
        if (proc.pid !== undefined) {
          // The child exists; this is a signalling problem, 'exit' still settles the job.
          logger.warn(`Child process error for '${command}': ${procError.message}`);
          return;
        }
        clearTimeout(killTimer);
        clearTimeout(forceKillTimer);
        releaseLock(' after error');
        const error = new Error(`Failed to start command: ${procError.message}`);
        error.details = { exit_code: null, command, stdout: stdout.trim(), stderr: stderr.trim() };
        reject(error);
      });

      proc.stdout.on('data', data => {
        const output = data.toString();
        stdout += output;
        process.stdout.write(output); // Also write to console for real-time viewing
      });

      proc.stderr.on('data', data => {
        const output = data.toString();
        stderr += output;
        process.stderr.write(output); // Also write to console for real-time viewing
      });

      // Use 'exit' not 'close': 'close' waits for all stdio streams to close, but
      // background processes spawned by the command (e.g. `nohup npm run dashboard &`)
      // can inherit the pipe FD and keep it open indefinitely, causing 'close' to never fire.
      // 'exit' fires as soon as the direct child process exits, regardless of stdio state.
      proc.on('exit', (code, signal) => {
        // Destroy stdio streams to prevent the pipe from staying open
        proc.stdout.destroy();
        proc.stderr.destroy();

        clearTimeout(killTimer);
        clearTimeout(forceKillTimer); // no stray SIGKILL timer once the child is gone

        releaseLock('');

        const duration = ((Date.now() - startTime) / 1000).toFixed(2);
        const output = { stdout: stdout.trim(), stderr: stderr.trim() };

        if (code === 0) {
          resolve({
            summary: `Command completed in ${duration}s`,
            details: {
              duration_seconds: parseFloat(duration),
              exit_code: code,
              command,
              output,
            },
            metrics: {
              duration_seconds: parseFloat(duration),
              exit_code: 0,
            },
          });
          return;
        }

        // Prefer the last stderr line; otherwise describe how the child ended.
        const errorLines = stderr.split('\n').filter(line => line.trim());
        const fallback =
          code === null && signal
            ? `Command terminated by signal ${signal}`
            : `Command exited with code ${code}`;
        const errorSummary = errorLines.length > 0 ? errorLines[errorLines.length - 1] : fallback;

        const error = new Error(errorSummary);
        error.details = {
          exit_code: code,
          signal,
          command,
          stdout: output.stdout,
          stderr: output.stderr,
        };
        reject(error); // no-op if the timeout already rejected
      });
    });
  } catch (error) {
    // Ensure a lock taken by this call is released on error
    releaseLock(' after error');
    throw error;
  }
}
/**
 * Load enabled jobs from the database, sorted by interval (shortest first).
 *
 * This ensures critical high-frequency jobs (processGuardian: 1min, monitorPipeline: 5min)
 * always run before low-frequency maintenance jobs (diskCleanup: daily, rotateLogs: 25h).
 * Without this ordering, a single slow daily job can consume the systemd TimeoutStartSec
 * (10min) window before the critical monitoring jobs ever get a chance to run.
 *
 * @returns {Promise<object[]>} Rows of ops.cron_jobs with `enabled = true`.
 * @throws {Error} If any job has an interval unit `intervalToMinutes` does not know.
 */
async function loadJobs() {
  const jobs = await getAll('SELECT * FROM ops.cron_jobs WHERE enabled = true', []);
  // Sort by interval in minutes ascending (1-min jobs first, weekly jobs last)
  jobs.sort(
    (a, b) =>
      intervalToMinutes(a.interval_value, a.interval_unit) -
      intervalToMinutes(b.interval_value, b.interval_unit)
  );
  return jobs;
}

/**
 * Convert an interval to minutes.
 *
 * @param {number} value - Interval length in `unit`s.
 * @param {string} unit - One of 'minutes', 'hours', 'days', 'weeks'.
 * @returns {number} The interval in minutes.
 * @throws {Error} For any other unit.
 */
function intervalToMinutes(value, unit) {
  switch (unit) {
    case 'minutes':
      return value;
    case 'hours':
      return value * 60;
    case 'days':
      return value * 24 * 60;
    case 'weeks':
      return value * 7 * 24 * 60;
    default:
      throw new Error(`Unknown interval unit: ${unit}`);
  }
}

/**
 * Check whether a job is due, based on last_run_at and its interval.
 *
 * `last_run_at` may be a Date, a timestamp string, or another value accepted by
 * the Date constructor. Strings without a timezone designator are read as UTC;
 * strings that already end in `Z` or a UTC offset are parsed unchanged. A value
 * that cannot be parsed is treated like "never run" so the job is not silently
 * starved forever.
 *
 * @param {{last_run_at: (Date|string|number|null), interval_value: number, interval_unit: string}} job
 * @returns {boolean} True when the job has never run or its interval has elapsed.
 * @throws {Error} If the job's interval unit is unknown.
 */
function shouldRun(job) {
  if (!job.last_run_at) return true; // Never run before

  const intervalMinutes = intervalToMinutes(job.interval_value, job.interval_unit);

  const raw = job.last_run_at;
  let lastRun;
  if (raw instanceof Date) {
    lastRun = raw;
  } else if (typeof raw === 'string') {
    // Trailing 'Z', '+hh', '+hh:mm' or '+hhmm' means the zone is already explicit;
    // appending another 'Z' to those would produce an Invalid Date.
    const hasZone = /(?:z|[+-]\d{2}(?::?\d{2})?)$/i.test(raw.trim());
    lastRun = new Date(hasZone ? raw.trim() : `${raw.trim()}Z`);
  } else {
    lastRun = new Date(raw);
  }

  const lastRunMs = lastRun.getTime();
  if (Number.isNaN(lastRunMs)) return true; // Unparseable timestamp: run rather than starve

  const minutesSinceLastRun = (Date.now() - lastRunMs) / 1000 / 60;
  return minutesSinceLastRun >= intervalMinutes;
}
/**
 * Stamp a job's last_run_at with the database's current time.
 *
 * @param {string} taskKey - `task_key` of the row in ops.cron_jobs.
 * @returns {Promise<void>}
 */
async function updateLastRun(taskKey) {
  const sql = 'UPDATE ops.cron_jobs SET last_run_at = NOW() WHERE task_key = $1';
  await run(sql, [taskKey]);
}

/**
 * Build a one-line summary for a job result.
 *
 * Precedence: no result → generic text; `result.summary` if truthy; `result.manual`
 * if truthy; otherwise a comma-separated list of whichever known counters are
 * defined on the result, or a generic success text when none are.
 *
 * @param {object|null|undefined} result - Whatever the handler returned.
 * @returns {string}
 */
function generateSummary(result) {
  if (!result) {
    return 'Task completed';
  }
  if (result.summary) {
    return result.summary;
  }
  if (result.manual) {
    return 'Manual task - reminder sent';
  }

  // [result key, label] in output order; two keys intentionally share a label.
  const counters = [
    ['processed', 'Processed'],
    ['items_processed', 'Processed'],
    ['sites', 'Sites'],
    ['outreaches', 'Outreaches'],
    ['success', 'Success'],
  ];

  const parts = counters
    .filter(([key]) => result[key] !== undefined)
    .map(([key, label]) => `${label}: ${result[key]}`);

  if (parts.length === 0) {
    return 'Task completed successfully';
  }
  return parts.join(', ');
}
/**
 * Record the start of a task run in ops.cron_job_logs (status 'running').
 *
 * @param {string} taskName - Stored as `job_name`.
 * @returns {Promise<*>} `lastInsertRowid` of the value `run` resolves to.
 *   NOTE(review): assumes the `run` helper exposes the new row id under that
 *   name — confirm against utils/db.js.
 */
async function logTaskStart(taskName) {
  const inserted = await run(
    `INSERT INTO ops.cron_job_logs (job_name, started_at, status, items_processed, items_failed)
     VALUES ($1, NOW(), 'running', 0, 0)`,
    [taskName]
  );
  return inserted.lastInsertRowid;
}

/**
 * Mark a task log row as succeeded.
 *
 * Stores the generated summary, the JSON-serialised result, and an item count
 * taken from the first defined of `result.metrics.processed`,
 * `result.items_processed`, `result.processed` (default 1).
 *
 * @param {*} logId - Id returned by logTaskStart.
 * @param {object|null|undefined} result - Value the handler returned.
 * @returns {Promise<void>}
 */
async function logTaskComplete(logId, result) {
  const summary = generateSummary(result);
  const fullLog = JSON.stringify(result, null, 2);

  // Only `undefined` counts as "absent"; an explicit null is passed through.
  const candidates = [result?.metrics?.processed, result?.items_processed, result?.processed];
  const firstDefined = candidates.find(value => value !== undefined);
  const itemsProcessed = firstDefined === undefined ? 1 : firstDefined;

  await run(
    `UPDATE ops.cron_job_logs
     SET finished_at = NOW(),
         status = 'success',
         summary = $1,
         full_log = $2,
         items_processed = $3
     WHERE id = $4`,
    [summary, fullLog, itemsProcessed, logId]
  );
}

/**
 * Mark a task log row as failed.
 *
 * The stored full_log mirrors the shape of a success entry (summary / details /
 * metrics); `error.details` is used when present, otherwise the stack trace.
 *
 * @param {*} logId - Id returned by logTaskStart.
 * @param {Error} error - The failure; an optional `details` property is honoured.
 * @returns {Promise<void>}
 */
async function logTaskFailed(logId, error) {
  const { message } = error;
  const summary = `${error.name || 'Error'}: ${message}`;

  const payload = {
    summary,
    error: message,
    details: error.details || { stack: error.stack },
    metrics: { success: 0, failed: 1 },
  };
  const fullLog = JSON.stringify(payload, null, 2);

  await run(
    `UPDATE ops.cron_job_logs
     SET finished_at = NOW(),
         status = 'failed',
         summary = $1,
         full_log = $2,
         error_message = $3,
         items_failed = 1
     WHERE id = $4`,
    [summary, fullLog, message, logId]
  );
}
/**
 * Await `work`, rejecting with `message` if it takes longer than `timeoutMs`.
 * The timer is always cleared once the race settles, so a finished handler does
 * not leave a pending timeout behind.
 */
function raceWithTimeout(work, timeoutMs, message) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(message)), timeoutMs);
  });
  return Promise.race([work, timeout]).finally(() => clearTimeout(timer));
}

/**
 * Run all eligible jobs from the database.
 *
 * Flow: honour the circuit-breaker setting, take the global singleton lock, load
 * enabled jobs (shortest interval first) and, for each job that is due, log the
 * start, stamp last_run_at, run the handler (function or command) under a
 * timeout, then log success or failure. The global lock is always released.
 * A summary is printed at the end; the process exits with code 1 when a job
 * flagged critical failed.
 *
 * @returns {Promise<void>}
 */
async function runCron() {
  logger.success('Cron started (database-driven v2)');

  const results = {
    ran: [],
    skipped: [],
    failed: [],
  };

  // Check circuit breaker - global kill switch for all cron jobs
  const circuitBreaker = await getOne(
    "SELECT value FROM ops.settings WHERE key = $1",
    ['cron_circuit_breaker_enabled']
  );
  if (circuitBreaker?.value === 'false') {
    logger.warn(
      'Circuit breaker is DISABLED - all cron jobs are disabled (systemd timer still runs, but jobs are skipped)'
    );
    return;
  }

  // Global singleton lock - prevent multiple cron instances from running simultaneously
  const GLOBAL_LOCK_KEY = 'cron_runner_global_lock';
  const canRun = await checkAndClearStaleLock(GLOBAL_LOCK_KEY, 'Cron Runner');

  if (!canRun) {
    logger.warn('Another cron instance is already running, exiting');
    return;
  }

  // Set global lock (uses cron_locks table)
  await run(
    `INSERT INTO ops.cron_locks (lock_key, description, updated_at)
     VALUES ($1, $2, NOW())
     ON CONFLICT (lock_key) DO UPDATE SET
       description = EXCLUDED.description,
       updated_at = EXCLUDED.updated_at`,
    [GLOBAL_LOCK_KEY, 'Global cron runner lock (singleton)']
  );

  try {
    const jobs = await loadJobs();

    if (jobs.length === 0) {
      logger.warn('No enabled jobs found in database. Run npm run cron:migrate to seed jobs.');
      return;
    }

    for (const job of jobs) {
      // Check if job should run
      if (!shouldRun(job)) {
        const lastRun = job.last_run_at instanceof Date
          ? job.last_run_at
          : new Date(job.last_run_at);
        const nextRun = new Date(
          lastRun.getTime() +
            intervalToMinutes(job.interval_value, job.interval_unit) * 60 * 1000
        );
        logger.info(`Skipping ${job.name} (next run: ${nextRun.toLocaleString()})`);
        results.skipped.push(job.name);
        continue;
      }

      // Execute job
      let logId;
      try {
        logger.info(`Running: ${job.name}`);
        const start = Date.now();

        // Log task start
        logId = await logTaskStart(job.name);

        // Update last_run_at BEFORE executing the job.
        // This prevents systemd SIGKILL (at TimeoutStartSec=10min) from leaving last_run_at
        // un-updated, which would cause the job to re-run on every subsequent cron cycle.
        // Jobs that need to retry sooner can explicitly clear last_run_at in their handler.
        await updateLastRun(job.task_key);

        // Execute based on handler type
        let result;
        if (job.handler_type === 'function') {
          // Call handler function with timeout to prevent blocking the cron cycle.
          // Function-type jobs can hang just as command-type jobs do (e.g. slow DB queries,
          // hung LLM calls). Default: 8min (below systemd's 10min TimeoutStartSec).
          const handler = HANDLERS[job.task_key];
          if (!handler) {
            throw new Error(`Handler function not found: ${job.task_key}`);
          }
          const fnTimeoutMs = (job.timeout_seconds || 480) * 1000;
          result = await raceWithTimeout(
            handler(),
            fnTimeoutMs,
            `Function handler timed out after ${fnTimeoutMs / 1000}s: ${job.task_key}`
          );
        } else if (job.handler_type === 'command') {
          // Execute command with lock for pipeline stages
          const isPipelineStage = ['scoring', 'rescoring', 'enrich', 'proposals'].some(stage =>
            job.handler_value.includes(stage)
          );
          const lockKey = isPipelineStage ? `cron_${job.task_key}_running` : null;
          // Use job's timeout_seconds if set; otherwise default to 8 min (below systemd's 10min limit)
          const cmdTimeout = job.timeout_seconds || 480;
          result = await executeCommand(job.handler_value, lockKey, cmdTimeout);
        } else {
          throw new Error(`Unknown handler type: ${job.handler_type}`);
        }

        const duration = ((Date.now() - start) / 1000).toFixed(2);

        // Log task completion
        await logTaskComplete(logId, result);

        // Update last_run_at (second write — confirms actual completion)
        await updateLastRun(job.task_key);

        logger.success(`${job.name} completed in ${duration}s`);
        results.ran.push({ name: job.name, duration, result });
      } catch (error) {
        logger.error(`${job.name} failed:`, error);

        // Record the failure, but never let a bookkeeping error abort the jobs
        // that are still waiting in this cycle.
        try {
          if (logId) {
            await logTaskFailed(logId, error);
          }
          // Update last_run_at even on failure to prevent immediate retries
          await updateLastRun(job.task_key);
        } catch (bookkeepingError) {
          logger.warn(`Could not record failure for ${job.name}: ${bookkeepingError.message}`);
        }

        results.failed.push({
          name: job.name,
          error: error.message,
          critical: Boolean(job.critical ?? 1),
        });
      }
    }
  } finally {
    // Release global lock
    try {
      await run('DELETE FROM ops.cron_locks WHERE lock_key = $1', [GLOBAL_LOCK_KEY]);
    } catch (error) {
      logger.warn(`Failed to release global cron lock: ${error.message}`);
    }
  }

  // Summary
  console.log('\n=== Cron Summary ===');
  console.log(`Ran: ${results.ran.length} tasks`);
  console.log(`Skipped: ${results.skipped.length} tasks`);
  console.log(`Failed: ${results.failed.length} tasks`);

  if (results.ran.length > 0) {
    console.log('\nCompleted:');
    results.ran.forEach(r => console.log(`  ${r.name} (${r.duration}s)`));
  }

  if (results.failed.length > 0) {
    console.log('\nFailed:');
    results.failed.forEach(f => console.log(`  ${f.name}: ${f.error}`));
  }

  console.log('====================\n');

  // Exit with error only if critical tasks failed
  const criticalFailures = results.failed.filter(f => f.critical);
  if (criticalFailures.length > 0) {
    console.log(`\n${criticalFailures.length} critical task(s) failed - exiting with error\n`);
    process.exit(1);
  } else if (results.failed.length > 0) {
    console.log(
      `\n${results.failed.length} non-critical task(s) failed - continuing normally\n`
    );
  }
}

// CLI functionality: run only when this file is the script node was started with.
// Compare filesystem paths (not a hand-built file:// string) so locations that
// need URL encoding, such as paths containing spaces, are still recognised.
if (process.argv[1] && __filename === process.argv[1]) {
  runCron()
    .then(() => {
      logger.success('Cron completed successfully');
      process.exit(0);
    })
    .catch(error => {
      logger.error('Cron failed:', error);
      process.exit(1);
    });
}

export default { runCron, HANDLERS };