// claude-batch.js
1 #!/usr/bin/env node 2 3 /** 4 * Claude Batch — Pull work from DB and output JSON for Claude processing. 5 * 6 * Usage: 7 * node scripts/claude-batch.js <batch_type> [limit] 8 * 9 * Batch types: 10 * proposals_email — Sites needing email proposals 11 * proposals_sms — Sites needing SMS proposals 12 * classify_replies — Inbound messages needing intent classification 13 * extract_names — Contacts missing names 14 * reply_responses — Inbound messages needing a response 15 * oversee — System health snapshot for overseer analysis 16 * classify_errors — Unknown pipeline errors needing pattern proposals 17 * score_sites — Sites needing HTML-only LLM scoring (ENABLE_VISION=false) 18 * enrich_sites — Sites needing LLM contact enrichment from HTML 19 * 20 * Outputs JSON to stdout. Load appropriate prompt/context docs separately. 21 */ 22 23 import { execSync } from 'child_process'; 24 import { existsSync, readFileSync, writeFileSync } from 'fs'; 25 import { join } from 'path'; 26 import { getAll, getOne, run, closePool } from '../src/utils/db.js'; 27 import { COUNTRIES } from '../src/config/countries.js'; 28 import { computeGrade } from '../src/score.js'; 29 import { categorizeError } from '../src/utils/error-categories.js'; 30 import { extractScoringText } from '../src/utils/programmatic-scorer.js'; 31 import { stripInjectionMarkers, wrapUntrusted } from '../src/utils/llm-sanitizer.js'; 32 import { safeJsonParse } from '../src/utils/error-handler.js'; 33 import { readHtmlDom, readKeyPagesHtml } from '../src/utils/html-storage.js'; 34 import { getScoreJsonWithFallback, getScoreDataWithFallback } from '../src/utils/score-storage.js'; 35 import { getContactsDataWithFallback } from '../src/utils/contacts-storage.js'; 36 import '../src/utils/load-env.js'; 37 38 const [, , batchType, limitArg] = process.argv; 39 const limit = parseInt(limitArg || '10', 10); 40 41 if (!batchType) { 42 console.error( 43 'Usage: node scripts/claude-batch.js <batch_type> [limit]\n' + 44 'Types: 
proposals_email, proposals_sms, classify_replies, extract_names, reply_responses, score_sites, score_semantic, enrich_sites, proofread, followup_check, followup_generate' 45 ); 46 process.exit(1); 47 } 48 49 function getSameLanguageCodes(countryCode) { 50 const lang = COUNTRIES[countryCode]?.language; 51 if (!lang) return [countryCode]; 52 return Object.entries(COUNTRIES) 53 .filter(([, c]) => c.language === lang) 54 .map(([code]) => code); 55 } 56 57 async function getCompetitorBenchmark(siteData) { 58 const scoreJson = getScoreDataWithFallback(siteData.id, siteData); 59 const industry = scoreJson?.industry_classification; 60 if (!industry || !siteData.score) return null; 61 62 const codes = getSameLanguageCodes(siteData.country_code); 63 const placeholders = codes.map((_, i) => `$${i + 3}`).join(','); 64 65 const competitor = await getOne( 66 `SELECT domain, score, grade 67 FROM sites 68 WHERE industry_classification = $1 69 AND domain != $2 70 AND country_code IN (${placeholders}) 71 AND status IN ('prog_scored', 'semantic_scored', 'vision_scored', 'enriched', 'enriched_regex', 'enriched_llm', 'proposals_drafted', 'outreach_partial', 'outreach_sent') 72 AND score IS NOT NULL 73 AND score > $${codes.length + 3} 74 ORDER BY score DESC 75 LIMIT 1`, 76 [industry, siteData.domain, ...codes, siteData.score] 77 ); 78 79 if (!competitor) return null; 80 81 return { 82 domain: competitor.domain, 83 score: competitor.score, 84 grade: competitor.grade || computeGrade(competitor.score), 85 industry, 86 language_match: true, 87 }; 88 } 89 90 async function getIndustryBenchmark(siteData) { 91 const scoreJson = getScoreDataWithFallback(siteData.id, siteData); 92 const industry = scoreJson?.industry_classification; 93 if (!industry || !siteData.score) return null; 94 95 const result = await getOne( 96 `SELECT AVG(score) as avg_score, COUNT(*) as sample_size 97 FROM sites 98 WHERE industry_classification = $1 99 AND country_code = $2 100 AND score IS NOT NULL 101 AND domain != 
$3`, 102 [industry, siteData.country_code, siteData.domain] 103 ); 104 105 if (!result || !result.sample_size || result.sample_size < 10) return null; 106 107 return { 108 industry, 109 country_code: siteData.country_code, 110 avg_score: Math.round(result.avg_score * 10) / 10, 111 sample_size: result.sample_size, 112 site_score: siteData.score, 113 difference: Math.round((siteData.score - result.avg_score) * 10) / 10, 114 }; 115 } 116 117 async function getAggregateStats() { 118 const stats = await getOne( 119 `SELECT COUNT(*) as total, AVG(score) as avg_score, MAX(score) as best_score 120 FROM sites WHERE score IS NOT NULL` 121 ); 122 123 return { 124 total_sites_scored: stats.total, 125 avg_score: Math.round(stats.avg_score * 10) / 10, 126 best_score: stats.best_score, 127 }; 128 } 129 130 // Industry benchmarks for ROI estimation — avg job value ($), typical site conversion rate, 131 // estimated monthly visitors for a local business website. 132 const INDUSTRY_ROI_BENCHMARKS = { 133 plumber: { avg_job: 350, conversion_rate: 0.03, monthly_visitors: 800 }, 134 electrician: { avg_job: 400, conversion_rate: 0.03, monthly_visitors: 700 }, 135 roofer: { avg_job: 500, conversion_rate: 0.025, monthly_visitors: 600 }, 136 hvac: { avg_job: 450, conversion_rate: 0.03, monthly_visitors: 700 }, 137 painter: { avg_job: 300, conversion_rate: 0.03, monthly_visitors: 500 }, 138 landscaper: { avg_job: 250, conversion_rate: 0.035, monthly_visitors: 600 }, 139 pest_control: { avg_job: 200, conversion_rate: 0.04, monthly_visitors: 700 }, 140 cleaner: { avg_job: 180, conversion_rate: 0.04, monthly_visitors: 800 }, 141 locksmith: { avg_job: 200, conversion_rate: 0.035, monthly_visitors: 500 }, 142 mechanic: { avg_job: 350, conversion_rate: 0.03, monthly_visitors: 600 }, 143 dentist: { avg_job: 250, conversion_rate: 0.04, monthly_visitors: 900 }, 144 physio: { avg_job: 100, conversion_rate: 0.04, monthly_visitors: 700 }, 145 chiropractor: { avg_job: 100, conversion_rate: 0.04, 
monthly_visitors: 600 }, 146 lawyer: { avg_job: 1500, conversion_rate: 0.02, monthly_visitors: 500 }, 147 accountant: { avg_job: 800, conversion_rate: 0.025, monthly_visitors: 400 }, 148 real_estate: { avg_job: 5000, conversion_rate: 0.015, monthly_visitors: 800 }, 149 restaurant: { avg_job: 40, conversion_rate: 0.05, monthly_visitors: 1500 }, 150 cafe: { avg_job: 20, conversion_rate: 0.05, monthly_visitors: 1200 }, 151 gym: { avg_job: 60, conversion_rate: 0.04, monthly_visitors: 1000 }, 152 salon: { avg_job: 80, conversion_rate: 0.04, monthly_visitors: 800 }, 153 veterinarian: { avg_job: 200, conversion_rate: 0.035, monthly_visitors: 600 }, 154 builder: { avg_job: 2000, conversion_rate: 0.02, monthly_visitors: 500 }, 155 }; 156 // Fallback for industries not in the map 157 const DEFAULT_ROI_BENCHMARK = { avg_job: 300, conversion_rate: 0.03, monthly_visitors: 600 }; 158 159 async function getROIEstimate(siteData) { 160 const scoreJson = getScoreDataWithFallback(siteData.id, siteData) || {}; 161 const industry = scoreJson.industry_classification; 162 if (!industry || !siteData.score) return null; 163 164 // Match industry — try exact, then partial match on first word 165 const industryLower = industry.toLowerCase().replace(/[^a-z_]/g, ''); 166 const benchmark = INDUSTRY_ROI_BENCHMARKS[industryLower] 167 || Object.entries(INDUSTRY_ROI_BENCHMARKS).find( 168 ([k]) => industryLower.includes(k) || k.includes(industryLower) 169 )?.[1] 170 || DEFAULT_ROI_BENCHMARK; 171 172 // Get audit price for this country 173 const countryRow = await getOne( 174 'SELECT price_usd, currency_symbol, currency_code FROM countries WHERE country_code = $1', 175 [siteData.country_code] 176 ); 177 const priceFormatted = countryRow 178 ? `${countryRow.currency_symbol}${Math.round(countryRow.price_usd / 100)} ${countryRow.currency_code}` 179 : '$297 USD'; 180 const priceValue = countryRow ? 
countryRow.price_usd / 100 : 297; 181 182 // Estimate: at current score, what % of potential conversions are being missed? 183 // Simple model: score of 50 = ~50% conversion efficiency, score of 80 = ~80% efficiency 184 const efficiency = siteData.score / 100; 185 const potentialMonthlyConversions = benchmark.monthly_visitors * benchmark.conversion_rate; 186 const actualMonthlyConversions = potentialMonthlyConversions * efficiency; 187 const missedConversions = potentialMonthlyConversions - actualMonthlyConversions; 188 const missedMonthlyRevenue = Math.round(missedConversions * benchmark.avg_job); 189 const missedAnnualRevenue = missedMonthlyRevenue * 12; 190 const auditFraction = missedAnnualRevenue > 0 191 ? (priceValue / missedAnnualRevenue * 100).toFixed(1) 192 : null; 193 194 return { 195 estimated_monthly_visitors: benchmark.monthly_visitors, 196 avg_job_value: benchmark.avg_job, 197 industry_conversion_rate: benchmark.conversion_rate, 198 estimated_monthly_revenue_at_stake: missedMonthlyRevenue, 199 audit_price: priceFormatted, 200 audit_as_fraction_of_annual_loss: auditFraction ? `${auditFraction}%` : null, 201 }; 202 } 203 204 async function fetchProposalsBatch(channel) { 205 const contactMethod = channel === 'email' ? 'email' : 'sms'; 206 207 // Support both contact schemas: 208 // new: contacts_json.contacts[].type = 'email'|'sms' 209 // old: contacts_json.email_addresses[] or contacts_json.phone_numbers[] 210 211 // Note: contacts_json column holds sentinel '{"_fs":true}' — real data is on filesystem. 212 // We cannot use json_extract(contacts_json, ...) for contact-type gating in SQL. 213 // Instead, fetch a wider pool and filter in JS after loading from filesystem. 
214 const candidateLimit = limit * 4; // Oversample to account for filtered-out sites 215 const sites = await getAll( 216 `SELECT s.id, s.domain, s.landing_page_url, s.keyword, s.score, s.grade, 217 s.evidence_json, s.country_code, s.city, s.state 218 FROM sites s 219 WHERE s.status IN ('enriched', 'enriched_llm') 220 AND s.score IS NOT NULL 221 AND s.score < 82 222 AND s.evidence_json IS NOT NULL 223 AND NOT EXISTS ( 224 SELECT 1 FROM messages m 225 WHERE m.site_id = s.id 226 AND m.contact_method = $1 227 AND m.direction = 'outbound' 228 AND m.message_type = 'outreach' 229 ) 230 ORDER BY s.score ASC 231 LIMIT $2`, 232 [contactMethod, candidateLimit] 233 ); 234 235 const aggregateStats = await getAggregateStats(); 236 237 // Filter to sites that actually have the required contact type (filesystem read) 238 const filtered = []; 239 for (const site of sites) { 240 if (filtered.length >= limit) break; 241 const contactsJson = getContactsDataWithFallback(site.id, site) || {}; 242 const hasContact = 243 (Array.isArray(contactsJson.contacts) && 244 contactsJson.contacts.some(c => c.type === contactMethod)) || 245 (contactMethod === 'email' && (contactsJson.email_addresses?.length ?? 0) > 0) || 246 (contactMethod === 'sms' && (contactsJson.phone_numbers?.length ?? 
0) > 0); 247 if (hasContact) filtered.push({ ...site, _contacts: contactsJson }); 248 } 249 250 return Promise.all(filtered.map(async site => { 251 const scoreJson = getScoreDataWithFallback(site.id, site) || {}; 252 const contactsJson = site._contacts || {}; 253 254 // Normalise both schemas into a unified contacts list 255 let contacts; 256 if (Array.isArray(contactsJson.contacts)) { 257 contacts = contactsJson.contacts.filter(c => c.type === contactMethod); 258 } else if (contactMethod === 'email') { 259 contacts = (contactsJson.email_addresses || []).map(e => ({ 260 type: 'email', 261 uri: e.email, 262 name: e.name || null, 263 })); 264 } else { 265 contacts = (contactsJson.phone_numbers || []).map(p => ({ 266 type: 'sms', 267 uri: p.number, 268 name: p.name || null, 269 })); 270 } 271 272 return { 273 site_id: site.id, 274 domain: site.domain, 275 landing_page_url: site.landing_page_url, 276 keyword: site.keyword, 277 score: site.score, 278 grade: site.grade || computeGrade(site.score), 279 industry: scoreJson.industry_classification || null, 280 country_code: site.country_code, 281 city: site.city || contactsJson.city || null, 282 state: site.state || contactsJson.state || null, 283 business_name: contactsJson.business_name || site.domain, 284 contacts: contacts.map(c => ({ 285 type: c.type, 286 uri: c.uri || c.value, 287 name: c.name || null, 288 })), 289 weaknesses: extractWeaknesses(scoreJson), 290 evidence_summary: site.evidence_json 291 ? 
safeJsonParse(site.evidence_json)?.evidence_summary || null 292 : null, 293 competitor_benchmark: await getCompetitorBenchmark(site), 294 industry_benchmark: await getIndustryBenchmark(site), 295 aggregate_stats: aggregateStats, 296 roi_estimate: await getROIEstimate(site), 297 }; 298 })); 299 } 300 301 function extractWeaknesses(scoreJson) { 302 if (!scoreJson?.factor_scores) return []; 303 return Object.entries(scoreJson.factor_scores) 304 .filter(([, f]) => f.score < 5) 305 .map(([name, f]) => ({ 306 factor: name.replace(/_/g, ' '), 307 score: f.score, 308 reasoning: f.reasoning, 309 })) 310 .slice(0, 5); 311 } 312 313 async function fetchProofread() { 314 // Pending outbound outreach messages awaiting QA approval 315 const messages = await getAll( 316 `SELECT m.id, m.site_id, m.message_body, m.subject_line, m.contact_method, 317 m.contact_uri, s.domain, s.score, s.grade, s.country_code, s.landing_page_url 318 FROM messages m 319 JOIN sites s ON s.id = m.site_id 320 WHERE m.direction = 'outbound' 321 AND (m.message_type = 'outreach' OR m.message_type LIKE 'followup%') 322 AND m.approval_status = 'pending' 323 AND m.sent_at IS NULL 324 AND m.delivery_status IS NULL 325 ORDER BY m.contact_method, m.created_at ASC 326 LIMIT $1`, 327 [limit] 328 ); 329 330 return messages.map(msg => ({ 331 message_id: msg.id, 332 site_id: msg.site_id, 333 domain: msg.domain, 334 landing_page_url: msg.landing_page_url, 335 score: msg.score, 336 grade: msg.grade || computeGrade(msg.score), 337 country_code: msg.country_code, 338 contact_method: msg.contact_method, 339 contact_uri: msg.contact_uri, 340 message_body: msg.message_body, 341 subject_line: msg.subject_line || null, 342 })); 343 } 344 345 async function fetchClassifyReplies() { 346 const replies = await getAll( 347 `SELECT m.id, m.site_id, m.message_body, m.contact_method, m.contact_uri, 348 s.domain 349 FROM messages m 350 JOIN sites s ON s.id = m.site_id 351 WHERE m.direction = 'inbound' 352 AND m.intent IS NULL 353 AND 
m.message_body IS NOT NULL 354 ORDER BY m.created_at ASC 355 LIMIT $1`, 356 [limit] 357 ); 358 359 return replies.map(r => ({ 360 message_id: r.id, 361 site_id: r.site_id, 362 domain: r.domain, 363 channel: r.contact_method, 364 message: r.message_body, 365 })); 366 } 367 368 async function fetchExtractNames() { 369 // contacts_json column holds sentinel — real data is on filesystem. 370 // Find sites with contacts_json IS NOT NULL (sentinel or real data), then filter in JS. 371 const candidateLimit = limit * 3; 372 const sites = await getAll( 373 `SELECT s.id, s.domain, s.landing_page_url 374 FROM sites s 375 ORDER BY s.id DESC 376 LIMIT $1`, 377 [candidateLimit] 378 ); 379 380 const results = []; 381 for (const s of sites) { 382 if (results.length >= limit) break; 383 const contactsJson = getContactsDataWithFallback(s.id, s) || {}; 384 const namelessContacts = (contactsJson.contacts || []).filter(c => !c.name); 385 if (namelessContacts.length === 0) continue; 386 results.push({ 387 site_id: s.id, 388 domain: s.domain, 389 landing_page_url: s.landing_page_url, 390 contacts: namelessContacts.map(c => ({ 391 type: c.type, 392 uri: c.uri || c.value, 393 })), 394 }); 395 } 396 return results; 397 } 398 399 400 async function fetchReplyResponses() { 401 // Inbound messages that have been classified but have no outbound reply yet 402 const inbounds = await getAll( 403 `SELECT m.id, m.site_id, m.message_body, m.contact_method, m.contact_uri, 404 m.intent, m.sentiment, 405 s.domain, s.score, s.grade, s.country_code 406 FROM messages m 407 JOIN sites s ON s.id = m.site_id 408 WHERE m.direction = 'inbound' 409 AND m.intent IS NOT NULL 410 AND m.intent NOT IN ('opt-out', 'autoresponder') 411 AND NOT EXISTS ( 412 SELECT 1 FROM messages m2 413 WHERE m2.site_id = m.site_id 414 AND m2.direction = 'outbound' 415 AND m2.message_type IN ('reply', 'reply_skipped') 416 AND m2.created_at > m.created_at 417 ) 418 ORDER BY m.created_at ASC 419 LIMIT $1`, 420 [limit] 421 ); 422 423 
return Promise.all(inbounds.map(async msg => { 424 const scoreJson = getScoreDataWithFallback(msg.site_id, msg) || {}; 425 const contactsJson = getContactsDataWithFallback(msg.site_id, msg) || {}; 426 427 // Get full conversation history for this site 428 const history = await getAll( 429 `SELECT direction, message_body, contact_method, sent_at, created_at, intent 430 FROM messages 431 WHERE site_id = $1 432 ORDER BY created_at ASC`, 433 [msg.site_id] 434 ); 435 436 // Get country-specific pricing from DB 437 const countryRow = await getOne( 438 'SELECT price_usd, currency_symbol, currency_code FROM countries WHERE country_code = $1', 439 [msg.country_code] 440 ); 441 const pricing = countryRow 442 ? `${countryRow.currency_symbol}${Math.round(countryRow.price_usd / 100)} ${countryRow.currency_code}` 443 : null; 444 445 return { 446 inbound_message_id: msg.id, 447 site_id: msg.site_id, 448 domain: msg.domain, 449 score: msg.score, 450 grade: msg.grade, 451 country_code: msg.country_code, 452 business_name: contactsJson.business_name || msg.domain, 453 intent: msg.intent, 454 sentiment: msg.sentiment, 455 inbound_message: wrapUntrusted( 456 stripInjectionMarkers(msg.message_body || ''), 457 'prospect_reply' 458 ), 459 channel: msg.contact_method, 460 contact_uri: msg.contact_uri, 461 pricing, 462 weaknesses: extractWeaknesses(scoreJson), 463 conversation_history: history.map(h => ({ 464 direction: h.direction, 465 message: wrapUntrusted(stripInjectionMarkers(h.message_body || ''), 'conversation_history'), 466 channel: h.contact_method, 467 timestamp: h.sent_at || h.created_at, 468 intent: h.intent || undefined, 469 })), 470 }; 471 })); 472 } 473 474 async function fetchOverseeData() { 475 // Collect system state for overseer analysis — mirrors sonnet-overseer.js data collection 476 const services = ['333method-pipeline', 'mmo-cron.timer', '333method-dashboard']; 477 const serviceStatus = {}; 478 for (const svc of services) { 479 try { 480 serviceStatus[svc] = 
execSync(`systemctl --user is-active ${svc}`, { 481 encoding: 'utf8', 482 timeout: 8000, 483 }).trim(); 484 } catch (err) { 485 serviceStatus[svc] = (err.stdout || '').trim() || 'inactive'; 486 } 487 } 488 489 const today = new Date().toISOString().slice(0, 10); 490 const logFile = join(process.cwd(), `logs/pipeline-${today}.log`); 491 let recentErrors = []; 492 if (existsSync(logFile)) { 493 const cutoff = new Date(Date.now() - 30 * 60 * 1000); 494 const lines = readFileSync(logFile, 'utf8').split('\n'); 495 const seen = new Map(); 496 for (const line of lines.slice(-400)) { 497 if (!line.includes('[ERROR]') && !line.includes('[WARN]')) continue; 498 const tsMatch = line.match(/^(\[[\d\-T:.+Z]+\])/); 499 if (tsMatch) { 500 try { 501 if (new Date(tsMatch[1].slice(1, -1)) < cutoff) continue; 502 } catch { 503 /* keep */ 504 } 505 } 506 const trimmed = line.slice(0, 200); 507 const key = trimmed.slice(0, 80); 508 const count = (seen.get(key) || 0) + 1; 509 seen.set(key, count); 510 if (count === 1) recentErrors.push(trimmed); 511 else if (count <= 3) recentErrors[recentErrors.length - 1] = `${trimmed} (×${count})`; 512 } 513 recentErrors = recentErrors.slice(-30); 514 } 515 516 const now = new Date(); 517 const twoHoursAgo = new Date(now - 2 * 60 * 60 * 1000).toISOString(); 518 const sixHoursAgo = new Date(now - 6 * 60 * 60 * 1000).toISOString(); 519 const fourHoursAgo = new Date(now - 4 * 60 * 60 * 1000).toISOString(); 520 const activeStages = [ 521 'found', 522 'assets_captured', 523 'prog_scored', 524 'semantic_scored', 525 'vision_scored', 526 'enriched', 527 ]; 528 529 const agentTasks = { 530 blocked_sample: await getAll( 531 `SELECT id, task_type, created_at, substr(context_json::text, 1, 120) as ctx 532 FROM tel.agent_tasks WHERE status='blocked' ORDER BY created_at ASC LIMIT 20` 533 ), 534 stale_blocked_count: (await getOne( 535 `SELECT COUNT(*) as n FROM tel.agent_tasks WHERE status='blocked' AND created_at < $1`, 536 [twoHoursAgo] 537 ))?.n ?? 
0, 538 stale_pending_count: (await getOne( 539 `SELECT COUNT(*) as n FROM tel.agent_tasks WHERE status='pending' AND created_at < $1`, 540 [sixHoursAgo] 541 ))?.n ?? 0, 542 stale_running_count: (await getOne( 543 `SELECT COUNT(*) as n FROM tel.agent_tasks WHERE status='running' AND started_at < $1`, 544 [twoHoursAgo] 545 ))?.n ?? 0, 546 recent_failed: await getAll( 547 `SELECT task_type, COUNT(*) as n FROM tel.agent_tasks 548 WHERE status='failed' AND completed_at > $1 549 GROUP BY task_type ORDER BY n DESC LIMIT 10`, 550 [twoHoursAgo] 551 ), 552 running_tasks: await getAll( 553 `SELECT id, task_type, assigned_to, started_at, 554 ROUND(EXTRACT(EPOCH FROM (NOW() - started_at)) / 60) as age_mins 555 FROM tel.agent_tasks WHERE status='running' ORDER BY started_at ASC LIMIT 10` 556 ), 557 }; 558 559 const stagesPlaceholders = activeStages.map((_, i) => `$${i + 1}`).join(','); 560 const siteHealth = { 561 distribution: await getAll( 562 `SELECT status, COUNT(*) as n FROM sites GROUP BY status ORDER BY n DESC LIMIT 10` 563 ), 564 stuck_by_stage: await getAll( 565 `SELECT status, COUNT(*) as n FROM sites 566 WHERE status IN (${stagesPlaceholders}) AND updated_at < $${activeStages.length + 1} 567 GROUP BY status ORDER BY n DESC`, 568 [...activeStages, fourHoursAgo] 569 ), 570 looping_candidates: await getAll( 571 `SELECT status, COUNT(*) as n FROM sites 572 WHERE recapture_count > 5 AND status IN (${stagesPlaceholders}) 573 GROUP BY status`, 574 activeStages 575 ), 576 recently_reset_counts: await getAll( 577 `SELECT status, COUNT(*) as n FROM sites 578 WHERE error_message LIKE '%reset by sonnet overseer%' 579 AND updated_at > NOW() - INTERVAL '24 hours' 580 GROUP BY status` 581 ), 582 queue_depths: { 583 found: (await getOne(`SELECT COUNT(*) as n FROM sites WHERE status='found'`))?.n ?? 0, 584 semantic_scored: (await getOne( 585 `SELECT COUNT(*) as n FROM sites WHERE status IN ('semantic_scored','vision_scored')` 586 ))?.n ?? 
0, 587 enriched: (await getOne( 588 `SELECT COUNT(*) as n FROM sites WHERE status IN ('enriched','enriched_regex','enriched_llm')` 589 ))?.n ?? 0, 590 }, 591 }; 592 593 const nowIso = now.toISOString(); 594 const cronHealth = { 595 overdue_cron_jobs: await getAll( 596 `SELECT task_key, last_run_at, interval_value, interval_unit, 597 (last_run_at + (interval_value || ' ' || interval_unit)::interval) AS next_expected_at 598 FROM ops.cron_jobs 599 WHERE enabled = 1 600 AND (last_run_at + (interval_value || ' ' || interval_unit)::interval + INTERVAL '30 seconds') < $1 601 ORDER BY next_expected_at ASC LIMIT 10`, 602 [nowIso] 603 ), 604 recent_health_checks: await getAll( 605 `SELECT check_type, status, action_taken, created_at 606 FROM tel.system_health ORDER BY created_at DESC LIMIT 10` 607 ), 608 }; 609 610 const backupDbRow = await getOne( 611 `SELECT last_run_at, 612 ROUND(EXTRACT(EPOCH FROM (NOW() - last_run_at)) / 3600, 1) AS hours_ago 613 FROM ops.cron_jobs WHERE task_key='backupDatabase' LIMIT 1` 614 ); 615 616 // Previous overseer log (last 30 lines) 617 const overseerLog = join(process.cwd(), 'logs/pipeline-status.txt'); 618 let previousLog = '(no previous runs)'; 619 if (existsSync(overseerLog)) { 620 try { 621 const lines = readFileSync(overseerLog, 'utf8').split('\n').filter(Boolean); 622 previousLog = lines.slice(-30).join('\n') || '(no previous entries)'; 623 } catch { 624 /* ignore */ 625 } 626 } 627 628 const systemData = { 629 timestamp: now.toISOString(), 630 skip_stages: process.env.SKIP_STAGES || 'none', 631 services: serviceStatus, 632 recent_errors_last_30min: recentErrors, 633 agent_tasks: agentTasks, 634 sites: siteHealth, 635 cron: cronHealth, 636 backup_db_status: backupDbRow ?? 
null, 637 previous_overseer_findings: previousLog, 638 }; 639 640 // Return as single-item batch (count=1) 641 return [{ system_data: systemData }]; 642 } 643 644 async function fetchClassifyErrors() { 645 // Collect unknown error messages for LLM pattern proposal (Phase 2 only) 646 const MIN_OCCURRENCES = 1; 647 const MAX_ERRORS = 50; 648 649 const unknowns = []; 650 651 const siteErrors = await getAll( 652 `SELECT error_message, COUNT(*) AS cnt 653 FROM sites 654 WHERE status IN ('failing', 'ignored') 655 AND error_message IS NOT NULL AND error_message != '' 656 GROUP BY error_message 657 HAVING COUNT(*) >= $1 658 ORDER BY cnt DESC LIMIT $2`, 659 [MIN_OCCURRENCES, MAX_ERRORS * 2] 660 ); 661 662 for (const row of siteErrors) { 663 const { group } = categorizeError(row.error_message, 'site'); 664 if (group === 'unknown') { 665 unknowns.push({ error_message: row.error_message, count: row.cnt, context: 'site' }); 666 } 667 } 668 669 const outreachErrors = await getAll( 670 `SELECT error_message, COUNT(*) AS cnt 671 FROM messages 672 WHERE direction = 'outbound' 673 AND delivery_status IN ('failed', 'retry_later') 674 AND error_message IS NOT NULL AND error_message != '' 675 GROUP BY error_message 676 HAVING COUNT(*) >= $1 677 ORDER BY cnt DESC LIMIT $2`, 678 [MIN_OCCURRENCES, MAX_ERRORS] 679 ); 680 681 for (const row of outreachErrors) { 682 const { group } = categorizeError(row.error_message, 'outreach'); 683 if (group === 'unknown') { 684 unknowns.push({ error_message: row.error_message, count: row.cnt, context: 'outreach' }); 685 } 686 } 687 688 unknowns.sort((a, b) => b.count - a.count); 689 const items = unknowns.slice(0, MAX_ERRORS); 690 691 // Return as single-item batch with error list embedded 692 if (items.length === 0) return []; 693 return [{ unknown_errors: items }]; 694 } 695 696 /** 697 * Fetch sites that need HTML-only LLM scoring via the orchestrator. 698 * Only runs when ENABLE_VISION=false and ENABLE_LLM_SCORING=true. 
699 * Sites remain at status='assets_captured' with score IS NULL until results are stored. 700 */ 701 async function fetchScoreSites() { 702 const sites = await getAll( 703 `SELECT s.id, s.domain, s.landing_page_url, s.http_headers, s.keyword, s.html_dom 704 FROM sites s 705 WHERE s.status = 'assets_captured' 706 AND s.html_dom IS NOT NULL 707 AND s.score IS NULL 708 ORDER BY CASE WHEN s.country_code IN ('AU','CA','GB','IE','IN','NZ','US','ZA') THEN 0 ELSE 1 END ASC 709 LIMIT $1`, 710 [limit] 711 ); 712 713 return sites.map(site => { 714 const htmlDom = readHtmlDom(site.id) || site.html_dom; 715 return { 716 site_id: site.id, 717 domain: site.domain, 718 url: site.landing_page_url, 719 keyword: site.keyword, 720 html: htmlDom ? htmlDom.substring(0, 50000) : '', 721 http_headers: site.http_headers ? JSON.parse(site.http_headers) : null, 722 }; 723 }); 724 } 725 726 /** 727 * Fetch sites that need LLM contact enrichment via the orchestrator. 728 * Two cases: 729 * 1. 'semantic_scored'/'vision_scored' — no browser pass yet; extract contacts from html_dom 730 * 2. 'enriched_regex' — browser pass done; LLM pass over key_pages_html + html_dom → enriched_llm 731 */ 732 async function fetchEnrichSites() { 733 // Note: contacts_json column holds sentinel '{"_fs":true}' — real data is on filesystem. 734 // The old check json_extract(contacts_json, '$.business_name') IS NULL cannot be used. 735 // Gate on status + enriched_at instead (same intent: find unenriched sites). 
736 const sites = await getAll( 737 `SELECT s.id, s.domain, s.landing_page_url, s.status, s.html_dom 738 FROM sites s 739 WHERE ( 740 ( 741 s.status IN ('semantic_scored', 'vision_scored') 742 AND (s.enriched_at IS NULL OR s.error_message IS NOT NULL) 743 AND s.html_dom IS NOT NULL 744 ) 745 OR 746 ( 747 s.status = 'enriched_regex' 748 ) 749 ) 750 ORDER BY 751 CASE s.status WHEN 'enriched_regex' THEN 0 ELSE 1 END ASC, 752 CASE WHEN s.country_code IN ('AU','CA','GB','IE','IN','NZ','US','ZA') THEN 0 ELSE 1 END ASC 753 LIMIT $1`, 754 [limit] 755 ); 756 757 return sites.map(site => { 758 const contactsJson = getContactsDataWithFallback(site.id); 759 const htmlDom = readHtmlDom(site.id) || site.html_dom; 760 const keyPagesHtml = readKeyPagesHtml(site.id); 761 762 // For enriched_regex: combine key_pages_html entries + html_dom as LLM context 763 // HTML is truncated aggressively to keep per-batch context under 150kB (15 sites × 10kB each) 764 let html; 765 if (site.status === 'enriched_regex' && keyPagesHtml) { 766 const parts = Object.entries(keyPagesHtml) 767 .map(([url, h]) => `<!-- Page: ${url} -->\n${h}`) 768 .join('\n\n'); 769 const landingHtml = htmlDom ? `<!-- Landing page -->\n${htmlDom.substring(0, 5000)}` : ''; 770 html = `${landingHtml}\n\n${parts}`.substring(0, 20000); 771 } else { 772 html = htmlDom ? htmlDom.substring(0, 15000) : ''; 773 } 774 775 return { 776 site_id: site.id, 777 domain: site.domain, 778 url: site.landing_page_url, 779 current_contacts: contactsJson, 780 html, 781 enrichment_pass: site.status === 'enriched_regex' ? 'llm' : 'initial', 782 }; 783 }); 784 } 785 786 /** 787 * Fetch sites that need semantic scoring (headline, value prop, USP). 788 * These are sites that have been programmatically scored but lack LLM semantic assessment. 789 * Extracts key text from html_dom for Haiku to evaluate. 
790 */ 791 async function fetchScoreSemantic() { 792 const sites = await getAll( 793 `SELECT s.id, s.domain, s.landing_page_url, s.keyword, s.html_dom 794 FROM sites s 795 WHERE s.status IN ('prog_scored', 'semantic_scored') 796 AND s.score IS NOT NULL 797 AND s.html_dom IS NOT NULL 798 ORDER BY s.updated_at DESC 799 LIMIT $1`, 800 [limit] 801 ); 802 803 return sites 804 .map(site => { 805 const extracted = extractScoringText(readHtmlDom(site.id) || site.html_dom); 806 const scoreJson = getScoreDataWithFallback(site.id, site) || {}; 807 808 return { 809 site_id: site.id, 810 domain: site.domain, 811 keyword: site.keyword, 812 current_score: scoreJson.conversion_score || null, 813 current_grade: scoreJson.letter_grade || null, 814 programmatic_factors: scoreJson.factor_scores 815 ? { 816 headline_quality: scoreJson.factor_scores.headline_quality, 817 value_proposition: scoreJson.factor_scores.value_proposition, 818 unique_selling_proposition: scoreJson.factor_scores.unique_selling_proposition, 819 } 820 : null, 821 text: extracted, 822 }; 823 }) 824 .filter(item => item.text !== null); 825 } 826 827 /** 828 * Monitor health batch — lightweight system health snapshot for SRE analysis. 829 * Focuses on agent_tasks state, service status, and recent error rate. 830 * Lighter than oversee: skips site distribution, competitor data, etc. 
831 */ 832 async function fetchMonitorHealth() { 833 const now = new Date(); 834 const thirtyMinsAgo = new Date(now - 30 * 60 * 1000).toISOString(); 835 const twoHoursAgo = new Date(now - 2 * 60 * 60 * 1000).toISOString(); 836 837 const agentTaskSummary = { 838 stuck_running: await getAll( 839 `SELECT id, task_type, started_at, 840 ROUND(EXTRACT(EPOCH FROM (NOW() - started_at)) / 60) as age_mins 841 FROM tel.agent_tasks WHERE status = 'running' AND started_at < $1 842 ORDER BY started_at ASC LIMIT 10`, 843 [thirtyMinsAgo] 844 ), 845 blocked_count: (await getOne( 846 `SELECT COUNT(*) as n FROM tel.agent_tasks WHERE status = 'blocked'` 847 ))?.n ?? 0, 848 failed_recent: await getAll( 849 `SELECT task_type, COUNT(*) as n FROM tel.agent_tasks 850 WHERE status = 'failed' AND completed_at > $1 851 GROUP BY task_type ORDER BY n DESC LIMIT 10`, 852 [twoHoursAgo] 853 ), 854 pending_by_type: await getAll( 855 `SELECT task_type, COUNT(*) as n FROM tel.agent_tasks 856 WHERE status = 'pending' 857 GROUP BY task_type ORDER BY n DESC LIMIT 10` 858 ), 859 code_review_fix_pending: (await getOne( 860 `SELECT COUNT(*) as n FROM tel.agent_tasks WHERE task_type = 'code_review_fix' AND status = 'pending'` 861 ))?.n ?? 
0, 862 }; 863 864 // Recent errors from pipeline log 865 const today = new Date().toISOString().slice(0, 10); 866 const logFile = join(process.cwd(), `logs/pipeline-${today}.log`); 867 let recentErrors = []; 868 if (existsSync(logFile)) { 869 const cutoff = new Date(Date.now() - 30 * 60 * 1000); 870 const lines = readFileSync(logFile, 'utf8').split('\n'); 871 for (const line of lines.slice(-200)) { 872 if (!line.includes('[ERROR]')) continue; 873 const tsMatch = line.match(/^(\[[\d\-T:.+Z]+\])/); 874 if (tsMatch) { 875 try { 876 if (new Date(tsMatch[1].slice(1, -1)) < cutoff) continue; 877 } catch { 878 /* keep */ 879 } 880 } 881 recentErrors.push(line.slice(0, 150)); 882 } 883 recentErrors = recentErrors.slice(-20); 884 } 885 886 return [ 887 { agent_tasks: agentTaskSummary, recent_errors: recentErrors, checked_at: now.toISOString() }, 888 ]; 889 } 890 891 /** 892 * Triage errors batch — classifies recent pipeline failures for routing/prioritization. 893 * Pulls failed agent_tasks and recent site/message errors not yet categorized. 894 */ 895 async function fetchTriageErrors() { 896 const twoHoursAgo = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString(); 897 898 const failedTasks = (await getAll( 899 `SELECT id, task_type, error_message, context_json, completed_at 900 FROM tel.agent_tasks 901 WHERE status = 'failed' 902 AND completed_at > $1 903 AND error_message IS NOT NULL 904 ORDER BY completed_at DESC LIMIT 30`, 905 [twoHoursAgo] 906 )).map(t => ({ 907 id: t.id, 908 task_type: t.task_type, 909 error_message: t.error_message, 910 context_summary: t.context_json 911 ? JSON.parse(t.context_json || '{}')?.file_path || t.task_type 912 : t.task_type, 913 })); 914 915 if (failedTasks.length === 0) return []; 916 917 return [{ failed_tasks: failedTasks, window: '2h' }]; 918 } 919 920 /** 921 * Check docs batch — returns recently changed source files for doc freshness check. 
922 * Reads last 10 commits' changed src/ files and returns their content alongside 923 * relevant CLAUDE.md sections. 924 */ 925 function fetchCheckDocs() { 926 let changedFiles = []; 927 try { 928 const output = execSync('git diff --name-only HEAD~5 2>/dev/null', { 929 encoding: 'utf8', 930 timeout: 5000, 931 cwd: process.cwd(), 932 }); 933 changedFiles = output 934 .split('\n') 935 .filter(f => f.startsWith('src/') && f.endsWith('.js')) 936 .slice(0, 5); 937 } catch { 938 /* no git or no changes */ 939 } 940 941 if (changedFiles.length === 0) return []; 942 943 const items = changedFiles 944 .map(filePath => { 945 const fullPath = join(process.cwd(), filePath); 946 if (!existsSync(fullPath)) return null; 947 const content = readFileSync(fullPath, 'utf8'); 948 // Return first 100 lines (enough for JSDoc + exports, not entire file) 949 const preview = content.split('\n').slice(0, 100).join('\n'); 950 return { file_path: filePath, preview }; 951 }) 952 .filter(Boolean); 953 954 if (items.length === 0) return []; 955 return [{ changed_files: items }]; 956 } 957 958 /** 959 * Code Review batch — returns the next source file to review from the queue. 960 * 961 * Queue file: logs/code-review-queue.json 962 * { "files": ["src/payment/paypal.js", ...], "next_index": 0, "reviewed": 0 } 963 * 964 * Run `node scripts/claude-batch.js code_review` once queue is seeded. 965 * Queue is seeded by: scripts/init-code-review-queue.sh (security-first order). 966 * Skips files that already have pending/running fix tasks to avoid duplicate work. 
967 */ 968 async function fetchCodeReview() { 969 const queuePath = join(process.cwd(), 'logs/code-review-queue.json'); 970 if (!existsSync(queuePath)) { 971 return []; // Queue not yet seeded — nothing to do 972 } 973 974 const queue = JSON.parse(readFileSync(queuePath, 'utf8')); 975 const { files, next_index: nextIndex } = queue; 976 977 if (!Array.isArray(files) || nextIndex >= files.length) { 978 return []; // Queue exhausted 979 } 980 981 // Skip files that already have a pending/running code_review_fix task 982 let idx = nextIndex; 983 let filePath = null; 984 while (idx < files.length) { 985 const candidate = files[idx]; 986 const existing = await getOne( 987 `SELECT id FROM tel.agent_tasks 988 WHERE task_type = 'code_review_fix' 989 AND context_json->>'file_path' = $1 990 AND status IN ('pending', 'running') 991 LIMIT 1`, 992 [candidate] 993 ); 994 if (!existing) { 995 filePath = candidate; 996 break; 997 } 998 idx++; 999 } 1000 1001 if (!filePath) return []; // All remaining files have active fix tasks 1002 1003 // Advance queue pointer 1004 queue.next_index = idx + 1; 1005 queue.reviewed = (queue.reviewed || 0) + 1; 1006 writeFileSync(queuePath, JSON.stringify(queue, null, 2)); 1007 1008 // Read file content 1009 const fullPath = join(process.cwd(), filePath); 1010 if (!existsSync(fullPath)) { 1011 return []; // File was deleted — skip 1012 } 1013 1014 const content = readFileSync(fullPath, 'utf8'); 1015 const lineCount = content.split('\n').length; 1016 1017 return [ 1018 { 1019 file_path: filePath, 1020 line_count: lineCount, 1021 content, 1022 }, 1023 ]; 1024 } 1025 1026 /** 1027 * followup_generate — Fetch sites needing follow-up message generation. 
1028 * 1029 * Finds sites where: 1030 * - status = 'outreach_sent' 1031 * - next_followup_at <= now (due for follow-up) 1032 * - No inbound reply received 1033 * - Not opted out 1034 * - Max sequence_step < 5 1035 * - Respects 3-day cooldown 1036 * 1037 * Returns site data + weaknesses + conversation history for LLM follow-up generation. 1038 */ 1039 async function fetchFollowupGenerate() { 1040 const closedStatuses = ['not_interested', 'closed', 'unsubscribed', 'paid', 'report_delivered']; 1041 const closedPlaceholders = closedStatuses.map((_, i) => `$${i + 1}`).join(','); 1042 1043 const sites = await getAll( 1044 `SELECT 1045 s.id, s.domain, s.landing_page_url, s.keyword, s.score, s.grade, 1046 s.country_code, s.city, s.state, s.is_running_ads, 1047 s.last_outreach_at, s.next_followup_at, 1048 (SELECT MAX(m.sequence_step) FROM messages m 1049 WHERE m.site_id = s.id AND m.direction = 'outbound' 1050 AND m.sequence_step IS NOT NULL) as max_step 1051 FROM sites s 1052 WHERE s.status = 'outreach_sent' 1053 AND s.next_followup_at IS NOT NULL 1054 AND s.next_followup_at <= NOW() 1055 AND NOT EXISTS ( 1056 SELECT 1 FROM messages m 1057 WHERE m.site_id = s.id AND m.direction = 'inbound' 1058 ) 1059 AND NOT EXISTS ( 1060 SELECT 1 FROM opt_outs o 1061 JOIN messages m ON ( 1062 (o.phone = m.contact_uri AND o.method = 'sms') 1063 OR (o.email = m.contact_uri AND o.method = 'email') 1064 ) 1065 WHERE m.site_id = s.id AND m.direction = 'outbound' 1066 ) 1067 AND (s.conversation_status IS NULL 1068 OR s.conversation_status NOT IN (${closedPlaceholders})) 1069 AND (SELECT COALESCE(MAX(m2.sequence_step), 1) FROM messages m2 1070 WHERE m2.site_id = s.id AND m2.direction = 'outbound' 1071 AND m2.sequence_step IS NOT NULL) < 8 1072 AND (s.last_outreach_at IS NULL 1073 OR s.last_outreach_at < NOW() - INTERVAL '3 days') 1074 -- Must have at least one successfully delivered email/sms to follow up on 1075 AND EXISTS ( 1076 SELECT 1 FROM messages m3 1077 WHERE m3.site_id = s.id AND 
m3.direction = 'outbound' 1078 AND m3.delivery_status IN ('sent', 'delivered') 1079 AND m3.contact_method IN ('email', 'sms') 1080 ) 1081 ORDER BY s.next_followup_at ASC 1082 LIMIT $${closedStatuses.length + 1}`, 1083 [...closedStatuses, limit] 1084 ); 1085 1086 return Promise.all(sites.map(async site => { 1087 const scoreJson = getScoreDataWithFallback(site.id, site) || {}; 1088 const contactsJson = getContactsDataWithFallback(site.id, site) || {}; 1089 const nextStep = (site.max_step || 1) + 1; 1090 1091 // Get sent outreach channels for this site 1092 const channels = await getAll( 1093 `SELECT DISTINCT contact_method, contact_uri 1094 FROM messages 1095 WHERE site_id = $1 AND direction = 'outbound' 1096 AND delivery_status IN ('sent', 'delivered') 1097 AND contact_method IN ('email', 'sms')`, 1098 [site.id] 1099 ); 1100 1101 // Get previous outreach messages for context 1102 const previousMessages = await getAll( 1103 `SELECT message_body, subject_line, contact_method, sequence_step, sent_at 1104 FROM messages 1105 WHERE site_id = $1 AND direction = 'outbound' 1106 AND delivery_status IN ('sent', 'delivered') 1107 ORDER BY sequence_step ASC, sent_at ASC`, 1108 [site.id] 1109 ); 1110 1111 return { 1112 site_id: site.id, 1113 domain: site.domain, 1114 landing_page_url: site.landing_page_url, 1115 keyword: site.keyword, 1116 score: site.score, 1117 grade: site.grade || computeGrade(site.score), 1118 country_code: site.country_code, 1119 city: site.city || contactsJson.city || null, 1120 state: site.state || contactsJson.state || null, 1121 business_name: contactsJson.business_name || site.domain, 1122 industry: scoreJson.industry_classification || scoreJson?.overall_calculation?.industry_classification || null, 1123 next_step: nextStep, 1124 channels: channels.map(c => ({ 1125 method: c.contact_method, 1126 uri: c.contact_uri, 1127 })), 1128 weaknesses: extractWeaknesses(scoreJson), 1129 previous_messages: previousMessages.map(m => ({ 1130 body: 
m.message_body?.substring(0, 200), 1131 subject: m.subject_line, 1132 channel: m.contact_method, 1133 step: m.sequence_step || 1, 1134 })), 1135 }; 1136 })); 1137 } 1138 1139 /** 1140 * followup_check — Queue approved followup messages whose send window has arrived. 1141 * 1142 * followup1: approved, queued=NULL, followup1_sent_at IS NULL, 1143 * last_outreach_at set, 24h elapsed, conversation not closed/replied. 1144 * followup2: approved, queued=NULL, followup2_sent_at IS NULL, 1145 * followup1_sent_at set, 3 days elapsed, conversation not closed/replied. 1146 * 1147 * Sets delivery_status='queued' on matching messages so the outreach worker picks them up. 1148 * Returns a summary item (count=1) describing how many were queued. 1149 * 1150 * Note: Uses run() which writes through the shared pool. 1151 */ 1152 async function fetchFollowupCheck() { 1153 let followup1Queued = 0; 1154 let followup2Queued = 0; 1155 1156 const closedStatuses = ['replied', 'interested', 'closed', 'not_interested']; 1157 const closedPlaceholders = closedStatuses.map((_, i) => `$${i + 1}`).join(', '); 1158 1159 // followup1: 24h after last_outreach_at, conversation still open 1160 const followup1Result = await run( 1161 `UPDATE messages 1162 SET delivery_status = 'queued', 1163 updated_at = NOW() 1164 WHERE message_type = 'followup1' 1165 AND approval_status = 'approved' 1166 AND delivery_status IS NULL 1167 AND sent_at IS NULL 1168 AND EXISTS ( 1169 SELECT 1 FROM sites s 1170 WHERE s.id = messages.site_id 1171 AND s.followup1_sent_at IS NULL 1172 AND s.last_outreach_at IS NOT NULL 1173 AND s.last_outreach_at + INTERVAL '24 hours' <= NOW() 1174 AND (s.conversation_status IS NULL 1175 OR s.conversation_status NOT IN (${closedPlaceholders})) 1176 )`, 1177 closedStatuses 1178 ); 1179 followup1Queued = followup1Result.changes; 1180 1181 // followup2: 3 days after followup1_sent_at, conversation still open 1182 const followup2Result = await run( 1183 `UPDATE messages 1184 SET delivery_status = 
'queued', 1185 updated_at = NOW() 1186 WHERE message_type = 'followup2' 1187 AND approval_status = 'approved' 1188 AND delivery_status IS NULL 1189 AND sent_at IS NULL 1190 AND EXISTS ( 1191 SELECT 1 FROM sites s 1192 WHERE s.id = messages.site_id 1193 AND s.followup2_sent_at IS NULL 1194 AND s.followup1_sent_at IS NOT NULL 1195 AND s.followup1_sent_at + INTERVAL '3 days' <= NOW() 1196 AND (s.conversation_status IS NULL 1197 OR s.conversation_status NOT IN (${closedPlaceholders})) 1198 )`, 1199 closedStatuses 1200 ); 1201 followup2Queued = followup2Result.changes; 1202 1203 const total = followup1Queued + followup2Queued; 1204 if (total === 0) return []; 1205 1206 return [ 1207 { 1208 followup1_queued: followup1Queued, 1209 followup2_queued: followup2Queued, 1210 total_queued: total, 1211 }, 1212 ]; 1213 } 1214 1215 /** 1216 * Fetch sites ready for evidence merging. 1217 * 1218 * Picks sites where both evidence passes have been collected (evidence_pass1_json and 1219 * evidence_pass2_json are non-null) but the merged evidence_json hasn't been written yet. 1220 * 1221 * Returns one site at a time to keep Haiku context small. 
1222 */ 1223 async function fetchEvidenceMerge() { 1224 const site = await getOne( 1225 `SELECT id, domain, evidence_pass1_json, evidence_pass2_json 1226 FROM sites 1227 WHERE evidence_json IS NULL 1228 AND evidence_pass1_json IS NOT NULL 1229 AND evidence_pass2_json IS NOT NULL 1230 AND status IN ('prog_scored','semantic_scored','vision_scored','enriched','proposals_drafted','outreach_partial','outreach_sent') 1231 ORDER BY score ASC 1232 LIMIT 1` 1233 ); 1234 1235 if (!site) return []; 1236 1237 return [ 1238 { 1239 site_id: site.id, 1240 domain: site.domain, 1241 score_json: getScoreJsonWithFallback(site.id), 1242 evidence_pass1: site.evidence_pass1_json || '', 1243 evidence_pass2: site.evidence_pass2_json || '', 1244 }, 1245 ]; 1246 } 1247 1248 const handlers = { 1249 proposals_email: () => fetchProposalsBatch('email'), 1250 proposals_sms: () => fetchProposalsBatch('sms'), 1251 proofread: fetchProofread, 1252 classify_replies: fetchClassifyReplies, 1253 extract_names: fetchExtractNames, 1254 reply_responses: fetchReplyResponses, 1255 oversee: fetchOverseeData, 1256 classify_errors: fetchClassifyErrors, 1257 score_semantic: fetchScoreSemantic, 1258 score_sites: fetchScoreSites, 1259 enrich_sites: fetchEnrichSites, 1260 code_review: fetchCodeReview, 1261 monitor_health: fetchMonitorHealth, 1262 triage_errors: fetchTriageErrors, 1263 check_docs: fetchCheckDocs, 1264 evidence_merge: fetchEvidenceMerge, 1265 followup_check: fetchFollowupCheck, 1266 followup_generate: fetchFollowupGenerate, 1267 }; 1268 1269 const handler = handlers[batchType]; 1270 if (!handler) { 1271 console.error(`Unknown batch type: ${batchType}`); 1272 console.error(`Valid types: ${Object.keys(handlers).join(', ')}`); 1273 process.exit(1); 1274 } 1275 1276 const result = { 1277 batch_type: batchType, 1278 timestamp: new Date().toISOString(), 1279 count: 0, 1280 items: [], 1281 }; 1282 1283 try { 1284 result.items = await handler(); 1285 result.count = result.items.length; 1286 
console.log(JSON.stringify(result, null, 2)); 1287 } finally { 1288 await closePool(); 1289 }