rate-limit-scheduler.js
/**
 * Rate Limit Scheduler
 *
 * Tracks API rate limits and automatically manages dynamic SKIP_STAGES.
 * When a circuit breaker opens due to rate limiting, this module:
 *   1. Records which pipeline stages are affected and until when
 *   2. Makes getSkipStages() return those stages during the wait
 *   3. Automatically lifts the skip once the reset time is reached
 *   4. Persists state to <LOGS_DIR>/rate-limits.json (LOGS_DIR defaults to
 *      ./logs), so it survives pipeline restarts
 *
 * Pattern for future APIs (see also CLAUDE.md "Rate Limit Pattern"):
 *   1. Add the api → stages mapping to API_STAGE_MAP below
 *   2. Add a rateLimitConfig to the circuit breaker factory in circuit-breaker.js
 *   3. For 429 responses: the breaker on('open') handler calls setRateLimit automatically
 *   4. For timeout-based quota (e.g. ZenRows): set detectFromTimeouts: true in rateLimitConfig
 *   5. getSkipStages() in pipeline-service.js picks it up automatically each cycle
 */

import {
  readFileSync,
  writeFileSync,
  appendFileSync,
  mkdirSync,
  existsSync,
  renameSync,
} from 'node:fs';
import { join } from 'node:path';

const LOGS_DIR = process.env.LOGS_DIR || join(process.cwd(), 'logs');
const STATE_FILE = join(LOGS_DIR, 'rate-limits.json');

/**
 * Append-only event log for the Monitor agent to analyse.
 * Each line is a JSON object with a timestamp and event details.
 * Events: 'set' (rate limit activated), 'clear' (manually cleared),
 * 'expired' (rate limit reached its scheduled end, auto-removed).
 * Timestamp fields are ISO-8601 strings, or null when the stored value was invalid.
 * Analysed by MonitorAgent.checkRateLimitPatterns() to detect:
 *   - wait_too_short: Same API rate-limited again <2h after clearing
 *   - false_positive: Cleared with >50% of the scheduled wait still remaining
 *     (API recovered early)
 *   - high_frequency: Same API triggered >3 times in 24h
 */
export const EVENTS_FILE = join(LOGS_DIR, 'rate-limit-events.jsonl');

/**
 * Best-effort append of one event line to EVENTS_FILE.
 * Failures are ignored on purpose: the in-memory state still works without the log.
 *
 * @param {object} event - Event fields; a `timestamp` is prepended automatically.
 */
function appendEvent(event) {
  try {
    mkdirSync(LOGS_DIR, { recursive: true });
    appendFileSync(
      EVENTS_FILE,
      `${JSON.stringify({ timestamp: new Date().toISOString(), ...event })}\n`,
      'utf8'
    );
  } catch {
    /* non-critical — in-memory state still works without the event log */
  }
}

/**
 * Maps API identifiers → pipeline stage names they affect.
 * When an API is rate-limited, these stages are skipped until the limit resets.
 * Add a new entry here when integrating a new API.
 *
 * @type {Record<string, string[]>}
 */
export const API_STAGE_MAP = {
  zenrows: ['serps'],
  openrouter: ['scoring', 'rescoring'], // proposals use claude -p (Claude Max), not OpenRouter
  twilio: ['outreach'],
  resend: ['outreach'],
  anthropic: ['proposals'],
  zerobounce: [], // Validation-only; fail-open keeps outreach running when credits are exhausted
  dataforseo: [], // Keyword validation only; not a pipeline stage — breaker prevents wasting credits
  claude_cli: [], // Claude Max CLI — orchestrator-only; all batches blocked when credits exhausted
};

// ─── Helpers ─────────────────────────────────────────────────────────────────

/**
 * Format a ms timestamp as ISO-8601, or return null when it is not a valid
 * date (NaN, ±Infinity, out of range). `Date#toISOString` throws a RangeError
 * on invalid dates, which would otherwise abort the caller on corrupt state.
 *
 * @param {number} ms
 * @returns {string|null}
 */
function toIsoOrNull(ms) {
  const date = new Date(ms);
  return Number.isNaN(date.getTime()) ? null : date.toISOString();
}

/**
 * Coerce a stages value into a fresh array of strings.
 * A single string becomes a one-element array; any other iterable is copied
 * (so later mutation by the caller cannot change stored state); anything else
 * yields []. Non-string members are dropped because getSkipStages lowercases them.
 *
 * @param {unknown} stages
 * @returns {string[]}
 */
function normalizeStages(stages) {
  if (typeof stages === 'string') return [stages];
  if (stages == null || typeof stages[Symbol.iterator] !== 'function') return [];
  return Array.from(stages).filter((stage) => typeof stage === 'string');
}

