get-structured-status.js
#!/usr/bin/env node
/**
 * get-structured-status.js — Machine-readable system state snapshot
 *
 * Outputs a single JSON object to stdout. Used by monitoring-checks.sh for:
 *   - Structured comparison against the previous snapshot (trend detection)
 *   - Programmatic anomaly detection (stall detection, error rate thresholds)
 *   - AFK monitoring progress summaries
 *
 * Fields emitted:
 *   ts                  ISO timestamp
 *   queue               Queue depths per pipeline stage
 *   outreach            Outreach counts (approved, sent, failed)
 *   throughput_1h       Rows logged in site_status per status in last 60 min
 *   errors_30m          'error' + 'warn' counts per stage from pipeline_logs in
 *                       last 30 min, plus `sites_with_errors` (current count of
 *                       failing sites with an error_message; not time-limited)
 *   circuit_breakers    State of each circuit breaker (from rate-limits.json)
 *   cron_health         Overdue cron jobs
 *   orchestrator        Last batch run times and conservation mode state
 *   inbound_since_last  New inbound messages since last snapshot (requires --since=ISO)
 *
 * Exit status: 0 on success; 1 (with a message on stderr) if any step throws.
 *
 * Usage:
 *   node scripts/get-structured-status.js
 *   node scripts/get-structured-status.js --since=2026-03-13T07:00:00Z
 *   node scripts/get-structured-status.js --pretty   (human-readable JSON)
 */

import { readFileSync, existsSync } from 'fs';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
import { getAll, getOne, closePool } from '../src/utils/db.js';
import '../src/utils/load-env.js';

const projectRoot = join(dirname(fileURLToPath(import.meta.url)), '..');

// ── CLI arguments ───────────────────────────────────────────────────────────────
const args = process.argv.slice(2);
const pretty = args.includes('--pretty');
const sinceArg = args.find(a => a.startsWith('--since='));
const since = sinceArg ? sinceArg.replace('--since=', '') : null;

// ── State files read (best-effort) by the snapshot ──────────────────────────────
const rateLimitsPath = join(projectRoot, 'logs/rate-limits.json');
const gatesPath = join(projectRoot, 'logs/orchestrator-gates.json');
const usageCachePath = join(process.env.HOME || '/root', '.claude/usage-cache.json');

// ── Time window boundaries, all relative to one shared "now" ────────────────────
const now = new Date();
const nowIso = now.toISOString();
const ago30m = new Date(now.getTime() - 30 * 60 * 1000).toISOString();
const ago1h = new Date(now.getTime() - 60 * 60 * 1000).toISOString();

/**
 * Normalise a COUNT(*) value to a JS number.
 *
 * COUNT(*) is a bigint in PostgreSQL and some drivers hand bigints back as
 * strings; coercing here keeps every count in the snapshot numeric. This is a
 * no-op when the db helper already returns numbers.
 * NOTE(review): confirm how ../src/utils/db.js parses int8 columns.
 *
 * @param {unknown} value Raw `n` column value (may be null/undefined).
 * @returns {number} The count, or 0 when the value is missing.
 */
function toCount(value) {
  return Number(value ?? 0);
}

/**
 * Run a single-row `SELECT COUNT(*) as n ...` query and return the count.
 *
 * @param {string} sql Query whose first row exposes the count as column `n`.
 * @returns {Promise<number>} The count, or 0 when no row comes back.
 */
async function countOne(sql) {
  const row = await getOne(sql);
  return toCount(row?.n);
}

/**
 * Fold `{ <keyField>, n }` rows into a `{ key: count }` object.
 *
 * @param {Array<object>} rows Rows carrying the grouping column and `n`.
 * @param {string} keyField Name of the grouping column ('status' or 'stage').
 * @param {object} [target] Object to add the counts to (mutated and returned).
 * @returns {object} `target` with one numeric entry per row.
 */
function collectCounts(rows, keyField, target = {}) {
  for (const row of rows) target[row[keyField]] = toCount(row.n);
  return target;
}

/**
 * Gather every section of the snapshot and write it to stdout as one JSON
 * line (or indented JSON with --pretty).
 *
 * Database queries run sequentially; the three JSON state files are read
 * best-effort, so a missing or unparsable file never fails the snapshot.
 *
 * @returns {Promise<void>} Rejects if any database query fails.
 */
