/ src / utils / circuit-breaker.js
circuit-breaker.js
  1  /**
  2   * Circuit Breaker Module
  3   * Prevents cascading failures and excessive API costs during repeated failures.
  4   *
  5   * Circuit breaker states:
  6   * - CLOSED:    Normal operation, requests pass through
  7   * - OPEN:      Failures exceeded threshold, requests fail immediately
  8   * - HALF_OPEN: Testing if service recovered, limited requests allowed
  9   *
 10   * Error Classification:
 11   * - API errors (5xx, timeouts, network) → count towards circuit breaker
 12   * - Business logic errors (400, validation, missing data) → bypass circuit breaker
 13   *
 14   * Rate Limit Integration:
 15   * When a breaker opens due to a detected rate limit error, it calls
 16   * setRateLimit() from rate-limit-scheduler.js, which automatically adds the
 17   * affected pipeline stages to the dynamic SKIP_STAGES list until the limit resets.
 18   * When the breaker later closes (API recovered), clearRateLimit() is called.
 19   *
 20   * Pattern for new APIs: add a rateLimitConfig to the factory function below.
 21   * See CLAUDE.md "Rate Limit Pattern" for full documentation.
 22   */
 23  
 24  import CircuitBreaker from 'opossum';
 25  import Logger from './logger.js';
 26  import { setRateLimit, clearRateLimit } from './rate-limit-scheduler.js';
 27  
 28  const logger = new Logger('CircuitBreaker');
 29  
 30  // ─── Error classification helpers ────────────────────────────────────────────
 31  
 32  /**
 33   * Determine if an error should trigger the circuit breaker.
 34   * Business logic errors (4xx, validation) should NOT open the circuit.
 35   * @param {Error} error
 36   * @returns {boolean} true = count as failure, false = ignore
 37   */
 38  function shouldTriggerBreaker(error) {
 39    const message = error?.message || '';
 40  
 41    const businessLogicPatterns = [
 42      /\b400\b/, // Bad Request
 43      /\b401\b/, // Unauthorized
 44      /\b403\b/, // Forbidden
 45      /\b404\b/, // Not Found
 46      /\b422\b/, // Unprocessable Entity
 47      /validation/i,
 48      /invalid/i,
 49      /missing/i,
 50      /not found/i,
 51      /unauthorized/i,
 52    ];
 53  
 54    for (const pattern of businessLogicPatterns) {
 55      if (pattern.test(message)) return false;
 56    }
 57  
 58    const serviceErrorPatterns = [
 59      /5[0-9]{2}/, // 5xx errors
 60      /timeout/i,
 61      /ECONNREFUSED/,
 62      /ETIMEDOUT/,
 63      /ENOTFOUND/,
 64      /network/i,
 65      /rate.?limit/i,
 66      /too.?many.?request/i,
 67      /quota/i,
 68    ];
 69  
 70    for (const pattern of serviceErrorPatterns) {
 71      if (pattern.test(message)) return true;
 72    }
 73  
 74    // Default: trigger breaker for unknown errors (safer)
 75    return true;
 76  }
 77  
 78  /**
 79   * Check if an error is a rate limit or quota exhaustion response.
 80   * Used to decide whether to schedule a stage skip via rate-limit-scheduler.
 81   *
 82   * HTTP status codes treated as quota/rate-limit signals:
 83   *   429 — Too Many Requests (standard rate limit)
 84   *   402 — Payment Required (ZenRows monthly quota exhausted, ZeroBounce credits exhausted)
 85   */
 86  function isRateLimitError(error) {
 87    const msg = error?.message || '';
 88    const status = error?.response?.status ?? error?.status ?? 0;
 89    return (
 90      status === 429 ||
 91      status === 402 ||
 92      /rate.?limit|too.?many.?request|quota.?exceed|payment.?required|402|429/i.test(msg)
 93    );
 94  }
 95  
 96  /**
 97   * Check if an error is a timeout (axios timeout, ETIMEDOUT, ECONNABORTED, opossum timeout).
 98   */
 99  function isTimeoutError(error) {
100    const msg = (error?.message || '') + (error?.code || '');
101    return /timeout|ETIMEDOUT|ECONNABORTED/i.test(msg);
102  }
103  
/** Seconds per unit for Go-style duration strings ("6m0s", "1.5s", "250ms"). */
const DURATION_UNIT_SECONDS = { h: 3600, m: 60, s: 1, ms: 0.001 };

