pipeline-service.js
1 #!/usr/bin/env node 2 /** 3 * Continuous Pipeline Service 4 * 5 * Runs two parallel loops so local CPU and external APIs each run at full capacity: 6 * 7 * browserLoop — Assets, Enrich (Playwright) 8 * Gated by instantaneous CPU (BROWSER_CPU_GATE, default 80%). 9 * Per-site adaptive concurrency via cpu-monitor (200ms samples). 10 * 11 * apiLoop — SERPs 12 * No CPU gate — ZenRows rate limiter governs concurrency. 13 * Scoring/Rescoring moved to orchestrator (score_sites/score_semantic batches). 14 * 15 * Sites advance through the pipeline via DB status changes; the two loops are 16 * independent and never block each other. 17 * 18 * Managed by systemd/NixOS service (auto-restart on crash). 19 * Hung pipeline detection (log silent > 10min) lives in cron/pipeline-status-monitor.js. 20 * 21 * Usage: node src/pipeline-service.js 22 */ 23 24 import { existsSync } from 'fs'; 25 import { join } from 'path'; 26 import { fileURLToPath } from 'url'; 27 import './utils/load-env.js'; 28 import { config as dotenvConfig } from 'dotenv'; 29 import { runSerpsStage } from './stages/serps.js'; 30 import { runAssetsStage } from './stages/assets.js'; 31 // Scoring/Rescoring removed from pipeline — handled by orchestrator via Claude Max (zero cost). 32 // score_sites batch: HTML-only CRO scoring (Sonnet). score_semantic: semantic re-score (Sonnet). 33 // If vision scoring is ever needed, add a new orchestrator batch (screenshot → claude -p --image). 
import { runEnrichmentStage } from './stages/enrich.js';
// Proposals removed from pipeline — handled by orchestrator via Claude Max (zero OpenRouter cost)
import { runOutreachStage } from './stages/outreach.js';
import { runRepliesStage } from './stages/replies.js';
import Logger from './utils/logger.js';
import { getCurrentCpuUsage, stopCpuMonitor } from './utils/cpu-monitor.js';
import { getSkipStages, getRateLimitStatus, isRateLimited } from './utils/rate-limit-scheduler.js';
import { run, getOne, getAll, query, withTransaction, closePool, getPool } from './utils/db.js';

const logger = new Logger('Pipeline');
const _psRoot = join(fileURLToPath(new URL('.', import.meta.url)), '..');

// ──────────────────────────────────────────────────────────────────────────────
// 2Step integration — lazy-imported so a missing 2Step project doesn't crash 333Method.
// Each wrapper catches errors independently; 2Step failures never affect 333Method.
// ──────────────────────────────────────────────────────────────────────────────

const TWOSTEP_ENABLED = process.env.TWOSTEP_PIPELINE_ENABLED !== 'false';
const TWOSTEP_ROOT = process.env.TWOSTEP_ROOT || new URL('../../2Step', import.meta.url).pathname;

// Fresh zeroed metrics object — returned whenever a 2Step stage is disabled,
// unavailable, or fails. A new object per call so callers can't alias state.
const emptyStageMetrics = () => ({ processed: 0, succeeded: 0, failed: 0 });

/**
 * Dynamically import a 2Step module, returning null (with a warning logged)
 * instead of throwing when the module cannot be loaded.
 * @param {string} modulePath - Path/URL of the module to import.
 * @returns {Promise<object|null>} Module namespace, or null on import failure.
 */
async function safe2StepImport(modulePath) {
  let mod = null;
  try {
    mod = await import(modulePath);
  } catch (importErr) {
    logger.warn(`[2Step] Could not import ${modulePath}: ${importErr.message}`);
  }
  return mod;
}

// Lazy-cached stage function references (resolved on first call)
const _2stepStages = {};

/**
 * Run a 2Step pipeline stage by name, lazily resolving its module on first use.
 * Never throws: a disabled integration, a missing module/export, or a stage
 * error all yield zeroed metrics so 2Step problems cannot break 333Method loops.
 * @param {string} stageName - 2Step stage file name (without .js).
 * @param {string} exportName - Named export to invoke from that module.
 * @param {object} [options] - Passed through to the stage function.
 * @returns {Promise<object>} Stage metrics, or zeroed metrics on skip/failure.
 */