async function main() {
  // ── Queue depths ──────────────────────────────────────────────────────────────
  const queueRows = await getAll(`
    SELECT status, COUNT(*) as n
    FROM sites
    WHERE status IN (
      'found','assets_captured','prog_scored','semantic_scored','vision_scored',
      'enriched_regex','enriched_llm','enriched',
      'proposals_drafted','outreach_partial','outreach_sent',
      'ignored','failing','high_score'
    )
    GROUP BY status
  `);
  const queue = collectCounts(queueRows, 'status');

  // ── Outreach counts ───────────────────────────────────────────────────────────
  const outreach = {
    approved_unsent: await countOne(
      `SELECT COUNT(*) as n FROM messages WHERE direction='outbound' AND approval_status='approved' AND delivery_status IS NULL AND sent_at IS NULL`
    ),
    sent_total: await countOne(
      `SELECT COUNT(*) as n FROM messages WHERE direction='outbound' AND delivery_status IN ('sent','delivered','bounced')`
    ),
    sent_24h: await countOne(
      `SELECT COUNT(*) as n FROM messages WHERE direction='outbound' AND delivery_status IN ('sent','delivered','bounced') AND updated_at > NOW() - INTERVAL '1 day'`
    ),
    failed: await countOne(
      `SELECT COUNT(*) as n FROM messages WHERE direction='outbound' AND delivery_status='failed'`
    ),
    reword_remaining: await countOne(
      `SELECT COUNT(*) as n FROM messages WHERE direction='outbound' AND message_type='outreach' AND approval_status='approved' AND sent_at IS NULL AND reworded_at IS NULL`
    ),
  };

  // ── Throughput: sites that transitioned IN LAST 60 MIN ────────────────────────
  // Uses site_status log (transition events) rather than updated_at
  const tpRows = await getAll(
    `SELECT status, COUNT(*) as n
     FROM site_status
     WHERE created_at > $1
     GROUP BY status`,
    [ago1h]
  );
  const throughput_1h = collectCounts(tpRows, 'status');

  // ── Error counts: last 30 min from pipeline_logs ──────────────────────────────
  // Counts both 'error' and 'warn' level rows, grouped by stage.
  const errRows = await getAll(
    `SELECT stage, COUNT(*) as n
     FROM pipeline_logs
     WHERE created_at > $1 AND level IN ('error','warn')
     GROUP BY stage`,
    [ago30m]
  );
  const errors_30m = collectCounts(errRows, 'stage');

  // Also report how many sites are currently 'failing' with an error_message,
  // under the synthetic stage key 'sites_with_errors'. This query has no time
  // filter, so unlike the other keys it is not limited to the last 30 minutes.
  const siteErrRows = await getAll(`
    SELECT 'sites_with_errors' as stage, COUNT(*) as n
    FROM sites
    WHERE error_message IS NOT NULL AND status='failing'
  `);
  collectCounts(siteErrRows, 'stage', errors_30m);

  // ── Cron health ───────────────────────────────────────────────────────────────
  // A job is overdue once NOW() is more than 60 seconds past
  // last_run_at + its configured interval.
  const cronRows = await getAll(`
    SELECT task_key, last_run_at, interval_value, interval_unit,
      (last_run_at + (interval_value || ' ' || interval_unit)::interval) AS next_expected_at,
      CASE WHEN NOW() > (last_run_at + (interval_value || ' ' || interval_unit)::interval + INTERVAL '60 seconds')
        THEN 1 ELSE 0 END AS is_overdue
    FROM ops.cron_jobs
    WHERE enabled = 1
    ORDER BY next_expected_at ASC
  `);
  const cron_health = {
    total_enabled: cronRows.length,
    overdue: cronRows
      .filter(r => r.is_overdue)
      .map(r => ({
        task_key: r.task_key,
        last_run_at: r.last_run_at,
        overdue_since: r.next_expected_at,
      })),
    healthy: cronRows.filter(r => !r.is_overdue).length,
  };

  // ── Circuit breakers / rate limits ────────────────────────────────────────────
  // Each entry of rate-limits.json is read for `resetAt` (converted with
  // Number() and compared against Date.now()) and an optional `reason`.
  let circuit_breakers = {};
  if (existsSync(rateLimitsPath)) {
    try {
      const raw = JSON.parse(readFileSync(rateLimitsPath, 'utf8'));
      const nowMs = Date.now();
      for (const [api, data] of Object.entries(raw)) {
        const resetAt = data?.resetAt ? Number(data.resetAt) : 0;
        circuit_breakers[api] = {
          limited: resetAt > nowMs,
          reset_at: resetAt ? new Date(resetAt).toISOString() : null,
          reason: data?.reason || null,
          minutes_remaining: resetAt > nowMs ? Math.ceil((resetAt - nowMs) / 60000) : 0,
        };
      }
    } catch {
      // Any failure while reading or converting the file replaces the whole
      // section with an error marker.
      circuit_breakers = { error: 'parse_failed' };
    }
  }

  // ── Orchestrator gates (last batch run times) ─────────────────────────────────
  const orchestrator = {
    gates: {},
    conservation_mode: false,
    claude_max_5h_pct: null,
    claude_max_7d_pct: null,
  };
  if (existsSync(gatesPath)) {
    try {
      orchestrator.gates = JSON.parse(readFileSync(gatesPath, 'utf8'));
      const overseeTs = orchestrator.gates.oversee;
      if (overseeTs) {
        // Age of the last `oversee` gate in whole minutes; flagged stale past 35.
        const overseeAge = Math.round((now - new Date(overseeTs)) / 60000);
        orchestrator.oversee_age_min = overseeAge;
        orchestrator.oversee_stale = overseeAge > 35;
      }
    } catch (_err) {
      // ignore gate parse failure
    }
  }
  // Conservation mode mirrors the claude_cli circuit breaker being tripped.
  if (existsSync(rateLimitsPath) && circuit_breakers.claude_cli?.limited) {
    orchestrator.conservation_mode = true;
  }
  if (existsSync(usageCachePath)) {
    try {
      const usage = JSON.parse(readFileSync(usageCachePath, 'utf8'));
      orchestrator.claude_max_5h_pct = Math.round(usage.five_hour || 0);
      orchestrator.claude_max_7d_pct = Math.round(usage.seven_day || 0);
      orchestrator.claude_max_stale = usage.stale || false;
      orchestrator.claude_max_resets_at = usage.five_hour_resets_at || null;
    } catch (_err) {
      // ignore usage-cache parse failure; the pct fields keep their null defaults
    }
  }

  // ── Inbound messages since last snapshot ──────────────────────────────────────
  // Only queried when --since is given. `reply_sent` is the earliest outbound
  // 'reply' message for the same site created after the inbound message.
  let inbound_since_last = null;
  if (since) {
    const inboundRows = await getAll(
      `SELECT m.id, m.site_id, m.content, m.created_at, m.contact_method,
              s.domain,
              (SELECT content FROM messages WHERE site_id=m.site_id AND direction='outbound' AND message_type='reply'
               AND created_at > m.created_at ORDER BY created_at ASC LIMIT 1) AS reply_sent
       FROM messages m
       JOIN sites s ON s.id = m.site_id
       WHERE m.direction='inbound' AND m.created_at > $1
       ORDER BY m.created_at ASC`,
      [since]
    );
    inbound_since_last = inboundRows.map(r => ({
      site_id: r.site_id,
      domain: r.domain,
      contact_method: r.contact_method,
      received_at: r.created_at,
      // Message bodies are truncated to 120 characters to keep the snapshot small.
      content_snippet: (r.content || '').substring(0, 120),
      reply_sent: r.reply_sent ? r.reply_sent.substring(0, 120) : null,
    }));
  }

  // ── Assemble output ───────────────────────────────────────────────────────────
  const output = {
    ts: nowIso,
    queue,
    outreach,
    throughput_1h,
    errors_30m,
    circuit_breakers,
    cron_health,
    orchestrator,
    // Key is omitted entirely (not null) when --since was not supplied.
    ...(inbound_since_last !== null ? { inbound_since_last } : {}),
  };

  process.stdout.write(`${JSON.stringify(output, null, pretty ? 2 : 0)}\n`);
}

main()
  .catch(err => {
    process.stderr.write(`get-structured-status: failed: ${err.message}\n`);
    // Set the exit code instead of calling process.exit(1): exiting here would
    // terminate the process before the .finally() below could close the pool.
    process.exitCode = 1;
  })
  .finally(() => closePool());