/**
 * Default stages for an API from API_STAGE_MAP. Uses an own-property check so
 * inherited keys such as 'toString' are not mistaken for mapping entries.
 *
 * @param {string} api
 * @returns {string[]}
 */
function defaultStagesFor(api) {
  return Object.hasOwn(API_STAGE_MAP, api) ? API_STAGE_MAP[api] : [];
}

// ─── Persistence ────────────────────────────────────────────────────────────

/**
 * Load persisted state from STATE_FILE into a null-prototype dictionary.
 * A null prototype means API names like 'constructor' or '__proto__' behave as
 * ordinary keys.
 * Malformed entries (non-objects) are skipped individually instead of
 * discarding the whole file. A missing, unreadable or unparsable file yields
 * an empty state.
 *
 * @returns {Record<string, { stages: string[], resetAt: number, setAt: number, reason?: string }>}
 */
function loadState() {
  const state = Object.create(null);
  if (!existsSync(STATE_FILE)) return state;

  let data;
  try {
    data = JSON.parse(readFileSync(STATE_FILE, 'utf8'));
  } catch {
    return state; // corrupt/unreadable file — start fresh
  }
  if (data === null || typeof data !== 'object' || Array.isArray(data)) return state;

  for (const [api, info] of Object.entries(data)) {
    if (info === null || typeof info !== 'object') continue;
    state[api] = {
      ...info,
      stages: normalizeStages(info.stages),
      // Non-numeric values become NaN; getSkipStages treats those as expired.
      resetAt: Number(info.resetAt),
      setAt: Number(info.setAt),
    };
  }
  return state;
}

/**
 * Persist state to STATE_FILE (best effort).
 * Writes to a per-process temp file and renames it into place, so a crash
 * mid-write cannot leave a truncated JSON file behind. A truncated file would
 * drop every active rate limit on the next load.
 *
 * @param {object} state
 */
function saveState(state) {
  const tmpFile = `${STATE_FILE}.${process.pid}.tmp`;
  try {
    mkdirSync(LOGS_DIR, { recursive: true });
    writeFileSync(tmpFile, JSON.stringify(state, null, 2), 'utf8');
    renameSync(tmpFile, STATE_FILE);
  } catch {
    /* non-critical — in-memory state still works */
  }
}

// In-memory state (module singleton, shared across all importers in the same process)
const _state = loadState();

// ─── Reset time calculation ──────────────────────────────────────────────────

/**
 * Calculate when a rate limit resets.
 *
 * @param {'daily'|'hourly'|'minute'|'custom'} limitType
 *   daily  → midnight UTC (ZenRows daily quota, OpenRouter daily credits)
 *   hourly → top of next UTC hour (common per-hour API quotas)
 *   minute → next 60-second boundary (per-minute rate limits)
 *   custom → falls back to 5-minute wait (use retryAfterSeconds instead)
 * @param {number|string} [retryAfterSeconds=0] - Seconds from Retry-After header
 *   (overrides limitType). Numeric strings are accepted; values that are not a
 *   finite positive number are ignored.
 * @returns {number} Unix ms timestamp when the limit resets
 */
export function calculateResetTime(limitType, retryAfterSeconds = 0) {
  const seconds = Number(retryAfterSeconds);
  // Guard against Infinity: an infinite resetAt cannot be formatted or persisted.
  if (Number.isFinite(seconds) && seconds > 0) {
    return Date.now() + seconds * 1000;
  }

  const now = new Date();

  switch (limitType) {
    case 'daily': {
      // Midnight UTC tonight
      return Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate() + 1, 0, 0, 0, 0);
    }
    case 'hourly': {
      // Top of the next UTC hour (Date.UTC rolls hour 24 over to the next day)
      return Date.UTC(
        now.getUTCFullYear(),
        now.getUTCMonth(),
        now.getUTCDate(),
        now.getUTCHours() + 1,
        0,
        0,
        0
      );
    }
    case 'minute': {
      // Top of the next minute
      const next = new Date(now);
      next.setUTCSeconds(0, 0);
      next.setTime(next.getTime() + 60_000);
      return next.getTime();
    }
    default:
      // Unknown limit type — wait 5 minutes
      return Date.now() + 5 * 60 * 1000;
  }
}

// ─── Public API ──────────────────────────────────────────────────────────────

