// adaptive-concurrency.js
1 /** 2 * Adaptive concurrency based on system load average. 3 * 4 * Scales concurrency between min and the env-var ceiling based on the 5 * 1-minute CPU load average. Re-reads the .env file on every call so 6 * changing BROWSER_CONCURRENCY / ENRICHMENT_CONCURRENCY while the stage 7 * is running takes effect on the very next batch — no restart needed. 8 * 9 * Thresholds (normalised load = os.loadavg()[0] / os.cpus().length): 10 * Screen OFF (AFK): ≤ 0.4 → full concurrency, ≥ 0.8 → minimum 11 * Screen ON (user present): ≤ 0.2 → full concurrency, ≥ 0.5 → minimum 12 * Override via SCREEN_ON_EASE_LOAD / SCREEN_ON_MAX_LOAD in .env 13 * in between → linear interpolation 14 * 15 * Memory floor (ADAPTIVE_FREE_MEM_FLOOR_MB, default 768): 16 * If free RAM drops below this, return min regardless of load. 17 * Pipeline stages are CPU-load aware but memory-blind by default; 18 * swap thrashing causes git hangs and zombie accumulation. 19 */ 20 21 import { execSync } from 'child_process'; 22 import { readFileSync, appendFileSync } from 'fs'; 23 import { join, dirname } from 'path'; 24 import { fileURLToPath } from 'url'; 25 import { loadavg, cpus, freemem } from 'os'; 26 import { getCurrentCpuUsage } from './cpu-monitor.js'; 27 28 const __dirname = dirname(fileURLToPath(import.meta.url)); 29 const envPath = join(__dirname, '../../.env'); 30 const logPath = join(__dirname, '../../logs/autoscaler.log'); 31 32 // Track high-water marks for concurrency per env key 33 const _highWaterMarks = {}; 34 35 /** Log autoscaler decision for observability */ 36 function logDecision( 37 envKey, 38 { load, loadType, ceiling, result, reason, freeMB, easeLoad, maxLoad } 39 ) { 40 const ts = new Date().toISOString(); 41 const key = envKey || 'unknown'; 42 // Update high-water mark 43 if (!_highWaterMarks[key] || result > _highWaterMarks[key]) _highWaterMarks[key] = result; 44 const hwm = _highWaterMarks[key]; 45 const line = `[${ts}] ${key}: load=${load.toFixed(3)} (${loadType}) ease=${easeLoad} 
max=${maxLoad} ceiling=${ceiling} → concurrency=${result} (${reason}) hwm=${hwm} freeMB=${Math.round(freeMB)}\n`; 46 try { 47 appendFileSync(logPath, line); 48 } catch { 49 /* logs dir may not exist */ 50 } 51 } 52 53 /** Get high-water marks for all tracked keys */ 54 export function getHighWaterMarks() { 55 return { ..._highWaterMarks }; 56 } 57 58 // Default thresholds when screen is off (AFK / full-throttle mode) 59 const EASE_LOAD = 0.4; // below this → full concurrency (was 0.5) 60 const MAX_LOAD = 0.95; // above this → minimum concurrency (was 0.8) 61 62 // Screen state cache — re-check xset every 10 s to avoid subprocess overhead 63 let _screenCache = { active: false, ts: 0 }; 64 const SCREEN_CACHE_TTL_MS = 10_000; 65 66 /** 67 * Returns true if the physical monitor is currently on (DPMS "Monitor is On"). 68 * Falls back to false (assume screen off / AFK) if xset is unavailable. 69 * Result is cached for 10 seconds. 70 */ 71 export function isScreenActive() { 72 const now = Date.now(); 73 if (now - _screenCache.ts < SCREEN_CACHE_TTL_MS) return _screenCache.active; 74 75 try { 76 const env = { ...process.env, DISPLAY: process.env.DISPLAY || ':0' }; 77 const out = execSync('xset q', { timeout: 1000, env }).toString(); 78 _screenCache = { active: out.includes('Monitor is On'), ts: now }; 79 } catch { 80 _screenCache = { active: false, ts: now }; 81 } 82 83 return _screenCache.active; 84 } 85 86 /** Return the load thresholds to use, tightened when the screen is on. */ 87 function getThresholds() { 88 if (!isScreenActive()) return { easeLoad: EASE_LOAD, maxLoad: MAX_LOAD }; 89 return { 90 easeLoad: parseFloat(readEnvVar('SCREEN_ON_EASE_LOAD', '0.2')), 91 maxLoad: parseFloat(readEnvVar('SCREEN_ON_MAX_LOAD', '0.5')), 92 }; 93 } 94 95 /** Read a single variable from the .env file (bypasses process.env cache). 
*/ 96 function readEnvVar(key, defaultValue) { 97 try { 98 const content = readFileSync(envPath, 'utf8'); 99 const match = content.match(new RegExp(`^${key}\\s*=\\s*(.+)$`, 'm')); 100 return match ? match[1].trim() : defaultValue; 101 } catch { 102 return defaultValue; 103 } 104 } 105 106 /** Return min if free RAM is below the configured floor. */ 107 function memoryConstrained(min) { 108 const floorMB = parseInt(readEnvVar('ADAPTIVE_FREE_MEM_FLOOR_MB', '768'), 10); 109 const freeMB = freemem() / 1024 / 1024; 110 return freeMB < floorMB ? min : null; // null = not constrained 111 } 112 113 /** 114 * Return the concurrency to use for the current batch. 115 * 116 * @param {number} min - Hard floor (never returns below this) 117 * @param {number} defaultMax - Ceiling when envKey is absent from .env 118 * @param {string|null} envKey - .env variable name for the ceiling (e.g. 'BROWSER_CONCURRENCY') 119 * @returns {number} 120 */ 121 export function getAdaptiveConcurrency(min, defaultMax, envKey = null) { 122 const ceiling = envKey ? 
parseInt(readEnvVar(envKey, String(defaultMax)), 10) : defaultMax; 123 const freeMB = freemem() / 1024 / 1024; 124 125 // Memory guard first — swap thrashing is worse than CPU load 126 const memMin = memoryConstrained(min); 127 if (memMin !== null) { 128 logDecision(envKey, { 129 load: 0, 130 loadType: 'loadavg', 131 ceiling, 132 result: memMin, 133 reason: 'memory_floor', 134 freeMB, 135 easeLoad: 0, 136 maxLoad: 0, 137 }); 138 return memMin; 139 } 140 141 const { easeLoad, maxLoad } = getThresholds(); 142 const load = loadavg()[0] / cpus().length; // normalised: 1.0 = fully saturated 143 144 let result, reason; 145 if (load <= easeLoad) { 146 result = ceiling; 147 reason = 'full'; 148 } else if (load >= maxLoad) { 149 result = min; 150 reason = 'throttled'; 151 } else { 152 const t = (load - easeLoad) / (maxLoad - easeLoad); 153 result = Math.max(min, Math.round(ceiling - t * (ceiling - min))); 154 reason = `ramp_${Math.round(t * 100)}%`; 155 } 156 157 logDecision(envKey, { 158 load, 159 loadType: 'loadavg', 160 ceiling, 161 result, 162 reason, 163 freeMB, 164 easeLoad, 165 maxLoad, 166 }); 167 return result; 168 } 169 170 /** 171 * Like getAdaptiveConcurrency but uses instantaneous CPU usage (200ms sample) 172 * instead of the slow-moving load average (30-60s lag). 173 * Use this for browser stages where fast reaction to CPU spikes matters. 174 * 175 * @param {number} min - Hard floor 176 * @param {number} defaultMax - Ceiling when envKey is absent 177 * @param {string|null} envKey - .env variable name for the ceiling 178 * @returns {number} 179 */ 180 export function getAdaptiveConcurrencyFast(min, defaultMax, envKey = null) { 181 const ceiling = envKey ? 
parseInt(readEnvVar(envKey, String(defaultMax)), 10) : defaultMax; 182 const freeMB = freemem() / 1024 / 1024; 183 184 // Memory guard first 185 const memMin = memoryConstrained(min); 186 if (memMin !== null) { 187 logDecision(envKey, { 188 load: 0, 189 loadType: 'cpu_instant', 190 ceiling, 191 result: memMin, 192 reason: 'memory_floor', 193 freeMB, 194 easeLoad: 0, 195 maxLoad: 0, 196 }); 197 return memMin; 198 } 199 200 const { easeLoad, maxLoad } = getThresholds(); 201 const load = getCurrentCpuUsage(); // real-time, not lagging average 202 203 let result, reason; 204 if (load <= easeLoad) { 205 result = ceiling; 206 reason = 'full'; 207 } else if (load >= maxLoad) { 208 result = min; 209 reason = 'throttled'; 210 } else { 211 const t = (load - easeLoad) / (maxLoad - easeLoad); 212 result = Math.max(min, Math.round(ceiling - t * (ceiling - min))); 213 reason = `ramp_${Math.round(t * 100)}%`; 214 } 215 216 logDecision(envKey, { 217 load, 218 loadType: 'cpu_instant', 219 ceiling, 220 result, 221 reason, 222 freeMB, 223 easeLoad, 224 maxLoad, 225 }); 226 return result; 227 }