// src/pipeline-service.js
  1  #!/usr/bin/env node
  2  /**
  3   * Continuous Pipeline Service
  4   *
  5   * Runs two parallel loops so local CPU and external APIs each run at full capacity:
  6   *
  7   *   browserLoop  — Assets, Enrich (Playwright)
  8   *                  Gated by instantaneous CPU (BROWSER_CPU_GATE, default 80%).
  9   *                  Per-site adaptive concurrency via cpu-monitor (200ms samples).
 10   *
 11   *   apiLoop      — SERPs
 12   *                  No CPU gate — ZenRows rate limiter governs concurrency.
 13   *                  Scoring/Rescoring moved to orchestrator (score_sites/score_semantic batches).
 14   *
 15   * Sites advance through the pipeline via DB status changes; the two loops are
 16   * independent and never block each other.
 17   *
 18   * Managed by systemd/NixOS service (auto-restart on crash).
 19   * Hung pipeline detection (log silent > 10min) lives in cron/pipeline-status-monitor.js.
 20   *
 21   * Usage: node src/pipeline-service.js
 22   */
 23  
 24  import { existsSync } from 'fs';
 25  import { join } from 'path';
 26  import { fileURLToPath } from 'url';
 27  import './utils/load-env.js';
 28  import { config as dotenvConfig } from 'dotenv';
 29  import { runSerpsStage } from './stages/serps.js';
 30  import { runAssetsStage } from './stages/assets.js';
 31  // Scoring/Rescoring removed from pipeline — handled by orchestrator via Claude Max (zero cost).
 32  // score_sites batch: HTML-only CRO scoring (Sonnet). score_semantic: semantic re-score (Sonnet).
 33  // If vision scoring is ever needed, add a new orchestrator batch (screenshot → claude -p --image).
 34  import { runEnrichmentStage } from './stages/enrich.js';
 35  // Proposals removed from pipeline — handled by orchestrator via Claude Max (zero OpenRouter cost)
 36  import { runOutreachStage } from './stages/outreach.js';
 37  import { runRepliesStage } from './stages/replies.js';
 38  import Logger from './utils/logger.js';
 39  import { getCurrentCpuUsage, stopCpuMonitor } from './utils/cpu-monitor.js';
 40  import { getSkipStages, getRateLimitStatus, isRateLimited } from './utils/rate-limit-scheduler.js';
 41  import { run, getOne, getAll, query, withTransaction, closePool, getPool } from './utils/db.js';
 42  
const logger = new Logger('Pipeline');
// Project root (one level above src/). Appears unused in this file — kept,
// presumably, for path helpers; verify before removing.
const _psRoot = join(fileURLToPath(new URL('.', import.meta.url)), '..');

// ──────────────────────────────────────────────────────────────────────────────
// 2Step integration — lazy-imported so missing 2Step project doesn't crash 333Method.
// Each wrapper catches errors independently; 2Step failures never affect 333Method.
// ──────────────────────────────────────────────────────────────────────────────

// Enabled unless explicitly set to the literal string 'false' (unset ⇒ enabled).
const TWOSTEP_ENABLED = process.env.TWOSTEP_PIPELINE_ENABLED !== 'false';
// NOTE(review): URL.pathname is POSIX-style; on Windows this would yield a
// leading-slash path that dynamic import() may reject — confirm if Windows matters.
const TWOSTEP_ROOT = process.env.TWOSTEP_ROOT || new URL('../../2Step', import.meta.url).pathname;
 53  
 54  async function safe2StepImport(modulePath) {
 55    try {
 56      return await import(modulePath);
 57    } catch (err) {
 58      logger.warn(`[2Step] Could not import ${modulePath}: ${err.message}`);
 59      return null;
 60    }
 61  }
 62  
 63  // Lazy-cached stage function references (resolved on first call)
 64  const _2stepStages = {};
 65  
 66  async function run2StepStage(stageName, exportName, options = {}) {
 67    if (!TWOSTEP_ENABLED) return { processed: 0, succeeded: 0, failed: 0 };
 68  
 69    if (!_2stepStages[stageName]) {
 70      const mod = await safe2StepImport(`${TWOSTEP_ROOT}/src/stages/${stageName}.js`);
 71      if (!mod || !mod[exportName]) {
 72        logger.info(`[2Step] ${stageName} stage not available — skipping`);
 73        return { processed: 0, succeeded: 0, failed: 0 };
 74      }
 75      _2stepStages[stageName] = mod[exportName];
 76    }
 77  
 78    try {
 79      logger.info(`[2Step] Running ${stageName} stage`);
 80      const startTime = Date.now();
 81      const result = await _2stepStages[stageName](options);
 82      const duration = ((Date.now() - startTime) / 1000).toFixed(2);
 83      const summary = JSON.stringify(result).slice(0, 200);
 84      logger.info(`[2Step] ${stageName} complete in ${duration}s: ${summary}`);
 85      return result;
 86    } catch (err) {
 87      logger.error(`[2Step] ${stageName} failed (non-fatal): ${err.message}`);
 88      return { processed: 0, succeeded: 0, failed: 0 };
 89    }
 90  }
 91  
