// src/utils/rate-limit-scheduler.js
/**
 * Rate Limit Scheduler
 *
 * Tracks API rate limits and automatically manages dynamic SKIP_STAGES.
 * When a circuit breaker opens due to rate limiting, this module:
 *   1. Records which pipeline stages are affected and until when
 *   2. Makes getSkipStages() return those stages during the wait
 *   3. Automatically lifts the skip once the reset time is reached
 *   4. Persists state to logs/rate-limits.json (survives pipeline restarts)
 *
 * Pattern for future APIs (see also CLAUDE.md "Rate Limit Pattern"):
 *   1. Add the api → stages mapping to API_STAGE_MAP below
 *   2. Add a rateLimitConfig to the circuit breaker factory in circuit-breaker.js
 *   3. For 429 responses: the breaker on('open') handler calls setRateLimit automatically
 *   4. For timeout-based quota (e.g. ZenRows): set detectFromTimeouts: true in rateLimitConfig
 *   5. getSkipStages() is called by pipeline-service.js each cycle, so new limits apply automatically
 */
 18  
import {
  appendFileSync,
  existsSync,
  mkdirSync,
  readFileSync,
  renameSync,
  unlinkSync,
  writeFileSync,
} from 'node:fs';
import { join } from 'node:path';
 21  
 22  const LOGS_DIR = process.env.LOGS_DIR || join(process.cwd(), 'logs');
 23  const STATE_FILE = join(LOGS_DIR, 'rate-limits.json');
 24  
 25  /**
 26   * Append-only event log for the Monitor agent to analyse.
 27   * Each line is a JSON object with a timestamp and event details.
 28   * Events: 'set' (rate limit activated), 'clear' (manually cleared),
 29   *         'expired' (rate limit reached its scheduled end, auto-removed).
 30   * Analysed by MonitorAgent.checkRateLimitPatterns() to detect:
 31   *   - wait_too_short:  Same API rate-limited again <2h after clearing
 32   *   - false_positive:  Cleared >50% of the scheduled wait remaining (API recovered early)
 33   *   - high_frequency:  Same API triggered >3 times in 24h
 34   */
 35  export const EVENTS_FILE = join(LOGS_DIR, 'rate-limit-events.jsonl');
 36  
 37  function appendEvent(event) {
 38    try {
 39      mkdirSync(LOGS_DIR, { recursive: true });
 40      appendFileSync(
 41        EVENTS_FILE,
 42        `${JSON.stringify({ timestamp: new Date().toISOString(), ...event })}\n`,
 43        'utf8'
 44      );
 45    } catch {
 46      /* non-critical — in-memory state still works without the event log */
 47    }
 48  }
 49  
 50  /**
 51   * Maps API identifiers → pipeline stage names they affect.
 52   * When an API is rate-limited, these stages are skipped until the limit resets.
 53   * Add a new entry here when integrating a new API.
 54   *
 55   * @type {Record<string, string[]>}
 56   */
 57  export const API_STAGE_MAP = {
 58    zenrows: ['serps'],
 59    openrouter: ['scoring', 'rescoring'],  // proposals use claude -p (Claude Max), not OpenRouter
 60    twilio: ['outreach'],
 61    resend: ['outreach'],
 62    anthropic: ['proposals'],
 63    zerobounce: [], // Validation-only; fail-open keeps outreach running when credits are exhausted
 64    dataforseo: [], // Keyword validation only; not a pipeline stage — breaker prevents wasting credits
 65    claude_cli: [], // Claude Max CLI — orchestrator-only; all batches blocked when credits exhausted
 66  };
 67  
 68  // ─── Persistence ────────────────────────────────────────────────────────────
 69  
 70  function loadState() {
 71    try {
 72      if (existsSync(STATE_FILE)) {
 73        const data = JSON.parse(readFileSync(STATE_FILE, 'utf8'));
 74        for (const info of Object.values(data)) {
 75          info.resetAt = Number(info.resetAt);
 76          info.setAt = Number(info.setAt);
 77        }
 78        return data;
 79      }
 80    } catch {
 81      /* corrupt/missing file — start fresh */
 82    }
 83    return {};
 84  }
 85  
 86  function saveState(state) {
 87    try {
 88      mkdirSync(LOGS_DIR, { recursive: true });
 89      writeFileSync(STATE_FILE, JSON.stringify(state, null, 2), 'utf8');
 90    } catch {
 91      /* non-critical — in-memory state still works */
 92    }
 93  }
 94  
 95  // In-memory state (module singleton, shared across all importers in the same process)
 96  const _state = loadState();
 97  
 98  // ─── Reset time calculation ──────────────────────────────────────────────────
 99  
