// src/utils/adaptive-concurrency.js
  1  /**
  2   * Adaptive concurrency based on system load average.
  3   *
  4   * Scales concurrency between min and the env-var ceiling based on the
  5   * 1-minute CPU load average.  Re-reads the .env file on every call so
  6   * changing BROWSER_CONCURRENCY / ENRICHMENT_CONCURRENCY while the stage
  7   * is running takes effect on the very next batch — no restart needed.
  8   *
  9   * Thresholds (normalised load = os.loadavg()[0] / os.cpus().length):
 *   Screen OFF (AFK): ≤ 0.4 → full concurrency, ≥ 0.95 → minimum
 11   *   Screen ON  (user present): ≤ 0.2 → full concurrency, ≥ 0.5 → minimum
 12   *   Override via SCREEN_ON_EASE_LOAD / SCREEN_ON_MAX_LOAD in .env
 13   *   in between → linear interpolation
 14   *
 15   * Memory floor (ADAPTIVE_FREE_MEM_FLOOR_MB, default 768):
 16   *   If free RAM drops below this, return min regardless of load.
 17   *   Pipeline stages are CPU-load aware but memory-blind by default;
 18   *   swap thrashing causes git hangs and zombie accumulation.
 19   */
 20  
 21  import { execSync } from 'child_process';
 22  import { readFileSync, appendFileSync } from 'fs';
 23  import { join, dirname } from 'path';
 24  import { fileURLToPath } from 'url';
 25  import { loadavg, cpus, freemem } from 'os';
 26  import { getCurrentCpuUsage } from './cpu-monitor.js';
 27  
 28  const __dirname = dirname(fileURLToPath(import.meta.url));
 29  const envPath = join(__dirname, '../../.env');
 30  const logPath = join(__dirname, '../../logs/autoscaler.log');
 31  
 32  // Track high-water marks for concurrency per env key
 33  const _highWaterMarks = {};
 34  
 35  /** Log autoscaler decision for observability */
 36  function logDecision(
 37    envKey,
 38    { load, loadType, ceiling, result, reason, freeMB, easeLoad, maxLoad }
 39  ) {
 40    const ts = new Date().toISOString();
 41    const key = envKey || 'unknown';
 42    // Update high-water mark
 43    if (!_highWaterMarks[key] || result > _highWaterMarks[key]) _highWaterMarks[key] = result;
 44    const hwm = _highWaterMarks[key];
 45    const line = `[${ts}] ${key}: load=${load.toFixed(3)} (${loadType}) ease=${easeLoad} max=${maxLoad} ceiling=${ceiling} → concurrency=${result} (${reason}) hwm=${hwm} freeMB=${Math.round(freeMB)}\n`;
 46    try {
 47      appendFileSync(logPath, line);
 48    } catch {
 49      /* logs dir may not exist */
 50    }
 51  }
 52  
 53  /** Get high-water marks for all tracked keys */
 54  export function getHighWaterMarks() {
 55    return { ..._highWaterMarks };
 56  }
 57  
 58  // Default thresholds when screen is off (AFK / full-throttle mode)
 59  const EASE_LOAD = 0.4; // below this → full concurrency (was 0.5)
 60  const MAX_LOAD = 0.95; // above this → minimum concurrency (was 0.8)
 61  
 62  // Screen state cache — re-check xset every 10 s to avoid subprocess overhead
 63  let _screenCache = { active: false, ts: 0 };
 64  const SCREEN_CACHE_TTL_MS = 10_000;
 65  
 66  /**
 67   * Returns true if the physical monitor is currently on (DPMS "Monitor is On").
 68   * Falls back to false (assume screen off / AFK) if xset is unavailable.
 69   * Result is cached for 10 seconds.
 70   */
 71  export function isScreenActive() {
 72    const now = Date.now();
 73    if (now - _screenCache.ts < SCREEN_CACHE_TTL_MS) return _screenCache.active;
 74  
 75    try {
 76      const env = { ...process.env, DISPLAY: process.env.DISPLAY || ':0' };
 77      const out = execSync('xset q', { timeout: 1000, env }).toString();
 78      _screenCache = { active: out.includes('Monitor is On'), ts: now };
 79    } catch {
 80      _screenCache = { active: false, ts: now };
 81    }
 82  
 83    return _screenCache.active;
 84  }
 85  
 86  /** Return the load thresholds to use, tightened when the screen is on. */
 87  function getThresholds() {
 88    if (!isScreenActive()) return { easeLoad: EASE_LOAD, maxLoad: MAX_LOAD };
 89    return {
 90      easeLoad: parseFloat(readEnvVar('SCREEN_ON_EASE_LOAD', '0.2')),
 91      maxLoad: parseFloat(readEnvVar('SCREEN_ON_MAX_LOAD', '0.5')),
 92    };
 93  }
 94  
 95  /** Read a single variable from the .env file (bypasses process.env cache). */
 96  function readEnvVar(key, defaultValue) {
 97    try {
 98      const content = readFileSync(envPath, 'utf8');
 99      const match = content.match(new RegExp(`^${key}\\s*=\\s*(.+)$`, 'm'));
100      return match ? match[1].trim() : defaultValue;
101    } catch {
102      return defaultValue;
103    }
104  }
105  
/**
 * Memory guard: return `min` when free RAM is under the configured floor
 * (ADAPTIVE_FREE_MEM_FLOOR_MB, default 768 MB), or null when unconstrained.
 */