// PIPELINE_BATCH_MAX_SIZE: hard ceiling for any single stage batch.
// Replaces PIPELINE_BATCH_SIZE — each stage gets an adaptive size ≤ this limit.
// PIPELINE_BATCH_SIZE is kept as a deprecated alias for backwards compatibility.
// NOTE(review): a malformed env value yields NaN here — assumes operators set
// numeric values; confirm whether validation/fallback is needed.
const BATCH_MAX = parseInt(
  process.env.PIPELINE_BATCH_MAX_SIZE || process.env.PIPELINE_BATCH_SIZE || '200',
  10
);

// Minimum backlog a stage must have before we bother spinning it up.
// Stages with fewer sites queued than this are skipped for the cycle to avoid
// wasting spin-up/down overhead on tiny batches.
const BATCH_MIN_THRESHOLD = parseInt(process.env.PIPELINE_BATCH_MIN_THRESHOLD || '5', 10);

// Delay between idle loop iterations (browser/api loops) when no work was done.
const CYCLE_DELAY_MS = parseInt(process.env.PIPELINE_CYCLE_DELAY_MS || '1000', 10);
// How often a paused loop re-checks the DB pause flag.
const PAUSE_CHECK_DELAY_MS = parseInt(process.env.PIPELINE_PAUSE_CHECK_MS || '5000', 10);
// Outreach has a 3-day cooldown between sends per site — no need to poll every second.
// When nothing is sent, sleep longer to avoid spin loop filling pipeline_metrics with 0-site rows
// and triggering false-positive crash loop alerts.
const OUTREACH_IDLE_DELAY_MS = parseInt(process.env.PIPELINE_OUTREACH_IDLE_DELAY_MS || '60000', 10);

// CPU threshold above which the browser_loop waits before starting new work.
// Separate from per-site adaptive scaling — this is a hard gate on the whole loop.
// Expressed as a fraction (0.80 = 80%).
const BROWSER_CPU_GATE = parseFloat(process.env.BROWSER_CPU_GATE || '0.80');
115  
116  // ──────────────────────────────────────────────────────────────────────────────
117  // Adaptive batch sizing
118  //
119  // Priority: push sites as far toward outreach_sent as possible before clearing
120  // upper-funnel backlogs. Later stages get proportionally larger batches.
121  //
122  // Stage weight controls what fraction of BATCH_MAX each stage requests when it
123  // has a large backlog. Stages with weight 1.0 can use the full max; lower weights
124  // cap the batch proportionally. Computed batch = min(backlog, weight × BATCH_MAX).
125  // If backlog < BATCH_MIN_THRESHOLD the stage is skipped this cycle.
126  // ──────────────────────────────────────────────────────────────────────────────
127  
128  // Higher weight = closer to revenue = gets more of the batch budget.
129  const STAGE_BATCH_WEIGHTS = {
130    outreach: 1.0, // revenue-critical — always run full batches
131    replies: 1.0, // revenue-critical
132    enrich: 0.7, // feeds proposals (orchestrator generates proposals from enriched sites)
133    assets: 0.5,
134    serps: 0.3, // top-of-funnel — run smaller batches to avoid overwhelming downstream
135  };
136  
137  // Map of stage name (lowercase) → SQL to count its current backlog.
138  // These mirror the WHERE clauses used in each stage's fetch query.
139  const STAGE_BACKLOG_SQL = {
140    serps: `SELECT COUNT(*) AS n FROM keywords WHERE status='active'`,
141    assets: `SELECT COUNT(*) AS n FROM sites WHERE status='found'`,
142    enrich: `SELECT COUNT(*) AS n FROM sites WHERE status IN ('semantic_scored','vision_scored')`,
143    outreach: `SELECT COUNT(*) AS n FROM messages WHERE direction='outbound' AND approval_status='approved' AND delivery_status IS NULL`,
144    replies: `SELECT COUNT(*) AS n FROM messages WHERE direction='inbound' AND processed_at IS NULL`,
145  };
146  
147  // Downstream surplus detection: if the next stage already has a large queue,
148  // skip this stage to let the pipeline drain toward outreach.
149  // Map: stage → the status it PRODUCES (i.e., the next stage's input queue).
150  const STAGE_OUTPUT_STATUS = {
151    serps: 'found',
152    assets: 'assets_captured',
153    enrich: 'enriched_regex',
154  };
155  
156  // Stages that should NEVER be skipped for surplus (revenue-critical or quota-bound).
157  const SURPLUS_SKIP_EXEMPT = new Set([
158    'outreach',
159    'replies',
160    'serps', // ZenRows daily quota — always use it
161  ]);
162  
163  // ZenRows has no daily limit (monthly subscription, confirmed with support).
164  // No STAGE_DAILY_LIMIT_GATE needed — surplus skipping for assets uses normal downstream
165  // queue depth logic. If ZenRows times out (concurrency overload), the circuit breaker
166  // handles backoff independently.
167  const STAGE_DAILY_LIMIT_GATE = {};
168  
169  // If the next stage's queue is >N× the batch size, the upstream stage can wait.
170  const SURPLUS_THRESHOLD_MULTIPLIER = parseFloat(process.env.STAGE_THROTTLE_MULTIPLIER || '3');
171  
/**
 * Compute this cycle's batch size for a stage.
 *
 * Return values:
 *   >0 — sites to process: min(backlog, max(BATCH_MIN_THRESHOLD, weight × BATCH_MAX))
 *    0 — backlog below BATCH_MIN_THRESHOLD; caller skips the cycle
 *   -1 — downstream queue has a surplus; caller skips to let the pipeline drain
 *
 * Unknown stages and failed backlog queries fall back to BATCH_MAX.
 * NOTE(review): COUNT(*) values may arrive as strings from the DB driver —
 * the numeric comparisons below rely on JS coercion; confirm driver behavior.
 *
 * @param {string} stageName - Stage display name (case-insensitive).
 * @returns {Promise<number>} Batch size, 0 (skip), or -1 (surplus skip).
 */