async function run2StepStage(stageName, exportName, options = {}) {
  if (!TWOSTEP_ENABLED) return emptyStageMetrics();

  let stageFn = _2stepStages[stageName];
  if (!stageFn) {
    const mod = await safe2StepImport(`${TWOSTEP_ROOT}/src/stages/${stageName}.js`);
    if (!mod || !mod[exportName]) {
      logger.info(`[2Step] ${stageName} stage not available — skipping`);
      return emptyStageMetrics();
    }
    stageFn = mod[exportName];
    _2stepStages[stageName] = stageFn;
  }

  try {
    logger.info(`[2Step] Running ${stageName} stage`);
    const t0 = Date.now();
    const stageResult = await stageFn(options);
    const elapsed = ((Date.now() - t0) / 1000).toFixed(2);
    const summary = JSON.stringify(stageResult).slice(0, 200);
    logger.info(`[2Step] ${stageName} complete in ${elapsed}s: ${summary}`);
    return stageResult;
  } catch (err) {
    logger.error(`[2Step] ${stageName} failed (non-fatal): ${err.message}`);
    return emptyStageMetrics();
  }
}

// PIPELINE_BATCH_MAX_SIZE: hard ceiling for any single stage batch.
// Replaces PIPELINE_BATCH_SIZE — each stage gets an adaptive size ≤ this limit.
// PIPELINE_BATCH_SIZE is kept as a deprecated alias for backwards compatibility.
const BATCH_MAX = Number.parseInt(
  process.env.PIPELINE_BATCH_MAX_SIZE || process.env.PIPELINE_BATCH_SIZE || '200',
  10
);

// Minimum backlog a stage must have before we bother spinning it up.
// Stages with fewer sites queued than this are skipped for the cycle to avoid
// wasting spin-up/down overhead on tiny batches.
const BATCH_MIN_THRESHOLD = Number.parseInt(process.env.PIPELINE_BATCH_MIN_THRESHOLD || '5', 10);

const CYCLE_DELAY_MS = Number.parseInt(process.env.PIPELINE_CYCLE_DELAY_MS || '1000', 10);
const PAUSE_CHECK_DELAY_MS = Number.parseInt(process.env.PIPELINE_PAUSE_CHECK_MS || '5000', 10);

// Outreach has a 3-day cooldown between sends per site — no need to poll every second.
// When nothing is sent, sleep longer to avoid spin loop filling pipeline_metrics with
// 0-site rows and triggering false-positive crash loop alerts.
const OUTREACH_IDLE_DELAY_MS = Number.parseInt(process.env.PIPELINE_OUTREACH_IDLE_DELAY_MS || '60000', 10);

// CPU threshold above which the browser_loop waits before starting new work.
// Separate from per-site adaptive scaling — this is a hard gate on the whole loop.
// Fraction (0.0–1.0) of instantaneous CPU; compared against cpu-monitor samples
// inside runBrowserLoop before each iteration.
const BROWSER_CPU_GATE = parseFloat(process.env.BROWSER_CPU_GATE || '0.80');

// ──────────────────────────────────────────────────────────────────────────────
// Adaptive batch sizing
//
// Priority: push sites as far toward outreach_sent as possible before clearing
// upper-funnel backlogs. Later stages get proportionally larger batches.
//
// Stage weight controls what fraction of BATCH_MAX each stage requests when it
// has a large backlog. Stages with weight 1.0 can use the full max; lower weights
// cap the batch proportionally. Computed batch = min(backlog, weight × BATCH_MAX).
// If backlog < BATCH_MIN_THRESHOLD the stage is skipped this cycle.
// ──────────────────────────────────────────────────────────────────────────────

// Higher weight = closer to revenue = gets more of the batch budget.
// Keys must be lowercase stage names (looked up via stageName.toLowerCase()).
const STAGE_BATCH_WEIGHTS = {
  outreach: 1.0, // revenue-critical — always run full batches
  replies: 1.0, // revenue-critical
  enrich: 0.7, // feeds proposals (orchestrator generates proposals from enriched sites)
  assets: 0.5, // mid-funnel (runs in the browser loop)
  serps: 0.3, // top-of-funnel — run smaller batches to avoid overwhelming downstream
};

// Map of stage name (lowercase) → SQL to count its current backlog.
// These mirror the WHERE clauses used in each stage's fetch query.
const STAGE_BACKLOG_SQL = {
  serps: `SELECT COUNT(*) AS n FROM keywords WHERE status='active'`,
  assets: `SELECT COUNT(*) AS n FROM sites WHERE status='found'`,
  enrich: `SELECT COUNT(*) AS n FROM sites WHERE status IN ('semantic_scored','vision_scored')`,
  outreach: `SELECT COUNT(*) AS n FROM messages WHERE direction='outbound' AND approval_status='approved' AND delivery_status IS NULL`,
  replies: `SELECT COUNT(*) AS n FROM messages WHERE direction='inbound' AND processed_at IS NULL`,
};