/**
 * Parse a Go-style duration (the format used by OpenAI-style
 * `x-ratelimit-reset-*` headers) into seconds.
 * @param {string} text
 * @returns {number} seconds, or NaN when `text` is not such a duration
 */
function parseDurationSeconds(text) {
  // Whole string must be one or more <number><unit> pairs; units are unambiguous so no ReDoS.
  if (!/^(?:\d+(?:\.\d+)?(?:ms|h|m|s))+$/.test(text)) return NaN;
  let total = 0;
  for (const [, amount, unit] of text.matchAll(/(\d+(?:\.\d+)?)(ms|h|m|s)/g)) {
    total += Number(amount) * DURATION_UNIT_SECONDS[unit];
  }
  return total;
}

/**
 * Extract a wait time in seconds from a rate-limit response.
 *
 * Reads `retry-after`, falling back to `x-ratelimit-reset-requests`, from
 * `error.response.headers` (a plain object with lowercase keys, or any object
 * exposing `get(name)` such as fetch `Headers` / AxiosHeaders). Accepted forms:
 *   - delta-seconds, e.g. "120" or "2.5"
 *   - Go-style duration, e.g. "6m0s", "500ms"
 *   - HTTP-date / ISO date → seconds until that instant (never negative)
 *
 * @param {Error} error
 * @returns {number} seconds to wait; 0 if not present or unparseable
 */
function extractRetryAfter(error) {
  const headers = error?.response?.headers;
  const readHeader = headerName =>
    typeof headers?.get === 'function' ? headers.get(headerName) : headers?.[headerName];

  const header = readHeader('retry-after') ?? readHeader('x-ratelimit-reset-requests') ?? null;
  if (!header) return 0;

  const text = String(header).trim();

  // Plain delta-seconds (the common Retry-After form). parseFloat is avoided on purpose:
  // it would read "6m0s" as 6 and "2026-01-01T…" as 2026.
  if (/^\d+(?:\.\d+)?$/.test(text)) return Number(text);

  const durationSeconds = parseDurationSeconds(text);
  if (!Number.isNaN(durationSeconds)) return durationSeconds;

  // HTTP-date (e.g. "Sat, 01 Jan 2026 00:00:00 GMT") or ISO timestamp
  const resetAtMs = Date.parse(text);
  if (!Number.isNaN(resetAtMs)) return Math.max(0, Math.ceil((resetAtMs - Date.now()) / 1000));

  return 0;
}
125  
// ─── Core factory ────────────────────────────────────────────────────────────

/**
 * Default opossum options shared by every breaker created in this module.
 * Frozen so no code path can accidentally change the defaults for all breakers at once;
 * createBreaker() spreads it into a fresh options object, so per-breaker overrides still apply.
 */