/**
 * Register a rate limit for an API.
 * Called automatically from circuit-breaker.js on('open') when a rate limit is detected.
 *
 * @param {string} api
 *   Typically a key of API_STAGE_MAP (e.g. 'zenrows', 'openrouter', 'twilio',
 *   'resend', 'anthropic', 'zerobounce', 'dataforseo', 'claude_cli').
 * @param {object} [options]
 * @param {string[]|string} [options.stages]
 *   Pipeline stage names to skip. Defaults to API_STAGE_MAP[api] (or [] if unmapped).
 * @param {'daily'|'hourly'|'minute'|'custom'} [options.limitType='daily']
 *   Window type used to calculate resetAt when no Retry-After is available.
 * @param {number} [options.retryAfterSeconds=0]
 *   Seconds from Retry-After header — takes priority over limitType.
 * @param {number|Date} [options.resetAt]
 *   Explicit reset timestamp (ms, or a Date). Overrides everything else when it
 *   is a valid finite time; an invalid value falls back to the computed reset.
 * @param {string} [options.reason]
 *   Human-readable description for logs and status output.
 * @returns {{ stages: string[], resetAt: number, reason: string }}
 */
export function setRateLimit(api, options = {}) {
  const limitType = options.limitType ?? 'daily';
  const stages = normalizeStages(options.stages ?? defaultStagesFor(api));

  const explicitResetAt = options.resetAt == null ? NaN : Number(options.resetAt);
  const resetAt = Number.isFinite(explicitResetAt)
    ? explicitResetAt
    : calculateResetTime(limitType, options.retryAfterSeconds ?? 0);
  const reason = options.reason ?? `${api} rate limit (${limitType})`;

  const setAt = Date.now();
  _state[api] = { stages, resetAt, reason, setAt };
  saveState(_state);

  appendEvent({
    type: 'set',
    api,
    stages,
    reason,
    resetAt: toIsoOrNull(resetAt),
    scheduledWaitMs: resetAt - setAt,
  });

  return { stages, resetAt, reason };
}

/**
 * Manually clear a rate limit (e.g. after confirming API quota is restored,
 * or to recover from a false-positive detection).
 * Also called automatically from circuit-breaker.js on('close').
 * No-op when the API has no recorded limit. Works even for entries whose
 * stored timestamps are invalid, so a corrupt entry can always be removed.
 *
 * @param {string} api
 */
export function clearRateLimit(api) {
  const info = _state[api];
  if (!info) return;

  const { resetAt: scheduledResetAt, setAt, stages, reason } = info;
  const clearedAt = Date.now();
  appendEvent({
    type: 'clear',
    api,
    stages,
    reason,
    scheduledResetAt: toIsoOrNull(scheduledResetAt),
    earlyByMs: Number.isFinite(scheduledResetAt)
      ? Math.max(0, scheduledResetAt - clearedAt)
      : null,
    actualWaitMs: clearedAt - (setAt || clearedAt),
  });
  delete _state[api];
  saveState(_state);
}

/**
 * Get the set of pipeline stage names that should be skipped right now.
 * Automatically removes entries whose resetAt has passed (logging an
 * 'expired' event for each) and persists the state if anything was removed.
 * Called by pipeline-service.js before each stage execution.
 *
 * @returns {Set<string>} lowercase stage names (e.g. Set { 'serps' })
 */
export function getSkipStages() {
  const now = Date.now();
  const skipped = new Set();
  let changed = false;

  for (const [api, info] of Object.entries(_state)) {
    // `!(x > now)` instead of `x <= now` so a NaN resetAt (corrupt state) counts
    // as expired rather than blocking its stages forever. This matches
    // isRateLimited(), which reports such entries as not limited.
    if (!(info.resetAt > now)) {
      appendEvent({
        type: 'expired',
        api,
        stages: info.stages,
        reason: info.reason,
        scheduledResetAt: toIsoOrNull(info.resetAt),
        actualWaitMs: now - (info.setAt || now),
      });
      delete _state[api];
      changed = true;
      continue;
    }
    for (const stage of info.stages) {
      skipped.add(stage.toLowerCase());
    }
  }

  if (changed) saveState(_state);
  return skipped;
}

/**
 * Check whether a specific API is currently rate-limited.
 *
 * @param {string} api
 * @returns {boolean} true when a limit is recorded and its resetAt is in the future
 */
export function isRateLimited(api) {
  const info = _state[api];
  return info !== undefined && info.resetAt > Date.now();
}

/**
 * Get human-readable status of all active rate limits.
 * Used by `npm run rate-limits` and the pipeline log on resume.
 *
 * @returns {Array<{ api: string, stages: string[], reason: string, resetAt: string|null, waitMinutes: number }>}
 */
export function getRateLimitStatus() {
  const now = Date.now();
  return Object.entries(_state)
    .filter(([, info]) => info.resetAt > now)
    .map(([api, info]) => ({
      api,
      stages: info.stages,
      reason: info.reason,
      resetAt: toIsoOrNull(info.resetAt),
      waitMinutes: Math.ceil((info.resetAt - now) / 60_000),
    }));
}