// Downstream surplus detection: if the next stage already has a large queue,
// skip this stage to let the pipeline drain toward outreach.
// Map: stage → the status it PRODUCES (i.e., the next stage's input queue).
const STAGE_OUTPUT_STATUS = {
  serps: 'found',
  assets: 'assets_captured',
  enrich: 'enriched_regex',
};

// Stages that should NEVER be skipped for surplus (revenue-critical or quota-bound).
const SURPLUS_SKIP_EXEMPT = new Set([
  'outreach',
  'replies',
  'serps', // ZenRows daily quota — always use it
]);

// ZenRows has no daily limit (monthly subscription, confirmed with support).
// No STAGE_DAILY_LIMIT_GATE needed — surplus skipping for assets uses normal downstream
// queue depth logic. If ZenRows times out (concurrency overload), the circuit breaker
// handles backoff independently.
const STAGE_DAILY_LIMIT_GATE = {};

// If the next stage's queue is >N× the batch size, the upstream stage can wait.
const SURPLUS_THRESHOLD_MULTIPLIER = parseFloat(process.env.STAGE_THROTTLE_MULTIPLIER || '3');

/**
 * Compute the batch size a stage should use this cycle.
 *
 * @param {string} stageName - Stage name (case-insensitive; matched against STAGE_BACKLOG_SQL).
 * @returns {Promise<number>}
 *    0  → backlog below BATCH_MIN_THRESHOLD: skip this cycle.
 *   -1  → downstream queue over surplus threshold: skip to let the pipeline drain.
 *   else → min(backlog, weight × BATCH_MAX), floored at BATCH_MIN_THRESHOLD.
 *   Unknown stages and failed backlog queries fall back to BATCH_MAX.
 */
async function adaptiveBatchSize(stageName) {
  const key = stageName.toLowerCase();
  const sql = STAGE_BACKLOG_SQL[key];
  if (!sql) return BATCH_MAX; // unknown stage — use max

  let backlog = 0;
  try {
    const row = await getOne(sql);
    // COUNT(*) is a bigint — node-postgres returns it as a STRING by default.
    // Coerce explicitly so the comparisons below never rely on implicit coercion.
    backlog = Number(row?.n ?? 0);
  } catch {
    return BATCH_MAX; // query failed — use max rather than skip
  }
  if (!Number.isFinite(backlog)) return BATCH_MAX; // unparseable count — don't starve the stage

  if (backlog < BATCH_MIN_THRESHOLD) return 0; // signal: skip this cycle

  // Downstream surplus: skip upper-funnel stages when the next queue is already large.
  // For stages gated by a daily-limit API (e.g. assets→zenrows), only skip if the
  // daily quota has been exhausted — don't pause the feeder while ZenRows still has quota.
  if (!SURPLUS_SKIP_EXEMPT.has(key)) {
    const outputStatus = STAGE_OUTPUT_STATUS[key];
    if (outputStatus) {
      const gateApi = STAGE_DAILY_LIMIT_GATE[key];
      // No gate configured → always eligible for surplus skipping.
      const dailyLimitReached = !gateApi || isRateLimited(gateApi);
      if (dailyLimitReached) {
        try {
          const downstream = await getOne(
            'SELECT COUNT(*) AS n FROM sites WHERE status = $1',
            [outputStatus]
          );
          const threshold = SURPLUS_THRESHOLD_MULTIPLIER * BATCH_MAX;
          // Same bigint-as-string coercion as above.
          if (Number(downstream?.n ?? 0) > threshold) {
            return -1; // signal: skip due to downstream surplus
          }
        } catch {
          // ignore — just run the stage
        }
      }
    }
  }

  const weight = STAGE_BATCH_WEIGHTS[key] ?? 0.5;
  return Math.min(backlog, Math.max(BATCH_MIN_THRESHOLD, Math.round(weight * BATCH_MAX)));
}

// SKIP_STAGES: re-read from .env every 30s so changes take effect without restart.
// Cache for the static SKIP_STAGES set; refreshed from .env at most every 30s.
let _skipStagesCache = new Set();
let _skipStagesCacheAt = 0;