const DEFAULT_OPTIONS = Object.freeze({
  timeout: 30_000, // 30s timeout
  errorThresholdPercentage: 50, // Open circuit at 50% failures
  resetTimeout: 60_000, // Try again after 1 minute
  rollingCountTimeout: 10_000, // 10s rolling window for error calculation
  rollingCountBuckets: 10,
  volumeThreshold: 5, // Minimum requests before circuit can open
});
139  
/**
 * Create a circuit breaker with logging, smart error classification,
 * and optional rate limit scheduling.
 *
 * Rate-limit bookkeeping (only when `rateLimitConfig` is supplied):
 *   - Each counted failure is classified exactly once, in the 'failure' listener:
 *     a rate-limit error is remembered, a timeout extends the timeout streak, and
 *     any other service error resets the streak. opossum also emits 'failure' for
 *     its own timeouts (error code ETIMEDOUT), so the 'timeout' listener only logs;
 *     counting there as well would advance the streak twice per opossum timeout.
 *   - When the circuit opens, a remembered rate-limit error (or, with
 *     `detectFromTimeouts`, a streak of >= 3 timeouts) pauses the affected stages
 *     via setRateLimit(). Closing the circuit resets the trackers and calls clearRateLimit().
 *   - Scheduler calls run inside try/catch: these listeners are invoked synchronously
 *     by the breaker's emit(), so an exception there must not escape into it.
 *
 * @param {string} name - Display name for logs
 * @param {Function} asyncFunction - The async function to wrap
 * @param {object} [options] - opossum options + rateLimitConfig
 * @param {object} [options.rateLimitConfig] - Rate limit scheduling config (extracted before passing to opossum)
 * @param {string} options.rateLimitConfig.api - API identifier (must exist in API_STAGE_MAP)
 * @param {string[]} [options.rateLimitConfig.stages] - Stages to skip (default: API_STAGE_MAP[api])
 * @param {'daily'|'hourly'|'minute'|'custom'} options.rateLimitConfig.limitType - Reset window
 * @param {boolean} [options.rateLimitConfig.detectFromTimeouts] - If true, treat consecutive
 *   timeouts (>=3) as a rate limit / quota exhaustion signal even without a 429 response.
 *   Use this for APIs like ZenRows that return timeouts instead of 429 when quota is exhausted.
 * @returns {CircuitBreaker} the configured opossum breaker
 */