async function adaptiveBatchSize(stageName) {
  const key = stageName.toLowerCase();
  const backlogSql = STAGE_BACKLOG_SQL[key];
  if (!backlogSql) return BATCH_MAX; // unknown stage — run at the ceiling

  let backlog;
  try {
    const row = await getOne(backlogSql);
    backlog = row?.n ?? 0;
  } catch {
    // Backlog query failed — prefer a full batch over stalling the stage.
    return BATCH_MAX;
  }

  // Tiny backlog: not worth the stage spin-up/down overhead this cycle.
  if (backlog < BATCH_MIN_THRESHOLD) return 0;

  // Downstream surplus: pause upper-funnel stages when the next queue is large.
  // Exempt stages (revenue-critical / quota-bound) always run. Stages gated by a
  // daily-limit API only pause once that quota is actually exhausted.
  const outputStatus = SURPLUS_SKIP_EXEMPT.has(key) ? null : STAGE_OUTPUT_STATUS[key];
  if (outputStatus) {
    const gateApi = STAGE_DAILY_LIMIT_GATE[key];
    const dailyLimitReached = !gateApi || isRateLimited(gateApi);
    if (dailyLimitReached) {
      try {
        const downstream = await getOne(
          'SELECT COUNT(*) AS n FROM sites WHERE status = $1',
          [outputStatus]
        );
        const surplusCeiling = SURPLUS_THRESHOLD_MULTIPLIER * BATCH_MAX;
        if ((downstream?.n ?? 0) > surplusCeiling) return -1;
      } catch {
        // ignore — just run the stage
      }
    }
  }

  const weight = STAGE_BATCH_WEIGHTS[key] ?? 0.5;
  const weightedCap = Math.max(BATCH_MIN_THRESHOLD, Math.round(weight * BATCH_MAX));
  return Math.min(backlog, weightedCap);
}
215  
// SKIP_STAGES: re-read from .env every 30s so changes take effect without restart.
let _skipStagesCache = new Set();
let _skipStagesCacheAt = 0;

/**
 * Return the set of stage names (lowercase) statically disabled via SKIP_STAGES.
 *
 * The .env file is re-read at most once every 30 seconds (hot-reload); stages
 * added to or removed from the set are logged once per transition.
 *
 * @returns {Set<string>} Lowercase stage names to skip.
 */