/**
 * Return the set of statically-skipped stage names (lowercase) from SKIP_STAGES.
 * Re-reads .env (dotenv with override) at most once per 30s so edits take effect
 * without a service restart; logs additions/removals on each refresh.
 * @returns {Set<string>} Lowercased stage names to skip.
 */
function getStaticSkipStages() {
  const now = Date.now();
  if (now - _skipStagesCacheAt > 30_000) {
    dotenvConfig({ override: true });
    const prev = _skipStagesCache;
    _skipStagesCache = new Set(
      (process.env.SKIP_STAGES || '')
        .split(',')
        .map(s => s.trim().toLowerCase())
        .filter(s => s.length > 0)
    );
    _skipStagesCacheAt = now;
    // Log changes (diff old set vs new set in both directions)
    for (const s of _skipStagesCache) {
      if (!prev.has(s)) logger.info(`⏭️ SKIP_STAGES: added '${s}' (hot-reload)`);
    }
    for (const s of prev) {
      if (!_skipStagesCache.has(s)) logger.info(`▶️ SKIP_STAGES: removed '${s}' (hot-reload)`);
    }
  }
  return _skipStagesCache;
}

// Tracks which stages are currently paused via the rate-limit-scheduler so we
// only log the "stage paused" / "stage resumed" messages on state transitions,
// not on every pipeline cycle.
const dynamicallySkipped = new Set();

// Stage tables: each entry is { name, fn(options) → metrics, is2Step? }.
// fn receives { limit: batchSize } from runStage.
const BROWSER_STAGES = [
  { name: 'Assets', fn: runAssetsStage },
  { name: 'Enrich', fn: runEnrichmentStage },
  // 2Step: enrich runs after 333Method enrich (uses browser for contact extraction + logo treatment)
  { name: '2Step-Enrich', fn: (opts) => run2StepStage('enrich', 'runEnrichStage', opts), is2Step: true },
];

const API_STAGES = [
  { name: 'SERPs', fn: runSerpsStage },
  // Scoring → orchestrator score_sites batch (Sonnet, Claude Max)
  // Rescoring → orchestrator score_semantic batch (Sonnet, Claude Max)
  // 2Step: reviews uses Outscraper API (no browser needed)
  { name: '2Step-Reviews', fn: (opts) => run2StepStage('reviews', 'runReviewsStage', opts), is2Step: true },
  // 2Step: video uses ElevenLabs API + ffmpeg (no browser)
  { name: '2Step-Video', fn: (opts) => run2StepStage('video', 'runVideoStage', opts), is2Step: true },
  // 2Step: proposals uses templates (lightweight, no browser/API)
  { name: '2Step-Proposals', fn: (opts) => run2StepStage('proposals', 'runProposalsStage', opts), is2Step: true },
];

// Outreach and Replies run in their own parallel loop so they are never blocked
// by long-running scoring/proposals batches. With 23k+ approved outreaches queued
// they need continuous throughput, not a slot at the end of a 30-min scoring batch.
const OUTREACH_STAGES = [
  { name: 'Outreach', fn: runOutreachStage },
  { name: 'Replies', fn: runRepliesStage },
  // 2Step: outreach sends approved emails/SMS from msgs.messages where project='2step'
  { name: '2Step-Outreach', fn: (opts) => run2StepStage('outreach', 'runOutreachStage', opts), is2Step: true },
  // 2Step: replies processes inbound messages and auto-responds
  { name: '2Step-Replies', fn: (opts) => run2StepStage('replies', 'runRepliesStage', opts), is2Step: true },
];

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Global run flag — flipped to false by gracefulShutdown; all three loops poll it.
let running = true;

/**
 * True when the operator has paused the pipeline via ops.pipeline_control.
 * Accepts both boolean true (native pg boolean) and 1 (numeric representation).
 */
async function isPaused() {
  const control = await getOne('SELECT paused FROM ops.pipeline_control WHERE id = 1');
  return control?.paused === true || control?.paused === 1;
}

/** Fetch who paused the pipeline and when (row: { paused_by, paused_at }). */
async function getPausedBy() {
  return await getOne('SELECT paused_by, paused_at FROM ops.pipeline_control WHERE id = 1');
}

/** Record the stage currently executing, for observability via ops.pipeline_control. */
async function updateCurrentStage(stageName) {
  await run(
    `UPDATE ops.pipeline_control SET current_stage = $1, updated_at = NOW() WHERE id = 1`,
    [stageName]
  );
}