function createBreaker(name, asyncFunction, options = {}) {
  // Extract our custom config before passing the rest to opossum
  const { rateLimitConfig, ...breakerOptions } = options;

  // Consecutive timeouts required before detectFromTimeouts treats them as quota exhaustion
  const TIMEOUT_STREAK_THRESHOLD = 3;

  // Per-breaker rate limit tracking (closure-scoped, not shared)
  let consecutiveTimeouts = 0;
  let lastRateLimitError = null;

  const breaker = new CircuitBreaker(asyncFunction, {
    ...DEFAULT_OPTIONS,
    ...breakerOptions,
    name,
    // opossum errorFilter: return true = ignore error (don't count as failure)
    //                      return false = count as failure (may open circuit)
    errorFilter: error => {
      const shouldTrigger = shouldTriggerBreaker(error);
      if (!shouldTrigger) {
        logger.debug(`${name} ignoring business logic error: ${error.message}`);
      }
      return !shouldTrigger;
    },
  });

  /**
   * Runs when the circuit opens (rateLimitConfig present). If the failures look like a
   * rate limit or quota exhaustion, pause the configured stages via setRateLimit();
   * genuine service failures leave the scheduler untouched.
   */
  const pauseStagesIfRateLimited = () => {
    const timedOut =
      rateLimitConfig.detectFromTimeouts && consecutiveTimeouts >= TIMEOUT_STREAK_THRESHOLD;
    const rateLimited = lastRateLimitError !== null;

    if (!timedOut && !rateLimited) return; // Genuine service failure, not a rate limit

    const limitType = rateLimitConfig.limitType ?? 'daily';
    let retryAfterSeconds = 0;

    if (rateLimited) {
      // Explicit rate-limit / quota response — honor Retry-After if present
      retryAfterSeconds = extractRetryAfter(lastRateLimitError);
      const reason = lastRateLimitError.message || 'rate limit response';
      logger.warn(
        `🚦 ${name} rate limited (${reason}). Retry-After: ${retryAfterSeconds || 'none'}`
      );
    } else {
      // Timeout-based detection (ZenRows quota exhaustion)
      logger.warn(
        `🚦 ${name} quota likely exhausted (${consecutiveTimeouts} consecutive timeouts)`
      );
    }

    const result = setRateLimit(rateLimitConfig.api, {
      stages: rateLimitConfig.stages,
      limitType,
      retryAfterSeconds,
      reason: `${name} ${rateLimited ? 'rate limited' : 'quota exhausted'} — circuit breaker opened`,
    });

    const resetDate = new Date(result.resetAt);
    const waitMin = Math.ceil((result.resetAt - Date.now()) / 60_000);
    logger.warn(
      `⏸️  Stages [${result.stages.join(', ')}] paused until ${resetDate.toISOString()} (~${waitMin} min)`
    );
  };

  // ── Event: Circuit opened (too many failures) ──
  breaker.on('open', () => {
    logger.error(`⚠️  Circuit breaker OPENED for ${name} - too many failures, blocking requests`);

    if (!rateLimitConfig) return;

    try {
      pauseStagesIfRateLimited();
    } catch (err) {
      // Scheduler / result-shape problems must not break the breaker's own state transition
      logger.error(`Failed to schedule rate limit pause for ${name}: ${err?.message ?? err}`);
    }
  });

  // ── Event: Circuit half-open (testing recovery) ──
  breaker.on('halfOpen', () => {
    logger.info(`🔄 Circuit breaker HALF-OPEN for ${name} - testing if service recovered`);
  });

  // ── Event: Circuit closed (recovery successful) ──
  breaker.on('close', () => {
    logger.success(`✅ Circuit breaker CLOSED for ${name} - service recovered`);

    if (!rateLimitConfig) return;

    consecutiveTimeouts = 0;
    lastRateLimitError = null;
    try {
      clearRateLimit(rateLimitConfig.api);
    } catch (err) {
      logger.error(`Failed to clear rate limit for ${name}: ${err?.message ?? err}`);
    }
  });

  // ── Event: Request rejected (circuit is open) ──
  breaker.on('reject', () => {
    logger.warn(`❌ Request rejected by circuit breaker for ${name} - circuit is OPEN`);
  });

  // ── Event: opossum-level timeout (breaker's own timeout fired) ──
  // Logging only: opossum follows this with a 'failure' event (code ETIMEDOUT), and the
  // 'failure' listener below is what advances the timeout streak.
  breaker.on('timeout', () => {
    logger.warn(`⏱️  Request timeout for ${name}`);
  });

  // ── Event: Request failed ──
  breaker.on('failure', error => {
    const triggersBreaker = shouldTriggerBreaker(error);

    if (triggersBreaker) {
      logger.error(`Circuit breaker failure for ${name}: ${error.message}`);
    } else {
      logger.debug(`Business logic error for ${name} (not counted): ${error.message}`);
    }

    if (!rateLimitConfig || !triggersBreaker) return;

    if (isRateLimitError(error)) {
      lastRateLimitError = error;
      consecutiveTimeouts = 0; // 429 resets the timeout counter
    } else if (isTimeoutError(error)) {
      consecutiveTimeouts++;
    } else {
      consecutiveTimeouts = 0; // Non-timeout, non-rate-limit service error
    }
  });

  // ── Event: Request succeeded ──
  breaker.on('success', () => {
    logger.debug(`Circuit breaker success for ${name}`);
    if (rateLimitConfig) {
      consecutiveTimeouts = 0;
      lastRateLimitError = null;
    }
  });

  return breaker;
}
279  
// ─── Per-API factory functions ────────────────────────────────────────────────

/**
 * OpenRouter API Circuit Breaker.
 * Guards the AI scoring, rescoring, and proposal-generation calls.
 *
 * Rate limit handling:
 *   - 429 carrying Retry-After → pause for the Retry-After seconds
 *   - 429 without Retry-After → pause until top of next hour (hourly quota)
 *   - Stages affected: scoring, rescoring (proposals use claude -p / Claude Max, not OpenRouter)
 *
 * @returns {CircuitBreaker}
 */
export function createOpenRouterBreaker() {
  // OpenRouter answers with genuine 429s, so the timeout heuristic (detectFromTimeouts) stays off.
  const rateLimitConfig = {
    api: 'openrouter',
    limitType: 'hourly', // Fallback window when no Retry-After header; many plans meter per hour
  };
  const runPayload = payload => payload();

  return createBreaker('OpenRouter', runPayload, {
    timeout: 120_000, // Vision requests with large screenshots can take up to 2 minutes
    errorThresholdPercentage: 50,
    resetTimeout: 120_000, // 2 minutes before the half-open probe
    rateLimitConfig,
  });
}
303  
/** Fallback opossum timeout for ZenRows (5 minutes) when the env override is unusable. */
const ZENROWS_DEFAULT_SLOW_TIMEOUT_MS = 300_000;