function getStaticSkipStages() {
  const now = Date.now();
  if (now - _skipStagesCacheAt <= 30_000) return _skipStagesCache;

  dotenvConfig({ override: true });
  const previous = _skipStagesCache;
  const refreshed = new Set(
    (process.env.SKIP_STAGES || '')
      .split(',')
      .map(s => s.trim().toLowerCase())
      .filter(s => s.length > 0)
  );
  _skipStagesCache = refreshed;
  _skipStagesCacheAt = now;

  // Log only the delta so hot-reload events are visible in the service log.
  for (const s of refreshed) {
    if (!previous.has(s)) logger.info(`⏭️  SKIP_STAGES: added '${s}' (hot-reload)`);
  }
  for (const s of previous) {
    if (!refreshed.has(s)) logger.info(`▶️  SKIP_STAGES: removed '${s}' (hot-reload)`);
  }
  return refreshed;
}
242  
// Tracks which stages are currently paused via the rate-limit-scheduler so we
// only log the "stage paused" / "stage resumed" messages on state transitions,
// not on every pipeline cycle.
const dynamicallySkipped = new Set();

// Stage lists for the three parallel loops. `name` (lowercased) is the key used
// against SKIP_STAGES and the rate-limit scheduler; `is2Step` stages bypass
// 333Method adaptive batch sizing (they manage their own limits and DB).
const BROWSER_STAGES = [
  { name: 'Assets', fn: runAssetsStage },
  { name: 'Enrich', fn: runEnrichmentStage },
  // 2Step: enrich runs after 333Method enrich (uses browser for contact extraction + logo treatment)
  { name: '2Step-Enrich', fn: (opts) => run2StepStage('enrich', 'runEnrichStage', opts), is2Step: true },
];

const API_STAGES = [
  { name: 'SERPs', fn: runSerpsStage },
  // Scoring → orchestrator score_sites batch (Sonnet, Claude Max)
  // Rescoring → orchestrator score_semantic batch (Sonnet, Claude Max)
  // 2Step: reviews uses Outscraper API (no browser needed)
  { name: '2Step-Reviews', fn: (opts) => run2StepStage('reviews', 'runReviewsStage', opts), is2Step: true },
  // 2Step: video uses ElevenLabs API + ffmpeg (no browser)
  { name: '2Step-Video', fn: (opts) => run2StepStage('video', 'runVideoStage', opts), is2Step: true },
  // 2Step: proposals uses templates (lightweight, no browser/API)
  { name: '2Step-Proposals', fn: (opts) => run2StepStage('proposals', 'runProposalsStage', opts), is2Step: true },
];

// Outreach and Replies run in their own parallel loop so they are never blocked
// by long-running scoring/proposals batches. With 23k+ approved outreaches queued
// they need continuous throughput, not a slot at the end of a 30-min scoring batch.
const OUTREACH_STAGES = [
  { name: 'Outreach', fn: runOutreachStage },
  { name: 'Replies', fn: runRepliesStage },
  // 2Step: outreach sends approved emails/SMS from msgs.messages where project='2step'
  { name: '2Step-Outreach', fn: (opts) => run2StepStage('outreach', 'runOutreachStage', opts), is2Step: true },
  // 2Step: replies processes inbound messages and auto-responds
  { name: '2Step-Replies', fn: (opts) => run2StepStage('replies', 'runRepliesStage', opts), is2Step: true },
];

// Promise-based delay helper used by all loops.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Global run flag — flipped to false by gracefulShutdown() to stop all loops.
let running = true;
282  
/** True when ops.pipeline_control row 1 is flagged paused (boolean or legacy 1). */
async function isPaused() {
  const row = await getOne('SELECT paused FROM ops.pipeline_control WHERE id = 1');
  const flag = row?.paused;
  return flag === true || flag === 1;
}
287  
/** Fetch who paused the pipeline and when (undefined row if control row absent). */
async function getPausedBy() {
  return getOne('SELECT paused_by, paused_at FROM ops.pipeline_control WHERE id = 1');
}
291  
/** Persist the currently-running stage name so status tooling can display it. */
async function updateCurrentStage(stageName) {
  const sql = `UPDATE ops.pipeline_control SET current_stage = $1, updated_at = NOW() WHERE id = 1`;
  await run(sql, [stageName]);
}
298  
/**
 * Touch the heartbeat timestamp column for a loop so external monitors can
 * detect a hung loop. Unknown loop types are ignored; DB errors (e.g. missing
 * columns on older schemas) are swallowed — the heartbeat is best-effort only.
 *
 * @param {string} loopType - 'browser_loop' | 'api_loop' | 'outreach_loop'.
 */