/**
 * Touch the per-loop heartbeat column so pipeline-status-monitor can detect a hung loop.
 * loopType must be one of the colMap keys; anything else is silently ignored.
 * NOTE: ${col} interpolation is safe here because col only ever comes from the
 * hard-coded whitelist below — never from external input.
 */
async function recordLoopCycle(loopType) {
  const colMap = {
    browser_loop: 'last_browser_loop_at',
    api_loop: 'last_api_loop_at',
    outreach_loop: 'last_outreach_loop_at',
  };
  const col = colMap[loopType];
  if (!col) return;
  try {
    await run(
      `UPDATE ops.pipeline_control SET ${col} = NOW(), updated_at = NOW() WHERE id = 1`
    );
  } catch {
    // Columns may not exist on older schemas — non-critical
  }
}

/**
 * Persist one stage run's metrics to tel.pipeline_metrics (best-effort;
 * failures are logged as warnings and never interrupt the pipeline).
 * @param {string} stageName - Stage display name (stored lowercased).
 * @param {object} metrics - { processed, succeeded, failed } (missing → 0).
 * @param {number} startTime - Epoch ms when the stage started.
 * @param {number} endTime - Epoch ms when the stage finished.
 */
async function recordStageMetrics(stageName, metrics, startTime, endTime) {
  try {
    await run(
      `INSERT INTO tel.pipeline_metrics
       (stage_name, sites_processed, sites_succeeded, sites_failed, duration_ms, started_at, finished_at)
       VALUES ($1, $2, $3, $4, $5, $6, $7)`,
      [
        stageName.toLowerCase(),
        metrics.processed || 0,
        metrics.succeeded || 0,
        metrics.failed || 0,
        endTime - startTime,
        new Date(startTime).toISOString(),
        new Date(endTime).toISOString(),
      ]
    );
  } catch (error) {
    logger.warn(`Failed to record metrics for ${stageName}:`, error.message);
  }
}

/**
 * Execute a single stage entry once: apply static + dynamic skip rules, compute
 * the adaptive batch size, run the stage fn, record metrics, and log an outcome.
 * Never throws — any stage error is logged and converted to zeroed metrics.
 * @param {{name: string, fn: function, is2Step?: boolean}} stage - Stage table entry.
 * @returns {Promise<object>} The stage's metrics, or zeroed metrics on skip/error.
 */
async function runStage(stage) {
  const stageLower = stage.name.toLowerCase();

  // Static skip (SKIP_STAGES env var — hot-reloaded from .env every 30s)
  if (getStaticSkipStages().has(stageLower)) {
    logger.info(`⏭️ Skipping ${stage.name} stage (SKIP_STAGES configured)`);
    return { processed: 0, succeeded: 0, failed: 0 };
  }

  // Dynamic skip (rate-limit-scheduler: API quota / rate limit detected at runtime)
  const rateLimitedStages = getSkipStages();
  if (rateLimitedStages.has(stageLower)) {
    if (!dynamicallySkipped.has(stageLower)) {
      // First cycle in this rate-limit window — log once with the reset time
      const active = getRateLimitStatus();
      const info = active.find(s => s.stages.includes(stageLower));
      if (info) {
        logger.warn(
          `⏸️ ${stage.name} paused: ${info.reason}. Resumes ~${info.resetAt} (~${info.waitMinutes} min)`
        );
      }
      dynamicallySkipped.add(stageLower);
    }
    return { processed: 0, succeeded: 0, failed: 0 };
  }

  // Stage was dynamically skipped last cycle but is now clear — log resume
  if (dynamicallySkipped.has(stageLower)) {
    logger.success(`▶️ ${stage.name} resumed — rate limit cleared`);
    dynamicallySkipped.delete(stageLower);
  }

  // 2Step stages manage their own batch sizing and DB — skip 333Method adaptive sizing.
  // They still respect SKIP_STAGES and rate-limit checks above.
  let batchSize;
  if (stage.is2Step) {
    batchSize = BATCH_MAX; // 2Step stages apply their own internal limits
  } else {
    batchSize = await adaptiveBatchSize(stage.name);
    // Sentinel returns from adaptiveBatchSize: 0 = backlog too small, -1 = downstream surplus.
    if (batchSize === 0) {
      logger.info(
        `⊘ ${stage.name}: backlog below threshold (${BATCH_MIN_THRESHOLD}) — skipping cycle`
      );
      return { processed: 0, succeeded: 0, failed: 0 };
    }
    if (batchSize === -1) {
      logger.info(
        `⏩ ${stage.name}: downstream queue full (>${Math.round(SURPLUS_THRESHOLD_MULTIPLIER * BATCH_MAX)}) — skipping to let pipeline drain`
      );
      return { processed: 0, succeeded: 0, failed: 0 };
    }
  }

  await updateCurrentStage(stage.name);
  logger.info(`▶️ Running ${stage.name} stage${stage.is2Step ? '' : ` (batch: ${batchSize}/${BATCH_MAX})`}`);
  const startTime = Date.now();

  try {
    const metrics = await stage.fn({ limit: batchSize });
    const endTime = Date.now();
    await recordStageMetrics(stage.name, metrics, startTime, endTime);

    const duration = ((endTime - startTime) / 1000).toFixed(2);
    const succeeded = metrics.succeeded || 0;
    const failed = metrics.failed || 0;
    const skipped = metrics.skipped || 0; // e.g. sites marked ignore by blocklist/dedup
    const processed = metrics.processed || 0;

    if (succeeded > 0 || failed > 0) {
      logger.success(
        `✓ ${stage.name} completed: ${succeeded} succeeded, ${failed} failed (${duration}s)`
      );
    } else if (skipped > 0 || processed > 0) {
      // Work was done (dedup/blocklist) even if no captures succeeded
      logger.info(
        `⊘ ${stage.name}: ${skipped || processed} sites deduplicated/ignored (${duration}s)`
      );
    } else {
      logger.info(`⊘ ${stage.name}: No work available (${duration}s)`);
    }

    return metrics;
  } catch (error) {
    logger.error(`✗ ${stage.name} failed:`, error);
    return { processed: 0, succeeded: 0, failed: 0 };
  }
}