function memoryConstrained(min) {
  const floorMB = parseInt(readEnvVar('ADAPTIVE_FREE_MEM_FLOOR_MB', '768'), 10);
  const freeMB = freemem() / 1024 / 1024;
  if (freeMB < floorMB) return min; // constrained → force minimum concurrency
  return null; // not constrained
}
112  
/**
 * Return the concurrency to use for the current batch.
 *
 * Order of checks:
 *   1. Memory floor — below it, return `min` regardless of load.
 *   2. Load thresholds (tightened while the screen is on).
 *   3. Linear ramp between ceiling and min for in-between loads.
 *
 * @param {number} min        - Hard floor (never returns below this)
 * @param {number} defaultMax - Ceiling when envKey is absent from .env
 * @param {string|null} envKey - .env variable name for the ceiling (e.g. 'BROWSER_CONCURRENCY')
 * @returns {number}
 */
export function getAdaptiveConcurrency(min, defaultMax, envKey = null) {
  // Ceiling comes from .env when available. Guard against a malformed value
  // (parseInt → NaN would otherwise propagate to the caller) and clamp to the
  // documented hard floor so we never return below `min`.
  const parsed = envKey ? parseInt(readEnvVar(envKey, String(defaultMax)), 10) : defaultMax;
  const ceiling = Math.max(min, Number.isNaN(parsed) ? defaultMax : parsed);
  const freeMB = freemem() / 1024 / 1024;

  // Memory guard first — swap thrashing is worse than CPU load
  const memMin = memoryConstrained(min);
  if (memMin !== null) {
    logDecision(envKey, {
      load: 0,
      loadType: 'loadavg',
      ceiling,
      result: memMin,
      reason: 'memory_floor',
      freeMB,
      easeLoad: 0,
      maxLoad: 0,
    });
    return memMin;
  }

  const { easeLoad, maxLoad } = getThresholds();
  const load = loadavg()[0] / cpus().length; // normalised: 1.0 = fully saturated

  let result, reason;
  if (load <= easeLoad) {
    result = ceiling;
    reason = 'full';
  } else if (load >= maxLoad) {
    result = min;
    reason = 'throttled';
  } else {
    // Linear interpolation between the two thresholds.
    const t = (load - easeLoad) / (maxLoad - easeLoad);
    result = Math.max(min, Math.round(ceiling - t * (ceiling - min)));
    reason = `ramp_${Math.round(t * 100)}%`;
  }

  logDecision(envKey, {
    load,
    loadType: 'loadavg',
    ceiling,
    result,
    reason,
    freeMB,
    easeLoad,
    maxLoad,
  });
  return result;
}
169  
/**
 * Like getAdaptiveConcurrency but uses instantaneous CPU usage (200ms sample)
 * instead of the slow-moving load average (30-60s lag).
 * Use this for browser stages where fast reaction to CPU spikes matters.
 *
 * @param {number} min        - Hard floor (never returns below this)
 * @param {number} defaultMax - Ceiling when envKey is absent
 * @param {string|null} envKey - .env variable name for the ceiling
 * @returns {number}
 */
export function getAdaptiveConcurrencyFast(min, defaultMax, envKey = null) {
  // Same guards as getAdaptiveConcurrency: tolerate a malformed env value
  // (parseInt → NaN) and never let the ceiling drop below the hard floor.
  const parsed = envKey ? parseInt(readEnvVar(envKey, String(defaultMax)), 10) : defaultMax;
  const ceiling = Math.max(min, Number.isNaN(parsed) ? defaultMax : parsed);
  const freeMB = freemem() / 1024 / 1024;

  // Memory guard first
  const memMin = memoryConstrained(min);
  if (memMin !== null) {
    logDecision(envKey, {
      load: 0,
      loadType: 'cpu_instant',
      ceiling,
      result: memMin,
      reason: 'memory_floor',
      freeMB,
      easeLoad: 0,
      maxLoad: 0,
    });
    return memMin;
  }

  const { easeLoad, maxLoad } = getThresholds();
  const load = getCurrentCpuUsage(); // real-time, not lagging average

  let result, reason;
  if (load <= easeLoad) {
    result = ceiling;
    reason = 'full';
  } else if (load >= maxLoad) {
    result = min;
    reason = 'throttled';
  } else {
    // Linear interpolation between the two thresholds.
    const t = (load - easeLoad) / (maxLoad - easeLoad);
    result = Math.max(min, Math.round(ceiling - t * (ceiling - min)));
    reason = `ramp_${Math.round(t * 100)}%`;
  }

  logDecision(envKey, {
    load,
    loadType: 'cpu_instant',
    ceiling,
    result,
    reason,
    freeMB,
    easeLoad,
    maxLoad,
  });
  return result;
}
227  }