100  /**
101   * Calculate when a rate limit resets.
102   *
103   * @param {'daily'|'hourly'|'minute'|'custom'} limitType
104   *   daily  → midnight UTC (ZenRows daily quota, OpenRouter daily credits)
105   *   hourly → top of next UTC hour (common per-hour API quotas)
106   *   minute → next 60-second boundary (per-minute rate limits)
107   *   custom → falls back to 5-minute wait (use retryAfterSeconds instead)
108   * @param {number} [retryAfterSeconds=0] - Seconds from Retry-After header (overrides limitType)
109   * @returns {number} Unix ms timestamp when the limit resets
110   */
111  export function calculateResetTime(limitType, retryAfterSeconds = 0) {
112    if (retryAfterSeconds > 0) {
113      return Date.now() + retryAfterSeconds * 1000;
114    }
115  
116    const now = new Date();
117  
118    switch (limitType) {
119      case 'daily': {
120        // Midnight UTC tonight
121        return Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate() + 1, 0, 0, 0, 0);
122      }
123      case 'hourly': {
124        // Top of the next UTC hour
125        return Date.UTC(
126          now.getUTCFullYear(),
127          now.getUTCMonth(),
128          now.getUTCDate(),
129          now.getUTCHours() + 1,
130          0,
131          0,
132          0
133        );
134      }
135      case 'minute': {
136        // Top of the next minute
137        const next = new Date(now);
138        next.setUTCSeconds(0, 0);
139        next.setTime(next.getTime() + 60_000);
140        return next.getTime();
141      }
142      default:
143        // Unknown limit type — wait 5 minutes
144        return Date.now() + 5 * 60 * 1000;
145    }
146  }
147  
// ─── Public API ──────────────────────────────────────────────────────────────

/**
 * True when `value` is a number that `Date` can represent, i.e. it is safe to
 * store as a reset time and to format with `toISOString()`.
 */
function isUsableTimestamp(value) {
  return typeof value === 'number' && !Number.isNaN(new Date(value).getTime());
}

/**
 * Pick the reset timestamp for a new rate limit: the explicit value if usable,
 * else the Retry-After / window calculation, else the plain window calculation.
 */
function resolveResetAt(explicitResetAt, limitType, retryAfterSeconds) {
  if (isUsableTimestamp(explicitResetAt)) return explicitResetAt;

  const computed = calculateResetTime(limitType, retryAfterSeconds);
  if (isUsableTimestamp(computed)) return computed;

  // Retry-After produced an unusable value — ignore it.
  return calculateResetTime(limitType, 0);
}

/**
 * Register a rate limit for an API.
 * Called automatically from circuit-breaker.js on('open') when a rate limit is detected.
 *
 * The entry replaces any existing one for the same API, is persisted via
 * saveState(), and a 'set' event is appended to the event log.
 *
 * @param {string} api
 *   One of: 'zenrows' | 'openrouter' | 'twilio' | 'resend' | 'anthropic'
 *   Or any key from API_STAGE_MAP.
 * @param {object} [options]
 * @param {string[]} [options.stages]
 *   Pipeline stage names to skip. Defaults to API_STAGE_MAP[api], or no stages
 *   for an unknown API. The list is copied, so later changes to the caller's
 *   array (or to API_STAGE_MAP) do not alter the stored entry.
 * @param {'daily'|'hourly'|'minute'|'custom'} [options.limitType='daily']
 *   Window type used to calculate resetAt when no Retry-After is available.
 * @param {number} [options.retryAfterSeconds=0]
 *   Seconds from Retry-After header — takes priority over limitType.
 * @param {number} [options.resetAt]
 *   Explicit reset timestamp (ms). Overrides everything else when it is a valid
 *   timestamp; an invalid value (NaN, out of Date range, non-number) is ignored
 *   and the reset time is calculated instead, so a bad value can never be
 *   persisted.
 * @param {string} [options.reason]
 *   Human-readable description for logs and status output.
 * @returns {{ stages: string[], resetAt: number, reason: string }}
 */