/**
 * Browser loop: runs Assets, Enrich, and 2Step-Enrich forever until `running`
 * goes false. Respects the DB pause flag and a hard instantaneous-CPU gate
 * before each iteration; sleeps CYCLE_DELAY_MS when a full pass did no work.
 */
async function runBrowserLoop() {
  logger.info('🖥️ Browser loop started (Assets, Enrich)');
  let pauseLogCounter = 0;

  while (running) {
    try {
      if (await isPaused()) {
        pauseLogCounter++;
        // Log only every 12th pause check (~once a minute at the 5s check interval)
        if (pauseLogCounter % 12 === 0) {
          const control = await getPausedBy();
          const pausedDuration = Math.round(
            (Date.now() - new Date(control.paused_at).getTime()) / 1000
          );
          logger.info(`⏸️ Browser loop paused by ${control.paused_by} (${pausedDuration}s)`);
        }
        await sleep(PAUSE_CHECK_DELAY_MS);
        continue;
      }

      if (pauseLogCounter > 0) {
        logger.success('▶️ Browser loop resumed');
        pauseLogCounter = 0;
      }

      // Hard CPU gate — wait if machine is hot before spawning browsers
      const cpu = getCurrentCpuUsage();
      if (cpu > BROWSER_CPU_GATE) {
        logger.info(
          `🌡️ CPU at ${(cpu * 100).toFixed(0)}% > ${(BROWSER_CPU_GATE * 100).toFixed(0)}% gate — browser_loop waiting 5s`
        );
        await sleep(5000);
        continue;
      }

      // Record start of browser loop iteration immediately so pipeline-status-monitor
      // doesn't see last_browser_loop_at as stale while a long enrich batch is running.
      await recordLoopCycle('browser_loop');

      let didWork = false;
      for (const stage of BROWSER_STAGES) {
        if (!running) break;
        const metrics = await runStage(stage);
        // Only count as "work done" if something succeeded or was processed (dedup/blocklist).
        // Failures alone (e.g. all circuit breaker rejections) should NOT prevent idle sleep —
        // otherwise the loop spin-loops at ~400ms generating 100K+ useless metrics rows/day.
        if ((metrics.succeeded || 0) + (metrics.skipped || 0) + (metrics.processed || 0) > 0)
          didWork = true;
      }

      if (!didWork) await sleep(CYCLE_DELAY_MS);
    } catch (error) {
      logger.error('❌ Browser loop error:', error);
      await sleep(5000);
    }
  }

  logger.info('🖥️ Browser loop stopped');
}

// ── Outreach efficiency tracking ────────────────────────────────────────────
// RULE: If outreach sends < 95% of its batch size, this is the #1 pipeline
// priority. A starving outreach module means the entire upstream is wasted work.
// The pipeline should never be generating more proposals/enrichments/scores
// while outreach has no sendable inventory.
const OUTREACH_EFFICIENCY_TARGET = 0.95;
const OUTREACH_EFFICIENCY_WINDOW = 10; // cycles to track
const outreachEfficiencyHistory = [];