/**
 * Resolve the ZenRows breaker timeout from ZENROWS_SLOW_TIMEOUT (milliseconds).
 * Missing, non-numeric, or non-positive values fall back to the 5-minute default
 * instead of handing opossum NaN or a value <= 0.
 *
 * @param {string|undefined} [raw] - Raw value (defaults to process.env.ZENROWS_SLOW_TIMEOUT)
 * @returns {number} timeout in milliseconds
 */
function resolveZenRowsSlowTimeout(raw = process.env.ZENROWS_SLOW_TIMEOUT) {
  const parsed = Number.parseInt(raw ?? '', 10);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : ZENROWS_DEFAULT_SLOW_TIMEOUT_MS;
}

/**
 * ZenRows API Circuit Breaker
 * Used for SERP scraping.
 *
 * Rate limit detection:
 *   - Monthly quota: ZenRows returns 402 Payment Required → skip serps until midnight UTC.
 *     (Monthly quota resets on billing cycle; midnight UTC is a safe daily proxy until confirmed.)
 *   - Concurrency exceeded: ZenRows returns 429 → Retry-After if present, else the daily window.
 *   - Timeout-based: 3+ consecutive timeouts treated as quota exhaustion / overload → daily backoff.
 *   - Affected stages: serps
 *
 * @returns {CircuitBreaker}
 */
export function createZenRowsBreaker() {
  // opossum timeout must be >= axios timeout so opossum doesn't fire first and block recovery.
  const slowTimeout = resolveZenRowsSlowTimeout();
  return createBreaker('ZenRows', payload => payload(), {
    timeout: slowTimeout,
    errorThresholdPercentage: 50,
    resetTimeout: 120000, // 2 minutes before half-open test
    rateLimitConfig: {
      api: 'zenrows',
      limitType: 'daily', // 402 quota exhausted → skip until midnight UTC; 429 uses Retry-After
      detectFromTimeouts: true, // ZenRows can also signal overload via timeouts
    },
  });
}
329  
/**
 * Twilio API Circuit Breaker.
 * Guards outbound SMS sending.
 *
 * Rate limit handling:
 *   - 429 carrying Retry-After → pause for the Retry-After seconds
 *   - 429 without Retry-After → pause until top of next hour
 *   - Stages affected: outreach
 *
 * @returns {CircuitBreaker}
 */
export function createTwilioBreaker() {
  const rateLimitConfig = { api: 'twilio', limitType: 'hourly' };
  const runPayload = payload => payload();

  return createBreaker('Twilio', runPayload, {
    timeout: 30_000,
    errorThresholdPercentage: 50,
    resetTimeout: 60_000,
    rateLimitConfig,
  });
}
350  
/**
 * Resend API Circuit Breaker
 * Used for email sending.
 *
 * Rate limit detection:
 *   - 429 with Retry-After → use Retry-After seconds
 *   - 429 without Retry-After → wait until top of next hour (limitType 'hourly')
 *   - Affected stages: outreach
 *
 * NOTE: Pro plan has 50,000 emails/month with NO daily cap (upgraded 2026-03-06).
 * 429s should be transient rate-limit bursts with a Retry-After header, not
 * daily quota exhaustion. limitType='hourly' gives a 1-hour fallback window
 * if Retry-After is absent, which is far more conservative than midnight-UTC.
 *
 * @returns {CircuitBreaker}
 */