async function recordLoopCycle(loopType) {
  const heartbeatColumns = {
    browser_loop: 'last_browser_loop_at',
    api_loop: 'last_api_loop_at',
    outreach_loop: 'last_outreach_loop_at',
  };
  const column = heartbeatColumns[loopType];
  if (!column) return;
  try {
    // `column` comes from the fixed map above — safe to interpolate into SQL.
    await run(
      `UPDATE ops.pipeline_control SET ${column} = NOW(), updated_at = NOW() WHERE id = 1`
    );
  } catch {
    // Columns may not exist on older schemas — non-critical
  }
}
315  
/**
 * Insert one row into tel.pipeline_metrics for a completed stage run.
 * Failures are logged and swallowed — metrics must never break the pipeline.
 *
 * @param {string} stageName - Stage display name (stored lowercased).
 * @param {object} metrics   - {processed, succeeded, failed} (missing ⇒ 0).
 * @param {number} startTime - Epoch ms when the stage started.
 * @param {number} endTime   - Epoch ms when the stage finished.
 */
async function recordStageMetrics(stageName, metrics, startTime, endTime) {
  try {
    const params = [
      stageName.toLowerCase(),
      metrics.processed || 0,
      metrics.succeeded || 0,
      metrics.failed || 0,
      endTime - startTime,
      new Date(startTime).toISOString(),
      new Date(endTime).toISOString(),
    ];
    await run(
      `INSERT INTO tel.pipeline_metrics
         (stage_name, sites_processed, sites_succeeded, sites_failed, duration_ms, started_at, finished_at)
       VALUES ($1, $2, $3, $4, $5, $6, $7)`,
      params
    );
  } catch (error) {
    logger.warn(`Failed to record metrics for ${stageName}:`, error.message);
  }
}
336  
/**
 * Run a single pipeline stage once: honor static (SKIP_STAGES) and dynamic
 * (rate-limit) skips, compute an adaptive batch size, invoke the stage, and
 * record metrics. Never throws — failures return zeroed metrics.
 *
 * @param {{name: string, fn: Function, is2Step?: boolean}} stage
 * @returns {Promise<object>} Stage metrics; zeroed object when skipped/failed.
 */
async function runStage(stage) {
  const stageLower = stage.name.toLowerCase();

  // Static skip (SKIP_STAGES env var — hot-reloaded from .env every 30s)
  if (getStaticSkipStages().has(stageLower)) {
    logger.info(`⏭️  Skipping ${stage.name} stage (SKIP_STAGES configured)`);
    return { processed: 0, succeeded: 0, failed: 0 };
  }

  // Dynamic skip (rate-limit-scheduler: API quota / rate limit detected at runtime)
  const rateLimitedStages = getSkipStages();
  if (rateLimitedStages.has(stageLower)) {
    if (!dynamicallySkipped.has(stageLower)) {
      // First cycle in this rate-limit window — log once with the reset time
      const active = getRateLimitStatus();
      const info = active.find(s => s.stages.includes(stageLower));
      if (info) {
        logger.warn(
          `⏸️  ${stage.name} paused: ${info.reason}. Resumes ~${info.resetAt} (~${info.waitMinutes} min)`
        );
      }
      dynamicallySkipped.add(stageLower);
    }
    return { processed: 0, succeeded: 0, failed: 0 };
  }

  // Stage was dynamically skipped last cycle but is now clear — log resume
  if (dynamicallySkipped.has(stageLower)) {
    logger.success(`▶️  ${stage.name} resumed — rate limit cleared`);
    dynamicallySkipped.delete(stageLower);
  }

  // 2Step stages manage their own batch sizing and DB — skip 333Method adaptive sizing.
  // They still respect SKIP_STAGES and rate-limit checks above.
  let batchSize;
  if (stage.is2Step) {
    batchSize = BATCH_MAX; // 2Step stages apply their own internal limits
  } else {
    batchSize = await adaptiveBatchSize(stage.name);
    if (batchSize === 0) {
      logger.info(
        `⊘ ${stage.name}: backlog below threshold (${BATCH_MIN_THRESHOLD}) — skipping cycle`
      );
      return { processed: 0, succeeded: 0, failed: 0 };
    }
    if (batchSize === -1) {
      logger.info(
        `⏩ ${stage.name}: downstream queue full (>${Math.round(SURPLUS_THRESHOLD_MULTIPLIER * BATCH_MAX)}) — skipping to let pipeline drain`
      );
      return { processed: 0, succeeded: 0, failed: 0 };
    }
  }

  await updateCurrentStage(stage.name);
  logger.info(`▶️  Running ${stage.name} stage${stage.is2Step ? '' : ` (batch: ${batchSize}/${BATCH_MAX})`}`);
  const startTime = Date.now();

  try {
    // NOTE(review): assumes each stage caps its work at `limit` — confirm in
    // the individual stage modules.
    const metrics = await stage.fn({ limit: batchSize });
    const endTime = Date.now();
    await recordStageMetrics(stage.name, metrics, startTime, endTime);

    const duration = ((endTime - startTime) / 1000).toFixed(2);
    const succeeded = metrics.succeeded || 0;
    const failed = metrics.failed || 0;
    const skipped = metrics.skipped || 0; // e.g. sites marked ignore by blocklist/dedup
    const processed = metrics.processed || 0;

    if (succeeded > 0 || failed > 0) {
      logger.success(
        `✓ ${stage.name} completed: ${succeeded} succeeded, ${failed} failed (${duration}s)`
      );
    } else if (skipped > 0 || processed > 0) {
      // Work was done (dedup/blocklist) even if no captures succeeded
      logger.info(
        `⊘ ${stage.name}: ${skipped || processed} sites deduplicated/ignored (${duration}s)`
      );
    } else {
      logger.info(`⊘ ${stage.name}: No work available (${duration}s)`);
    }

    return metrics;
  } catch (error) {
    logger.error(`✗ ${stage.name} failed:`, error);
    return { processed: 0, succeeded: 0, failed: 0 };
  }
}
424  
/**
 * Browser loop: runs BROWSER_STAGES (Assets → Enrich → 2Step-Enrich) forever
 * until the module-level `running` flag flips false. Honors the DB pause flag
 * and a hard instantaneous-CPU gate before starting each iteration.
 */