/**
 * Track how full each outreach batch was over a sliding window of cycles and
 * warn when the sustained send rate falls below OUTREACH_EFFICIENCY_TARGET.
 * @param {number} sent - Messages actually sent this cycle.
 * @param {number} batchSize - Batch size the cycle was given (<=0 is ignored).
 */
function trackOutreachEfficiency(sent, batchSize) {
  if (batchSize <= 0) return;
  outreachEfficiencyHistory.push({ sent, batchSize, at: Date.now() });
  if (outreachEfficiencyHistory.length > OUTREACH_EFFICIENCY_WINDOW)
    outreachEfficiencyHistory.shift();

  // Log warning if sustained underperformance (only once the window is full)
  if (outreachEfficiencyHistory.length >= OUTREACH_EFFICIENCY_WINDOW) {
    const avgRate = outreachEfficiencyHistory.reduce((sum, e) => sum + e.sent, 0) /
      outreachEfficiencyHistory.reduce((sum, e) => sum + e.batchSize, 0);
    if (avgRate < OUTREACH_EFFICIENCY_TARGET) {
      logger.warn(
        `⚠️ OUTREACH EFFICIENCY: ${(avgRate * 100).toFixed(0)}% over last ${OUTREACH_EFFICIENCY_WINDOW} cycles ` +
        `(target: ${(OUTREACH_EFFICIENCY_TARGET * 100).toFixed(0)}%). ` +
        `Pipeline is generating work that outreach cannot send. ` +
        `Check: cooldowns, GDPR blocks, SMS-blocked countries, missing templates.`
      );
    }
  }
}

/**
 * Outreach loop: runs Outreach, Replies, and the 2Step equivalents forever
 * until `running` goes false. Uses the long OUTREACH_IDLE_DELAY_MS when idle
 * because the per-site send cooldown means there is rarely new work every second.
 */
async function runOutreachLoop() {
  logger.info('📤 Outreach loop started (Outreach, Replies)');
  let pauseLogCounter = 0;

  while (running) {
    try {
      if (await isPaused()) {
        pauseLogCounter++;
        if (pauseLogCounter % 12 === 0) {
          const control = await getPausedBy();
          const pausedDuration = Math.round(
            (Date.now() - new Date(control.paused_at).getTime()) / 1000
          );
          logger.info(`⏸️ Outreach loop paused by ${control.paused_by} (${pausedDuration}s)`);
        }
        await sleep(PAUSE_CHECK_DELAY_MS);
        continue;
      }

      if (pauseLogCounter > 0) {
        logger.success('▶️ Outreach loop resumed');
        pauseLogCounter = 0;
      }

      let didWork = false;
      for (const stage of OUTREACH_STAGES) {
        if (!running) break;
        const metrics = await runStage(stage);
        // Only succeeded counts as work — failures alone (breaker rejections) trigger idle sleep
        if ((metrics.succeeded || 0) > 0) didWork = true;

        // Track outreach send efficiency (333Method Outreach stage only)
        if (stage.name === 'Outreach') {
          trackOutreachEfficiency(metrics.succeeded || 0, BATCH_MAX);
        }
      }

      await recordLoopCycle('outreach_loop');

      // Use longer idle delay for outreach: 3-day cooldown means nothing new to send for a while.
      // Prevents spin loop that fills pipeline_metrics with 0-site rows and trips crash-loop alerts.
      if (!didWork) await sleep(OUTREACH_IDLE_DELAY_MS);
    } catch (error) {
      logger.error('❌ Outreach loop error:', error);
      await sleep(5000);
    }
  }

  logger.info('📤 Outreach loop stopped');
}

/**
 * API loop: runs SERPs and the API-only 2Step stages forever until `running`
 * goes false. No CPU gate — external rate limiters govern concurrency.
 */