export function setRateLimit(api, options = {}) {
  const opts = options ?? {}; // tolerate an explicit null
  const limitType = opts.limitType ?? 'daily';

  const stages = [...(opts.stages ?? API_STAGE_MAP[api] ?? [])];
  const resetAt = resolveResetAt(opts.resetAt, limitType, opts.retryAfterSeconds ?? 0);
  const reason = opts.reason ?? `${api} rate limit (${limitType})`;

  const setAt = Date.now();
  _state[api] = { stages, resetAt, reason, setAt };
  saveState(_state);

  appendEvent({
    type: 'set',
    api,
    stages,
    reason,
    resetAt: new Date(resetAt).toISOString(),
    scheduledWaitMs: resetAt - setAt,
  });

  return { stages, resetAt, reason };
}
192  
/**
 * Manually clear a rate limit (e.g. after confirming API quota is restored,
 * or to recover from a false-positive detection).
 * Also called automatically from circuit-breaker.js on('close').
 *
 * Does nothing when no entry is recorded for `api`. Otherwise a 'clear' event
 * is appended to the event log, the entry is removed and the state is saved.
 *
 * Entries with unusable data can still be cleared: an invalid `resetAt` is
 * logged as `scheduledResetAt: null` / `earlyByMs: 0`, and a missing `setAt`
 * yields `actualWaitMs: 0` (the same fallback the 'expired' event uses).
 *
 * @param {string} api - API identifier whose rate limit should be lifted.
 * @returns {void}
 */
export function clearRateLimit(api) {
  // Own-property check: names like 'constructor' must not match inherited members.
  if (!Object.prototype.hasOwnProperty.call(_state, api)) return;

  const entry = _state[api];
  if (!entry) return;

  const { resetAt: scheduledResetAt, setAt, stages, reason } = entry;
  const clearedAt = Date.now();
  const hasValidReset =
    typeof scheduledResetAt === 'number' && !Number.isNaN(new Date(scheduledResetAt).getTime());

  appendEvent({
    type: 'clear',
    api,
    stages,
    reason,
    scheduledResetAt: hasValidReset ? new Date(scheduledResetAt).toISOString() : null,
    earlyByMs: hasValidReset ? Math.max(0, scheduledResetAt - clearedAt) : 0,
    actualWaitMs: clearedAt - (setAt || clearedAt),
  });

  delete _state[api];
  saveState(_state);
}
215  
/**
 * Get the set of pipeline stage names that should be skipped right now.
 * Called by pipeline-service.js before each stage execution.
 *
 * An entry counts as active only while `resetAt > now` — the same test used by
 * isRateLimited() and getRateLimitStatus(). Every other entry (reset time
 * reached, or reset time missing/NaN) is removed: an 'expired' event is logged
 * and the state is saved once if anything was removed. Treating an unusable
 * reset time as expired prevents such an entry from blocking its stages
 * forever.
 *
 * @returns {Set<string>} lowercase stage names (e.g. Set { 'serps' })
 */
export function getSkipStages() {
  const now = Date.now();
  const skipped = new Set();
  let changed = false;

  for (const [api, info] of Object.entries(_state)) {
    if (info != null && info.resetAt > now) {
      // Tolerate a malformed stages list rather than aborting the whole cycle.
      const stages = Array.isArray(info.stages) ? info.stages : [];
      for (const stage of stages) {
        if (typeof stage === 'string') skipped.add(stage.toLowerCase());
      }
      continue;
    }

    // toISOString() throws on an invalid date, so check before formatting.
    const scheduled = new Date(info?.resetAt);
    appendEvent({
      type: 'expired',
      api,
      stages: info?.stages,
      reason: info?.reason,
      scheduledResetAt: Number.isNaN(scheduled.getTime()) ? null : scheduled.toISOString(),
      actualWaitMs: now - (info?.setAt || now),
    });
    delete _state[api];
    changed = true;
  }

  if (changed) saveState(_state);
  return skipped;
}
250  
/**
 * Report whether the given API currently has an active rate limit, i.e. an
 * entry exists for it and its reset time is still in the future.
 *
 * @param {string} api - API identifier to look up.
 * @returns {boolean}
 */
export function isRateLimited(api) {
  const entry = _state[api];
  if (!entry) {
    return false;
  }
  return entry.resetAt > Date.now();
}
258  
/**
 * Describe every rate limit that is still active, in human-readable form.
 * Used by `npm run rate-limits` and the pipeline log on resume.
 *
 * Entries whose reset time is not in the future are left out (they are not
 * removed here). `resetAt` is reported as an ISO-8601 string and `waitMinutes`
 * is the remaining wait rounded up to whole minutes.
 *
 * @returns {Array<{ api, stages, reason, resetAt, waitMinutes }>}
 */
export function getRateLimitStatus() {
  const now = Date.now();
  const active = [];

  for (const [api, info] of Object.entries(_state)) {
    const remainingMs = info.resetAt - now;
    if (!(info.resetAt > now)) continue;

    active.push({
      api,
      stages: info.stages,
      reason: info.reason,
      resetAt: new Date(info.resetAt).toISOString(),
      waitMinutes: Math.ceil(remainingMs / 60_000),
    });
  }

  return active;
}
276  }