export function createResendBreaker() {
  return createBreaker('Resend', payload => payload(), {
    timeout: 30000,
    errorThresholdPercentage: 50,
    resetTimeout: 60000,
    rateLimitConfig: {
      api: 'resend',
      limitType: 'hourly', // No daily cap on Pro — 429s are burst limits, reset within the hour
    },
  });
}
376  
/**
 * ZeroBounce Email Validation API Circuit Breaker.
 * Guards pre-send email validation.
 *
 * Validation is fail-open: if the breaker opens (e.g. credits exhausted),
 * sendEmail() carries on without validation instead of blocking outreach;
 * the breaker only stops the API from being hammered while it is down.
 *
 * Rate limit handling:
 *   - Credits exhausted: ZeroBounce answers with an error message (not 429)
 *   - 429 carrying Retry-After → pause for the Retry-After seconds
 *   - Stages affected: none (validation-only; fail-open keeps outreach running)
 *
 * @returns {CircuitBreaker}
 */
export function createZeroBounceBreaker() {
  const rateLimitConfig = { api: 'zerobounce', limitType: 'hourly' };
  const runPayload = payload => payload();

  return createBreaker('ZeroBounce', runPayload, {
    // Batch calls (200 emails) can take 30-60s; this must exceed the caller's AbortSignal timeout
    timeout: 90_000,
    errorThresholdPercentage: 50,
    resetTimeout: 300_000, // 5 minutes — leaves time for credits to be topped up
    rateLimitConfig,
  });
}
401  
/**
 * DataForSEO API Circuit Breaker.
 * Guards keyword validation/expansion lookups (not a pipeline stage).
 *
 * Rate limit handling:
 *   - 429 carrying Retry-After → pause for the Retry-After seconds
 *   - Stages affected: none (keyword validation is manual/offline)
 *
 * @returns {CircuitBreaker}
 */
export function createDataForSEOBreaker() {
  const rateLimitConfig = { api: 'dataforseo', limitType: 'hourly' };
  const runPayload = payload => payload();

  return createBreaker('DataForSEO', runPayload, {
    timeout: 60_000, // Batch keyword lookups can be slow
    errorThresholdPercentage: 50,
    resetTimeout: 120_000, // 2 minutes before the half-open probe
    rateLimitConfig,
  });
}
421  
// ─── Stats ───────────────────────────────────────────────────────────────────

/**
 * Summarize a breaker's current state and counters for logging/monitoring.
 *
 * @param {CircuitBreaker} breaker - breaker created by one of the factories above
 * @returns {{name: string, state: 'OPEN'|'HALF_OPEN'|'CLOSED', fires: number,
 *   successes: number, failures: number, rejects: number, timeouts: number,
 *   failureRate: string}} failureRate is failures/fires as a percentage string with
 *   two decimals, or '0%' when nothing has fired yet
 */
export function getBreakerStats(breaker) {
  const { stats } = breaker;

  let state = 'CLOSED';
  if (breaker.opened) {
    state = 'OPEN';
  } else if (breaker.halfOpen) {
    state = 'HALF_OPEN';
  }

  const { fires, successes, failures, rejects, timeouts } = stats;
  const failureRate = fires > 0 ? `${((failures / fires) * 100).toFixed(2)}%` : '0%';

  return {
    name: breaker.name,
    state,
    fires,
    successes,
    failures,
    rejects,
    timeouts,
    failureRate,
  };
}
440  
// ─── Singleton exports ────────────────────────────────────────────────────────
// One breaker per external API, built once when this module is first loaded, so
// every importer shares the same counters and open/closed state.

export const openRouterBreaker = createOpenRouterBreaker();
export const zenRowsBreaker = createZenRowsBreaker();
export const twilioBreaker = createTwilioBreaker();
export const resendBreaker = createResendBreaker();
export const zeroBounceBreaker = createZeroBounceBreaker();
export const dataForSEOBreaker = createDataForSEOBreaker();
449  
export default {
  // Factories (each call builds a fresh, independent breaker)
  createOpenRouterBreaker,
  createZenRowsBreaker,
  createTwilioBreaker,
  createResendBreaker,
  createZeroBounceBreaker,
  createDataForSEOBreaker,

  // Introspection
  getBreakerStats,

  // Shared module-level instances
  openRouterBreaker,
  zenRowsBreaker,
  twilioBreaker,
  resendBreaker,
  zeroBounceBreaker,
  dataForSEOBreaker,
};