async function runBrowserLoop() {
  logger.info('🖥️  Browser loop started (Assets, Enrich)');
  let pauseLogCounter = 0;

  while (running) {
    try {
      if (await isPaused()) {
        pauseLogCounter++;
        // Log roughly once a minute (every 12th check at the 5s default delay).
        if (pauseLogCounter % 12 === 0) {
          const control = await getPausedBy();
          // NOTE(review): assumes paused_at is always set when paused=true —
          // a null paused_at would make this NaN; confirm the pause writer.
          const pausedDuration = Math.round(
            (Date.now() - new Date(control.paused_at).getTime()) / 1000
          );
          logger.info(`⏸️  Browser loop paused by ${control.paused_by} (${pausedDuration}s)`);
        }
        await sleep(PAUSE_CHECK_DELAY_MS);
        continue;
      }

      if (pauseLogCounter > 0) {
        logger.success('▶️  Browser loop resumed');
        pauseLogCounter = 0;
      }

      // Hard CPU gate — wait if machine is hot before spawning browsers
      const cpu = getCurrentCpuUsage();
      if (cpu > BROWSER_CPU_GATE) {
        logger.info(
          `🌡️  CPU at ${(cpu * 100).toFixed(0)}% > ${(BROWSER_CPU_GATE * 100).toFixed(0)}% gate — browser_loop waiting 5s`
        );
        await sleep(5000);
        continue;
      }

      // Record start of browser loop iteration immediately so pipeline-status-monitor
      // doesn't see last_browser_loop_at as stale while a long enrich batch is running.
      await recordLoopCycle('browser_loop');

      let didWork = false;
      for (const stage of BROWSER_STAGES) {
        if (!running) break;
        const metrics = await runStage(stage);
        // Only count as "work done" if something succeeded or was processed (dedup/blocklist).
        // Failures alone (e.g. all circuit breaker rejections) should NOT prevent idle sleep —
        // otherwise the loop spin-loops at ~400ms generating 100K+ useless metrics rows/day.
        if ((metrics.succeeded || 0) + (metrics.skipped || 0) + (metrics.processed || 0) > 0)
          didWork = true;
      }

      if (!didWork) await sleep(CYCLE_DELAY_MS);
    } catch (error) {
      logger.error('❌ Browser loop error:', error);
      await sleep(5000);
    }
  }

  logger.info('🖥️  Browser loop stopped');
}
483  
// ── Outreach efficiency tracking ────────────────────────────────────────────
// RULE: If outreach sends < 95% of its batch size, this is the #1 pipeline
// priority. A starving outreach module means the entire upstream is wasted work.
// The pipeline should never be generating more proposals/enrichments/scores
// while outreach has no sendable inventory.
const OUTREACH_EFFICIENCY_TARGET = 0.95;
const OUTREACH_EFFICIENCY_WINDOW = 10; // cycles to track
const outreachEfficiencyHistory = [];

/**
 * Record one outreach cycle's send count and warn when the rolling send rate
 * over the last OUTREACH_EFFICIENCY_WINDOW cycles drops below the target.
 *
 * @param {number} sent      - Messages actually sent this cycle.
 * @param {number} batchSize - Batch size the cycle was allowed (≤0 is ignored).
 */