async function runApiLoop() {
  logger.info('🌐 API loop started (SERPs only — Scoring/Rescoring handled by orchestrator)');
  let pauseLogCounter = 0;

  while (running) {
    try {
      if (await isPaused()) {
        pauseLogCounter++;
        if (pauseLogCounter % 12 === 0) {
          const control = await getPausedBy();
          const pausedDuration = Math.round(
            (Date.now() - new Date(control.paused_at).getTime()) / 1000
          );
          logger.info(`⏸️ API loop paused by ${control.paused_by} (${pausedDuration}s)`);
        }
        await sleep(PAUSE_CHECK_DELAY_MS);
        continue;
      }

      if (pauseLogCounter > 0) {
        logger.success('▶️ API loop resumed');
        pauseLogCounter = 0;
      }

      let didWork = false;
      for (const stage of API_STAGES) {
        if (!running) break;
        const metrics = await runStage(stage);
        // Only succeeded counts as work — failures alone (breaker rejections) trigger idle sleep
        if ((metrics.succeeded || 0) > 0) didWork = true;
      }

      await recordLoopCycle('api_loop');

      if (!didWork) await sleep(CYCLE_DELAY_MS);
    } catch (error) {
      logger.error('❌ API loop error:', error);
      await sleep(5000);
    }
  }

  logger.info('🌐 API loop stopped');
}

/**
 * Entry point: log startup configuration, ensure the control row exists,
 * then run the three loops in parallel until shutdown.
 */
async function main() {
  logger.success('🚀 Pipeline Service starting (parallel mode)...');
  logger.info(
    `Batch max: ${BATCH_MAX} sites/stage (adaptive, min threshold: ${BATCH_MIN_THRESHOLD})`
  );
  logger.info(`Cycle delay: ${CYCLE_DELAY_MS}ms`);
  logger.info(`Browser CPU gate: ${(BROWSER_CPU_GATE * 100).toFixed(0)}% instantaneous CPU`);
  logger.info(`2Step integration: ${TWOSTEP_ENABLED ? 'enabled' : 'disabled (TWOSTEP_PIPELINE_ENABLED=false)'}`);

  const initialSkip = getStaticSkipStages();
  if (initialSkip.size > 0) {
    logger.info(`Skipping stages (static): ${Array.from(initialSkip).join(', ')}`);
  }

  // Restore any rate limits that were active before a pipeline restart
  const activeLimits = getRateLimitStatus();
  for (const limit of activeLimits) {
    logger.warn(
      `⏸️ Rate limit restored from disk: [${limit.stages.join(', ')}] paused until ${limit.resetAt} (~${limit.waitMinutes} min) — ${limit.reason}`
    );
  }

  logger.info('');

  // Ensure the pipeline_control row exists (idempotent upsert)
  await run(
    `INSERT INTO ops.pipeline_control (id, paused) VALUES (1, false)
     ON CONFLICT (id) DO NOTHING`
  );

  // Run browser_loop, api_loop, and outreach_loop in parallel — they never block each other.
  // Outreach is separated so it is not starved by long scoring/proposals batches.
  await Promise.all([runBrowserLoop(), runApiLoop(), runOutreachLoop()]);
}

// One-shot guard: a second signal (or an error during cleanup-triggered teardown)
// must not re-run cleanup or close the pool twice.
let shuttingDown = false;

/**
 * Stop the loops, clear pipeline state, close the DB pool, and exit.
 * Exit code is 0 only for operator-initiated signals (SIGTERM/SIGINT); crash
 * paths (uncaughtException/unhandledRejection) exit 1 so a systemd unit with
 * Restart=on-failure actually restarts the service — the header promises
 * "auto-restart on crash", which a 0 exit code would silently defeat.
 * @param {string} signal - What triggered the shutdown.
 */
async function gracefulShutdown(signal) {
  if (shuttingDown) return;
  shuttingDown = true;
  logger.warn(`Received ${signal}, shutting down gracefully...`);
  running = false;
  stopCpuMonitor();
  try {
    await run(
      `UPDATE ops.pipeline_control SET current_stage = NULL, updated_at = NOW() WHERE id = 1`
    );
    logger.success('✓ Cleaned up pipeline state');
  } catch (error) {
    logger.error('Failed to cleanup:', error.message);
  }
  await closePool();
  const cleanExit = signal === 'SIGTERM' || signal === 'SIGINT';
  process.exit(cleanExit ? 0 : 1);
}

process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
process.on('SIGINT', () => gracefulShutdown('SIGINT'));
process.on('uncaughtException', error => {
  logger.error('Uncaught exception:', error);
  gracefulShutdown('uncaughtException');
});
process.on('unhandledRejection', (reason, _promise) => {
  logger.error('Unhandled rejection', {
    reason: reason instanceof Error ? reason.message : String(reason),
    stack: reason instanceof Error ? reason.stack : undefined,
    type: reason?.constructor?.name || typeof reason,
  });
  gracefulShutdown('unhandledRejection');
});

main().catch(error => {
  logger.error('Fatal error:', error);
  process.exit(1);
});