function trackOutreachEfficiency(sent, batchSize) {
  if (batchSize <= 0) return;

  outreachEfficiencyHistory.push({ sent, batchSize, at: Date.now() });
  while (outreachEfficiencyHistory.length > OUTREACH_EFFICIENCY_WINDOW) {
    outreachEfficiencyHistory.shift();
  }

  // Only evaluate once a full window of samples has accumulated.
  if (outreachEfficiencyHistory.length < OUTREACH_EFFICIENCY_WINDOW) return;

  let totalSent = 0;
  let totalBatch = 0;
  for (const entry of outreachEfficiencyHistory) {
    totalSent += entry.sent;
    totalBatch += entry.batchSize;
  }
  const avgRate = totalSent / totalBatch;
  if (avgRate >= OUTREACH_EFFICIENCY_TARGET) return;

  logger.warn(
    `⚠️  OUTREACH EFFICIENCY: ${(avgRate * 100).toFixed(0)}% over last ${OUTREACH_EFFICIENCY_WINDOW} cycles ` +
    `(target: ${(OUTREACH_EFFICIENCY_TARGET * 100).toFixed(0)}%). ` +
    `Pipeline is generating work that outreach cannot send. ` +
    `Check: cooldowns, GDPR blocks, SMS-blocked countries, missing templates.`
  );
}
513  
/**
 * Outreach loop: runs OUTREACH_STAGES (Outreach → Replies → 2Step variants)
 * forever until `running` flips false. Separated from the other loops so sends
 * are never starved by long browser/API batches. Uses a longer idle sleep
 * because the 3-day per-site cooldown means work arrives slowly.
 */
async function runOutreachLoop() {
  logger.info('📤 Outreach loop started (Outreach, Replies)');
  let pauseLogCounter = 0;

  while (running) {
    try {
      if (await isPaused()) {
        pauseLogCounter++;
        // Log roughly once a minute (every 12th check at the 5s default delay).
        if (pauseLogCounter % 12 === 0) {
          const control = await getPausedBy();
          const pausedDuration = Math.round(
            (Date.now() - new Date(control.paused_at).getTime()) / 1000
          );
          logger.info(`⏸️  Outreach loop paused by ${control.paused_by} (${pausedDuration}s)`);
        }
        await sleep(PAUSE_CHECK_DELAY_MS);
        continue;
      }

      if (pauseLogCounter > 0) {
        logger.success('▶️  Outreach loop resumed');
        pauseLogCounter = 0;
      }

      let didWork = false;
      for (const stage of OUTREACH_STAGES) {
        if (!running) break;
        const metrics = await runStage(stage);
        // Only succeeded counts as work — failures alone (breaker rejections) trigger idle sleep
        if ((metrics.succeeded || 0) > 0) didWork = true;

        // Track outreach send efficiency (333Method Outreach stage only)
        // NOTE(review): efficiency is measured against BATCH_MAX, but runStage may
        // have used a smaller adaptive batch — the ratio can under-report; confirm intended.
        if (stage.name === 'Outreach') {
          trackOutreachEfficiency(metrics.succeeded || 0, BATCH_MAX);
        }
      }

      await recordLoopCycle('outreach_loop');

      // Use longer idle delay for outreach: 3-day cooldown means nothing new to send for a while.
      // Prevents spin loop that fills pipeline_metrics with 0-site rows and trips crash-loop alerts.
      if (!didWork) await sleep(OUTREACH_IDLE_DELAY_MS);
    } catch (error) {
      logger.error('❌ Outreach loop error:', error);
      await sleep(5000);
    }
  }

  logger.info('📤 Outreach loop stopped');
}
564  
/**
 * API loop: runs API_STAGES (SERPs + 2Step API stages) forever until `running`
 * flips false. No CPU gate — external API rate limiters govern concurrency.
 */
async function runApiLoop() {
  logger.info('🌐 API loop started (SERPs only — Scoring/Rescoring handled by orchestrator)');
  let pausedCycles = 0;

  while (running) {
    try {
      const paused = await isPaused();
      if (paused) {
        pausedCycles += 1;
        // Log roughly once a minute (every 12th check at the 5s default delay).
        if (pausedCycles % 12 === 0) {
          const control = await getPausedBy();
          const pausedDuration = Math.round(
            (Date.now() - new Date(control.paused_at).getTime()) / 1000
          );
          logger.info(`⏸️  API loop paused by ${control.paused_by} (${pausedDuration}s)`);
        }
        await sleep(PAUSE_CHECK_DELAY_MS);
        continue;
      }

      if (pausedCycles !== 0) {
        logger.success('▶️  API loop resumed');
        pausedCycles = 0;
      }

      let anyWork = false;
      for (const stage of API_STAGES) {
        if (!running) break;
        const metrics = await runStage(stage);
        // Only succeeded counts as work — failures alone (breaker rejections) trigger idle sleep
        anyWork = anyWork || (metrics.succeeded || 0) > 0;
      }

      await recordLoopCycle('api_loop');

      if (!anyWork) await sleep(CYCLE_DELAY_MS);
    } catch (error) {
      logger.error('❌ API loop error:', error);
      await sleep(5000);
    }
  }

  logger.info('🌐 API loop stopped');
}
608  
/**
 * Entry point: log startup configuration, restore persisted rate limits,
 * ensure the pipeline_control row exists, then run all three loops forever.
 */
async function main() {
  logger.success('🚀 Pipeline Service starting (parallel mode)...');
  logger.info(
    `Batch max: ${BATCH_MAX} sites/stage (adaptive, min threshold: ${BATCH_MIN_THRESHOLD})`
  );
  logger.info(`Cycle delay: ${CYCLE_DELAY_MS}ms`);
  logger.info(`Browser CPU gate: ${(BROWSER_CPU_GATE * 100).toFixed(0)}% instantaneous CPU`);
  logger.info(`2Step integration: ${TWOSTEP_ENABLED ? 'enabled' : 'disabled (TWOSTEP_PIPELINE_ENABLED=false)'}`);

  const staticSkips = getStaticSkipStages();
  if (staticSkips.size > 0) {
    logger.info(`Skipping stages (static): ${Array.from(staticSkips).join(', ')}`);
  }

  // Restore any rate limits that were active before a pipeline restart
  for (const limit of getRateLimitStatus()) {
    logger.warn(
      `⏸️  Rate limit restored from disk: [${limit.stages.join(', ')}] paused until ${limit.resetAt} (~${limit.waitMinutes} min) — ${limit.reason}`
    );
  }

  logger.info('');

  // Ensure the pipeline_control row exists (idempotent upsert)
  await run(
    `INSERT INTO ops.pipeline_control (id, paused) VALUES (1, false)
     ON CONFLICT (id) DO NOTHING`
  );

  // Run browser_loop, api_loop, and outreach_loop in parallel — they never block each other.
  // Outreach is separated so it is not starved by long scoring/proposals batches.
  await Promise.all([runBrowserLoop(), runApiLoop(), runOutreachLoop()]);
}
643  
// Guards against overlapping shutdowns (e.g. SIGINT arriving while a SIGTERM
// teardown is already draining) — a second entry would double-close the pool.
let _shutdownInProgress = false;

/**
 * Stop all loops, clear the current-stage marker, close the DB pool, and exit.
 *
 * Exit code is 0 for operator-initiated signals (SIGTERM/SIGINT) and 1 for
 * crash paths (uncaughtException/unhandledRejection). The previous version
 * always exited 0, which made crashes indistinguishable from clean stops to a
 * supervisor configured with restart-on-failure.
 *
 * @param {string} signal - Signal or event name that triggered the shutdown.
 */
async function gracefulShutdown(signal) {
  if (_shutdownInProgress) return; // already shutting down — ignore repeats
  _shutdownInProgress = true;

  logger.warn(`Received ${signal}, shutting down gracefully...`);
  running = false;
  stopCpuMonitor();
  try {
    await run(
      `UPDATE ops.pipeline_control SET current_stage = NULL, updated_at = NOW() WHERE id = 1`
    );
    logger.success('✓ Cleaned up pipeline state');
  } catch (error) {
    logger.error('Failed to cleanup:', error.message);
  }
  await closePool();
  const exitCode = signal === 'SIGTERM' || signal === 'SIGINT' ? 0 : 1;
  process.exit(exitCode);
}
659  
// Operator-initiated stops.
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
process.on('SIGINT', () => gracefulShutdown('SIGINT'));
// Crash paths: log, then attempt the same graceful teardown.
// NOTE(review): gracefulShutdown is async and not awaited here — cleanup is
// best-effort if the process dies mid-teardown; confirm this is acceptable.
process.on('uncaughtException', error => {
  logger.error('Uncaught exception:', error);
  gracefulShutdown('uncaughtException');
});
process.on('unhandledRejection', (reason, _promise) => {
  // `reason` may be any value, not just an Error — normalize for logging.
  logger.error('Unhandled rejection', {
    reason: reason instanceof Error ? reason.message : String(reason),
    stack: reason instanceof Error ? reason.stack : undefined,
    type: reason?.constructor?.name || typeof reason,
  });
  gracefulShutdown('unhandledRejection');
});

main().catch(error => {
  logger.error('Fatal error:', error);
  process.exit(1);
});