claude-store.js
1 #!/usr/bin/env node 2 3 /** 4 * Claude Store — Accept Claude-generated results on stdin, write to DB. 5 * 6 * Usage: 7 * echo '{"batch_type":"classify_replies","results":[...]}' | node scripts/claude-store.js 8 * 9 * Or pipe from claude: 10 * claude -p "..." | node scripts/claude-store.js 11 * 12 * Expected input format (JSON on stdin): 13 * { 14 * "batch_type": "proposals_email|proposals_sms|classify_replies|extract_names|reply_responses|score_semantic|enrich_sites|proofread", 15 * "results": [ ... ] 16 * } 17 * 18 * Each batch type expects different result item shapes — see handlers below. 19 */ 20 21 import { execSync } from 'child_process'; 22 import { appendFileSync } from 'fs'; 23 import { join } from 'path'; 24 import { run, getOne, getAll, withTransaction, closePool } from '../src/utils/db.js'; 25 import { getCountryByCode } from '../src/config/countries.js'; 26 import { computeGrade, FACTOR_WEIGHTS, computeScoreFromFactors } from '../src/score.js'; 27 import { computeWeightedScore } from '../src/utils/programmatic-scorer.js'; 28 import { 29 isGovernmentEmail, 30 isEducationEmail, 31 isDemoEmail, 32 checkBlocklist, 33 classifyIndustry, 34 } from '../src/utils/site-filters.js'; 35 import { addCountryCode, normalizePhoneNumber } from '../src/utils/phone-normalizer.js'; 36 import { spin } from '../src/utils/spintax.js'; 37 import { cleanInvalidSocialLinks } from '../src/contacts/prioritize.js'; 38 import { resetRetries, recordFailure } from '../src/utils/retry-handler.js'; 39 import Logger from '../src/utils/logger.js'; 40 import { deleteHtmlDom, deleteKeyPagesHtml } from '../src/utils/html-storage.js'; 41 import { setScoreJson, getScoreJsonWithFallback } from '../src/utils/score-storage.js'; 42 import { setContactsJson, getContactsJsonWithFallback } from '../src/utils/contacts-storage.js'; 43 import '../src/utils/load-env.js'; 44 45 const logger = new Logger('ClaudeStore'); 46 47 async function readStdin() { 48 const chunks = []; 49 for await (const chunk of process.stdin) { 50 chunks.push(chunk); 51 } 52 return Buffer.concat(chunks).toString('utf8'); 53 } 54 55 async function storeProposal(client, item) { 56 const contactMethod = item.contact_method || 'email'; 57 let contactUri = item.contact_uri || 'PENDING_CONTACT_EXTRACTION'; 58 59 // Normalize phone numbers 60 if (contactMethod === 'sms' && contactUri !== 'PENDING_CONTACT_EXTRACTION' && item.country_code) { 61 contactUri = addCountryCode(contactUri, item.country_code); 62 } 63 64 // GDPR check 65 let approvalStatus = 'pending'; 66 if (contactMethod === 'email' && item.country_code) { 67 const country = getCountryByCode(item.country_code); 68 if (country?.requiresGDPRCheck) { 69 approvalStatus = 'gdpr_blocked'; 70 } 71 } 72 73 // Skip blocked emails 74 if (contactMethod === 'email') { 75 if (isGovernmentEmail(contactUri) || isEducationEmail(contactUri) || isDemoEmail(contactUri)) { 76 logger.info(`Skipping blocked email: ${contactUri}`); 77 return null; 78 } 79 } 80 81 // Resolve spintax 82 let messageBody = spin(item.message_body || item.proposal_text || ''); 83 let subjectLine = spin(item.subject_line || ''); 84 85 // Reject broken templates 86 if (messageBody.startsWith('{') || messageBody.startsWith(', ')) { 87 logger.warn(`Broken template for site ${item.site_id} — skipping`); 88 return null; 89 } 90 91 // ── Auto-fix known LLM proposal defects ───────────────────────────────── 92 // These catch the exact issues that cause pending↔rework cycles in proofread. 93 // Fix at generation time so proofread can focus on genuine quality issues. 94 95 // 1. Unresolved template variables — LLM sometimes leaves [name], [score] etc. 96 messageBody = messageBody.replace(/\[name\]/gi, 'Marcus'); 97 messageBody = messageBody.replace(/\[score\]/gi, String(item.score ?? '')); 98 messageBody = messageBody.replace(/\[grade\]/gi, item.grade ?? ''); 99 subjectLine = subjectLine.replace(/\[name\]/gi, 'Marcus'); 100 101 // 2. Sender identity + opt-out — appended dynamically based on country and channel. 102 // SMS: dynamic signature that fits remaining char budget. 103 // Email/form: full identity always appended. 104 const hasIdentity = /Marcus|Audit&Fix|AuditFix|auditandfix/i.test(messageBody); 105 if (contactMethod === 'sms') { 106 const cc = (item.country_code || '').toUpperCase(); 107 const needsStop = cc === 'US' || cc === 'CA'; 108 // Strip any LLM-generated STOP text or signature — we control these 109 messageBody = messageBody 110 .replace(/\s*Reply STOP to opt out\.?\s*/gi, '') 111 .replace(/\s*-\s*Marcus\s*(Webb)?\s*,?\s*(Audit\s*&\s*Fix)?\s*$/i, '') 112 .trimEnd(); 113 const stopText = needsStop ? ' Reply STOP to opt out.' : ''; 114 // Signature tiers: try longest first, fall back to shorter 115 const sigTiers = needsStop 116 ? [' - Marcus, Audit&Fix', ' - Audit&Fix'] // legal minimum is company name 117 : [' - Marcus', '']; // no legal requirement, personal touch if room 118 let bestSig = sigTiers[sigTiers.length - 1]; // fallback = shortest 119 for (const sig of sigTiers) { 120 if (messageBody.length + stopText.length + sig.length <= 160) { 121 bestSig = sig; 122 break; 123 } 124 } 125 messageBody = `${messageBody}${stopText}${bestSig}`; 126 } else if (!hasIdentity) { 127 messageBody = `${messageBody.trimEnd()}\n\nMarcus Webb, Audit&Fix (auditandfix.com)`; 128 } 129 130 // 3. Score/grade mismatch — LLM sometimes hallucinates a different score. 131 // Replace any "scored X/100" or "X/100 score" with the actual DB value. 132 if (item.score !== null && item.score !== undefined) { 133 const actualScore = Math.round(item.score); 134 messageBody = messageBody.replace( 135 /\b\d{1,3}\/100\b/g, 136 `${actualScore}/100` 137 ); 138 } 139 140 // Channel character limits 141 const limits = { sms: 160, x: 280 }; 142 const maxChars = limits[contactMethod]; 143 if (maxChars && messageBody.length > maxChars) { 144 logger.warn( 145 `${contactMethod} too long (${messageBody.length}/${maxChars}) for site ${item.site_id}` 146 ); 147 return null; 148 } 149 150 const result = await client.query( 151 `INSERT INTO messages (site_id, direction, message_body, subject_line, contact_method, contact_uri, approval_status, message_type) 152 VALUES ($1, 'outbound', $2, $3, $4, $5, $6, 'outreach') 153 RETURNING id`, 154 [item.site_id, messageBody, subjectLine, contactMethod, contactUri, approvalStatus] 155 ); 156 157 return result.rows[0]?.id; 158 } 159 160 async function storeProofreadDecision(client, item) { 161 if (!item.message_id) { 162 logger.warn('Proofread result missing message_id'); 163 return null; 164 } 165 166 const { decision } = item; 167 if (!['approve', 'rework', 'reject'].includes(decision)) { 168 logger.warn(`Invalid proofread decision "${decision}" for message ${item.message_id}`); 169 return null; 170 } 171 172 // Guard: never change approval_status on already-sent messages (race condition protection) 173 if (decision === 'approve') { 174 const r = await client.query( 175 "UPDATE messages SET approval_status = 'approved' WHERE id = $1 AND sent_at IS NULL", 176 [item.message_id] 177 ); 178 if (r.rowCount === 0) { 179 logger.warn(`Skipped approve for message ${item.message_id}: already sent`); 180 return null; 181 } 182 logger.info(`Approved message ${item.message_id}`); 183 } else if (decision === 'rework') { 184 // Circuit breaker: after 3 rework cycles, park the message instead of looping. 185 // Prevents pending↔rework cycles from blocking Gate 2 indefinitely. 186 const MAX_PROOFREAD_ATTEMPTS = 3; 187 const current = await client.query( 188 'SELECT proofread_attempts FROM messages WHERE id = $1', 189 [item.message_id] 190 ); 191 const attempts = (current.rows[0]?.proofread_attempts || 0) + 1; 192 193 if (attempts >= MAX_PROOFREAD_ATTEMPTS) { 194 await client.query( 195 "UPDATE messages SET approval_status = 'parked', proofread_attempts = $1, rework_instructions = $2 WHERE id = $3 AND sent_at IS NULL", 196 [attempts, `Auto-parked after ${attempts} rework cycles: ${item.rework_instructions || ''}`, item.message_id] 197 ); 198 logger.warn(`Parked message ${item.message_id} after ${attempts} rework cycles (circuit breaker)`); 199 return { stored: true, parked: true }; 200 } 201 202 const r = await client.query( 203 "UPDATE messages SET approval_status = 'rework', proofread_attempts = $1, rework_instructions = $2 WHERE id = $3 AND sent_at IS NULL", 204 [attempts, item.rework_instructions || 'QA: needs rework', item.message_id] 205 ); 206 if (r.rowCount === 0) { 207 logger.warn(`Skipped rework for message ${item.message_id}: already sent`); 208 return null; 209 } 210 logger.info(`Rework message ${item.message_id} (attempt ${attempts}/${MAX_PROOFREAD_ATTEMPTS}): ${item.rework_instructions}`); 211 } else if (decision === 'reject') { 212 const r = await client.query( 213 "UPDATE messages SET approval_status = 'rejected', rework_instructions = $1 WHERE id = $2 AND sent_at IS NULL", 214 [item.reject_reason || 'QA: rejected', item.message_id] 215 ); 216 if (r.rowCount === 0) { 217 logger.warn(`Skipped reject for message ${item.message_id}: already sent`); 218 return null; 219 } 220 logger.info(`Rejected message ${item.message_id}: ${item.reject_reason}`); 221 } 222 223 return item.message_id; 224 } 225 226 async function storeClassification(client, item) { 227 if (!item.message_id) { 228 logger.warn('Classification missing message_id'); 229 return null; 230 } 231 232 const validIntents = [ 233 'inquiry', 234 'opt-out', 235 'interested', 236 'not-interested', 237 'pricing', 238 'schedule', 239 'unknown', 240 'autoresponder', 241 ]; 242 const validSentiments = ['positive', 'neutral', 'negative', 'objection']; 243 244 const intent = validIntents.includes(item.intent) ? item.intent : 'unknown'; 245 const sentiment = validSentiments.includes(item.sentiment) ? item.sentiment : 'neutral'; 246 247 await client.query( 248 'UPDATE messages SET intent = $1, sentiment = $2 WHERE id = $3', 249 [intent, sentiment, item.message_id] 250 ); 251 252 // If opt-out, mark in opt_outs table 253 if (intent === 'opt-out') { 254 const msgResult = await client.query( 255 'SELECT contact_uri, contact_method FROM messages WHERE id = $1', 256 [item.message_id] 257 ); 258 const msg = msgResult.rows[0]; 259 if (msg) { 260 await client.query( 261 `INSERT INTO opt_outs (phone, email, method, opted_out_at, source) 262 VALUES ( 263 CASE WHEN $2 = 'sms' THEN $1 ELSE NULL END, 264 CASE WHEN $2 = 'email' THEN $1 ELSE NULL END, 265 $2, NOW(), 'llm_classify' 266 ) 267 ON CONFLICT DO NOTHING`, 268 [msg.contact_uri, msg.contact_method] 269 ); 270 } 271 } 272 273 return item.message_id; 274 } 275 276 async function storeName(client, item) { 277 if (!item.site_id || !item.contacts) { 278 logger.warn('Name extraction missing site_id or contacts'); 279 return null; 280 } 281 282 // Update contacts_json with extracted names (filesystem) 283 const contactsRaw = getContactsJsonWithFallback(item.site_id); 284 if (!contactsRaw) return null; 285 286 const contactsData = JSON.parse(contactsRaw); 287 let updated = false; 288 289 for (const extracted of item.contacts) { 290 if (!extracted.name || !extracted.uri) continue; 291 const match = (contactsData.contacts || []).find( 292 c => (c.uri || c.value) === extracted.uri && !c.name 293 ); 294 if (match) { 295 match.name = extracted.name; 296 updated = true; 297 } 298 } 299 300 if (updated) { 301 setContactsJson(item.site_id, JSON.stringify(contactsData)); 302 } 303 304 return item.site_id; 305 } 306 307 async function storeReplyResponse(client, item) { 308 if (!item.site_id) { 309 logger.warn('Reply response missing site_id'); 310 return null; 311 } 312 313 // Don't respond to opt-outs or not-interested, but mark the inbound as handled 314 // so it doesn't recur in every batch (NOT EXISTS check would never clear it otherwise) 315 if (item.skip) { 316 logger.info(`Skipping response for site ${item.site_id}: ${item.skip_reason || 'opted out'}`); 317 const inboundResult = await client.query( 318 `SELECT contact_method, contact_uri FROM messages WHERE site_id=$1 AND direction='inbound' ORDER BY created_at DESC LIMIT 1`, 319 [item.site_id] 320 ); 321 const inboundRow = inboundResult.rows[0]; 322 const skipChannel = item.channel || item.contact_method || inboundRow?.contact_method || 'sms'; 323 const skipUri = item.contact_uri || inboundRow?.contact_uri || ''; 324 await client.query( 325 `INSERT INTO messages (site_id, direction, message_body, message_type, contact_method, contact_uri) 326 VALUES ($1, 'outbound', $2, 'reply_skipped', $3, $4)`, 327 [item.site_id, `[skipped: ${item.skip_reason || 'not interested'}]`, skipChannel, skipUri] 328 ); 329 return null; 330 } 331 332 if (!item.message_body) { 333 logger.warn(`Reply response missing message_body for site ${item.site_id}`); 334 return null; 335 } 336 337 // channel/contact_uri come from the LLM echoing back the inbound data. 338 // Fall back to the most recent inbound for this site if LLM omits them. 339 let contactMethod = item.channel || item.contact_method; 340 let contactUri = item.contact_uri || ''; 341 if (!contactMethod) { 342 const inboundResult = await client.query( 343 `SELECT contact_method, contact_uri FROM messages 344 WHERE site_id = $1 AND direction = 'inbound' 345 ORDER BY created_at DESC LIMIT 1`, 346 [item.site_id] 347 ); 348 const inbound = inboundResult.rows[0]; 349 contactMethod = inbound?.contact_method || 'sms'; 350 if (!contactUri) contactUri = inbound?.contact_uri || ''; 351 } 352 353 if (contactMethod === 'sms' && item.country_code) { 354 contactUri = addCountryCode(contactUri, item.country_code); 355 } 356 357 const result = await client.query( 358 `INSERT INTO messages (site_id, direction, message_body, subject_line, contact_method, contact_uri, message_type, approval_status) 359 VALUES ($1, 'outbound', $2, $3, $4, $5, 'reply', 'approved') 360 RETURNING id`, 361 [item.site_id, item.message_body, item.subject_line || null, contactMethod, contactUri] 362 ); 363 364 return result.rows[0]?.id; 365 } 366 367 const PROJECT_ROOT = join(import.meta.dirname || process.cwd(), '..'); 368 const OVERSEER_LOG = '/tmp/sonnet-overseer.log'; 369 const STATUS_FILE = join(PROJECT_ROOT, 'logs/pipeline-status.txt'); 370 371 function overseerLog(msg) { 372 const line = `[${new Date().toISOString()}] [SonnetOverseer] ${msg}`; 373 logger.info(msg); 374 try { 375 appendFileSync(OVERSEER_LOG, `${line}\n`); 376 } catch { 377 /* ignore */ 378 } 379 try { 380 appendFileSync(STATUS_FILE, `${line}\n`); 381 } catch { 382 /* ignore */ 383 } 384 } 385 386 async function executeOverseerAction(action, client) { 387 const { code, description, params = {} } = action; 388 overseerLog(`ACTION: ${code} — ${description}`); 389 390 switch (code) { 391 case 'RESTART_PIPELINE': { 392 try { 393 execSync('systemctl --user restart 333method-pipeline --no-block', { timeout: 30000 }); 394 overseerLog('Pipeline service restart requested'); 395 await client.query( 396 `INSERT INTO tel.system_health (check_type, status, details, action_taken) 397 VALUES ('sonnet_overseer', 'ok', $1, 'restart_pipeline')`, 398 [JSON.stringify({ reason: description })] 399 ); 400 } catch (err) { 401 overseerLog(`ERROR: Failed to restart pipeline: ${err.message}`); 402 } 403 break; 404 } 405 case 'CLEAR_STALE_TASKS': { 406 const minAgeHours = params.min_age_hours || 2; 407 const cutoff = new Date(Date.now() - minAgeHours * 60 * 60 * 1000).toISOString(); 408 const res = await client.query( 409 `UPDATE tel.agent_tasks SET status='failed', completed_at=CURRENT_TIMESTAMP, 410 result_json='{"reason":"cleared by sonnet overseer - stale blocked/pending task"}' 411 WHERE status IN ('blocked','pending') AND created_at < $1`, 412 [cutoff] 413 ); 414 const runningCutoff = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString(); 415 const res2 = await client.query( 416 `UPDATE tel.agent_tasks SET status='failed', completed_at=CURRENT_TIMESTAMP, 417 result_json='{"reason":"cleared by sonnet overseer - stuck in running state >2h"}' 418 WHERE status='running' AND started_at < $1`, 419 [runningCutoff] 420 ); 421 overseerLog( 422 `Cleared ${res.rowCount} stale blocked/pending + ${res2.rowCount} stuck-running agent tasks` 423 ); 424 await client.query( 425 `INSERT INTO tel.system_health (check_type, status, details, action_taken) 426 VALUES ('sonnet_overseer', 'ok', $1, 'clear_stale_tasks')`, 427 [JSON.stringify({ cleared: res.rowCount, min_age_hours: minAgeHours })] 428 ); 429 break; 430 } 431 case 'RESET_STUCK_SITES': { 432 const { stage } = params; 433 const MIN_HOURS_FLOOR = 4; 434 const minHours = Math.max(params.min_stuck_hours ?? MIN_HOURS_FLOOR, MIN_HOURS_FLOOR); 435 if (!stage) { 436 overseerLog('RESET_STUCK_SITES: missing stage param, skipping'); 437 break; 438 } 439 const priorStage = { 440 assets_captured: 'found', 441 prog_scored: 'assets_captured', 442 semantic_scored: 'prog_scored', 443 vision_scored: 'prog_scored', 444 // enriched intentionally omitted — enriched is a deliberate queue for evidence 445 // collection (gather_evidence cron). Resetting to semantic_scored causes loop. 446 }; 447 const prevStage = priorStage[stage]; 448 if (!prevStage) { 449 overseerLog(`RESET_STUCK_SITES: unknown stage '${stage}', skipping`); 450 break; 451 } 452 const cutoff = new Date(Date.now() - minHours * 60 * 60 * 1000).toISOString(); 453 const excludedResult = await client.query( 454 `SELECT id FROM sites WHERE recapture_count >= 2 AND status = $1`, 455 [stage] 456 ); 457 const excludeIds = excludedResult.rows.map(r => r.id); 458 const clearAssets = stage === 'assets_captured'; 459 460 // When clearing assets, also delete HTML files from filesystem 461 const errorMsg = `reset by sonnet overseer - stuck >${minHours}h`; 462 if (clearAssets) { 463 let affectedResult; 464 if (excludeIds.length > 0) { 465 affectedResult = await client.query( 466 `SELECT id FROM sites WHERE status=$1 AND updated_at < $2 AND NOT (id = ANY($3))`, 467 [stage, cutoff, excludeIds] 468 ); 469 } else { 470 affectedResult = await client.query( 471 `SELECT id FROM sites WHERE status=$1 AND updated_at < $2`, 472 [stage, cutoff] 473 ); 474 } 475 for (const row of affectedResult.rows) { 476 deleteHtmlDom(row.id); 477 } 478 } 479 480 let result; 481 if (excludeIds.length > 0) { 482 if (clearAssets) { 483 result = await client.query( 484 `UPDATE sites SET status=$1, updated_at=NOW(), 485 error_message=$2, 486 recapture_count=recapture_count+1, html_dom=NULL, screenshot_path=NULL 487 WHERE status=$3 AND updated_at < $4 AND NOT (id = ANY($5))`, 488 [prevStage, errorMsg, stage, cutoff, excludeIds] 489 ); 490 } else { 491 result = await client.query( 492 `UPDATE sites SET status=$1, updated_at=NOW(), 493 error_message=$2, 494 recapture_count=recapture_count+1 495 WHERE status=$3 AND updated_at < $4 AND NOT (id = ANY($5))`, 496 [prevStage, errorMsg, stage, cutoff, excludeIds] 497 ); 498 } 499 } else { 500 if (clearAssets) { 501 result = await client.query( 502 `UPDATE sites SET status=$1, updated_at=NOW(), 503 error_message=$2, 504 recapture_count=recapture_count+1, html_dom=NULL, screenshot_path=NULL 505 WHERE status=$3 AND updated_at < $4`, 506 [prevStage, errorMsg, stage, cutoff] 507 ); 508 } else { 509 result = await client.query( 510 `UPDATE sites SET status=$1, updated_at=NOW(), 511 error_message=$2, 512 recapture_count=recapture_count+1 513 WHERE status=$3 AND updated_at < $4`, 514 [prevStage, errorMsg, stage, cutoff] 515 ); 516 } 517 } 518 overseerLog( 519 `Reset ${result.rowCount} sites stuck at '${stage}' → '${prevStage}' (skipped ${excludeIds.length} anti-loop)` 520 ); 521 await client.query( 522 `INSERT INTO tel.system_health (check_type, status, details, action_taken) 523 VALUES ('sonnet_overseer', 'ok', $1, 'reset_stuck_sites')`, 524 [JSON.stringify({ 525 stage, 526 prev_stage: prevStage, 527 reset: result.rowCount, 528 skipped_anti_loop: excludeIds.length, 529 })] 530 ); 531 break; 532 } 533 case 'LOG_ONLY': 534 overseerLog(`NOTE: ${params.note || description}`); 535 break; 536 default: 537 overseerLog(`WARN: Unknown action code '${code}', skipping`); 538 } 539 } 540 541 async function storeOverseerResult(client, item) { 542 // item = { summary, severity, findings, actions } 543 if (!item.summary) { 544 logger.warn('Overseer result missing summary'); 545 return null; 546 } 547 548 overseerLog(`Analysis: severity=${item.severity} | ${item.summary}`); 549 overseerLog(`Findings: ${item.findings?.length || 0}, Actions: ${item.actions?.length || 0}`); 550 551 let actionsTaken = 0; 552 for (const action of item.actions || []) { 553 try { 554 await executeOverseerAction(action, client); 555 if (action.code !== 'LOG_ONLY') actionsTaken++; 556 } catch (err) { 557 overseerLog(`ERROR: Action ${action.code} failed: ${err.message}`); 558 } 559 } 560 561 // Write report to status file 562 const report = [ 563 `\n${'='.repeat(70)}`, 564 `SONNET OVERSEER — ${new Date().toISOString()} [${(item.severity || 'unknown').toUpperCase()}]`, 565 `Summary: ${item.summary}`, 566 '', 567 `Findings: ${(item.findings || []).map(f => `[${f.severity}] ${f.issue}`).join(' | ') || 'none'}`, 568 `Actions taken: ${actionsTaken}`, 569 `${'='.repeat(70)}`, 570 ].join('\n'); 571 try { 572 appendFileSync(STATUS_FILE, `${report}\n`); 573 } catch { 574 /* ignore */ 575 } 576 577 return actionsTaken; 578 } 579 580 async function storeErrorClassification(client, item) { 581 // item = { pattern, label, group, context, example_errors, occurrence_count } 582 const { pattern, label, group, context, example_errors, occurrence_count } = item; 583 584 if ( 585 !pattern || 586 !label || 587 !['terminal', 'retriable'].includes(group) || 588 !['site', 'outreach'].includes(context) 589 ) { 590 logger.warn(`Skipping invalid error proposal: ${JSON.stringify(item)}`); 591 return null; 592 } 593 594 try { 595 new RegExp(pattern); 596 } catch { 597 logger.warn(`Skipping invalid regex "${pattern}"`); 598 return null; 599 } 600 601 // Avoid duplicates 602 const existing = await client.query( 603 `SELECT id FROM error_pattern_proposals WHERE pattern = $1 AND context = $2 AND status = 'pending'`, 604 [pattern, context] 605 ); 606 if (existing.rows[0]) return null; 607 608 const examples = Array.isArray(example_errors) ? example_errors.slice(0, 5) : []; 609 const result = await client.query( 610 `INSERT INTO error_pattern_proposals (pattern, label, group_name, context, example_errors, occurrence_count) 611 VALUES ($1, $2, $3, $4, $5, $6) 612 RETURNING id`, 613 [pattern, label, group, context, JSON.stringify(examples), occurrence_count || 0] 614 ); 615 616 return result.rows[0]?.id; 617 } 618 619 /** 620 * Store semantic scoring results from Haiku. 621 * Merges LLM-assessed semantic factor scores (headline, value prop, USP) 622 * into the existing programmatic score_json and recomputes the final score. 623 */ 624 async function storeSemanticScore(client, item) { 625 const siteId = item.site_id; 626 if (!siteId) return null; 627 628 const siteResult = await client.query('SELECT score FROM sites WHERE id = $1', [siteId]); 629 const site = siteResult.rows[0]; 630 const scoreRaw = getScoreJsonWithFallback(siteId); 631 if (!scoreRaw) { 632 // score_json lost in migration — advance semantic_scored → enriched using existing DB score 633 const existingScore = site?.score; 634 if (existingScore === null || existingScore === undefined) return null; 635 const scoreThreshold = parseInt(process.env.LOW_SCORE_CUTOFF || '82', 10); 636 const isHighScore = existingScore > scoreThreshold; 637 await client.query( 638 `UPDATE sites SET 639 status = CASE WHEN status = 'semantic_scored' THEN $1 ELSE status END, 640 updated_at = CURRENT_TIMESTAMP 641 WHERE id = $2`, 642 [isHighScore ? 'high_score' : 'enriched', siteId] 643 ); 644 return siteId; 645 } 646 647 const scoreJson = JSON.parse(scoreRaw); 648 if (!scoreJson.factor_scores) return null; 649 650 // Merge Haiku's semantic scores into factor_scores 651 const semanticFactors = ['headline_quality', 'value_proposition', 'unique_selling_proposition']; 652 for (const factor of semanticFactors) { 653 if (item[factor] && typeof item[factor].score === 'number') { 654 scoreJson.factor_scores[factor] = { 655 score: Math.min(Math.max(item[factor].score, 0), 10), 656 reasoning: item[factor].reasoning || 'LLM semantic assessment', 657 evidence: scoreJson.factor_scores[factor]?.evidence || '', 658 source: 'haiku_semantic', 659 }; 660 } 661 } 662 663 // Recompute weighted score with vision-aware weights 664 const visionEnabled = process.env.ENABLE_VISION !== 'false'; 665 const newScore = computeWeightedScore(scoreJson.factor_scores, visionEnabled); 666 const newGrade = computeGrade(newScore); 667 668 // Mark as semantically scored 669 scoreJson.semantic_scored = true; 670 scoreJson.scoring_method = 'hybrid'; 671 scoreJson.conversion_score = newScore; 672 scoreJson.letter_grade = newGrade; 673 674 // Advance status: 675 // prog_scored → semantic_scored (or high_score) — vision-on path 676 // semantic_scored → enriched (or high_score) — vision-off path (LLM overlay done) 677 const scoreThreshold = parseInt(process.env.LOW_SCORE_CUTOFF || '82', 10); 678 const isHighScore = newScore !== null && newScore > scoreThreshold; 679 const newStatusFromProg = isHighScore ? 'high_score' : 'semantic_scored'; 680 const newStatusFromSemantic = isHighScore ? 'high_score' : 'enriched'; 681 682 setScoreJson(siteId, scoreJson); 683 await client.query( 684 `UPDATE sites 685 SET score = $1, grade = $2, 686 status = CASE 687 WHEN status = 'prog_scored' THEN $3 688 WHEN status = 'semantic_scored' THEN $4 689 ELSE status 690 END, 691 rescored_at = CURRENT_TIMESTAMP, 692 updated_at = CURRENT_TIMESTAMP 693 WHERE id = $5`, 694 [newScore, newGrade, newStatusFromProg, newStatusFromSemantic, siteId] 695 ); 696 697 return siteId; 698 } 699 700 /** 701 * Store scoring result from the orchestrator's score_sites batch. 702 * Replicates the site-classification and DB-update logic from src/stages/scoring.js:scoreSite(). 703 * item = { site_id, domain, factor_scores, overall_calculation, contact_details, ... } 704 */ 705 async function storeScoreResult(client, item) { 706 const siteId = item.site_id; 707 if (!siteId) { 708 logger.warn('Score result missing site_id'); 709 return null; 710 } 711 712 // Validate factor_scores are present — incomplete responses are unusable 713 if (!item.factor_scores) { 714 logger.warn(`Score result for site ${siteId} missing factor_scores — skipping`); 715 return null; 716 } 717 718 // Compute score and grade programmatically from factor scores (same as scoreWebsite()) 719 const computedScore = computeScoreFromFactors(item.factor_scores); 720 const computedGrade = computeGrade(computedScore); 721 722 // Ensure overall_calculation has the computed values 723 if (!item.overall_calculation) item.overall_calculation = {}; 724 item.overall_calculation.conversion_score = computedScore; 725 item.overall_calculation.letter_grade = computedGrade; 726 727 const grade = computedGrade; 728 const score = computedScore; 729 730 // Domain-level blocklist check (same logic as scoring stage) 731 const siteResult = await client.query( 732 'SELECT domain, country_code FROM sites WHERE id = $1', 733 [siteId] 734 ); 735 const site = siteResult.rows[0]; 736 if (!site) { 737 logger.warn(`Score result: site ${siteId} not found in DB`); 738 return null; 739 } 740 741 const blocked = checkBlocklist(site.domain, site.country_code); 742 if (blocked) { 743 await client.query( 744 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 745 [blocked.reason, siteId] 746 ); 747 logger.info(`Score: ${site.domain} blocklisted — ${blocked.reason}`); 748 return siteId; 749 } 750 751 const industry = classifyIndustry(site.domain); 752 if (industry) { 753 const reason = 754 industry.type === 'legal' 755 ? `Ignored: Legal site detected from domain (${industry.reason})` 756 : `Ignored: Regulated industry (${industry.type}) detected from domain (${industry.reason})`; 757 await client.query( 758 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 759 [reason, siteId] 760 ); 761 logger.info(`Score: ${site.domain} regulated industry — ${reason}`); 762 return siteId; 763 } 764 765 // Helper: dual-write score_json to both DB and filesystem 766 const scoreJsonStr = JSON.stringify(item); 767 768 // LLM-detected classifications (directory, local business, law firm, regulated industry) 769 const isDirectory = item.overall_calculation?.is_business_directory; 770 if (isDirectory) { 771 setScoreJson(siteId, scoreJsonStr); 772 await client.query( 773 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 774 ['Ignored: Business directory (LLM detected)', siteId] 775 ); 776 logger.info(`Score: ${site.domain} is a business directory — ignored`); 777 return siteId; 778 } 779 780 const isLocalBusiness = item.overall_calculation?.is_local_business; 781 if (isLocalBusiness === false) { 782 setScoreJson(siteId, scoreJsonStr); 783 await client.query( 784 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 785 ['Ignored: Not a local business (LLM detected)', siteId] 786 ); 787 logger.info(`Score: ${site.domain} is not a local business — ignored`); 788 return siteId; 789 } 790 791 const isLawFirm = item.overall_calculation?.is_law_firm; 792 if (isLawFirm) { 793 setScoreJson(siteId, scoreJsonStr); 794 await client.query( 795 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 796 ['Ignored: Law firm (LLM detected)', siteId] 797 ); 798 logger.info(`Score: ${site.domain} is a law firm — ignored`); 799 return siteId; 800 } 801 802 const industryClassification = 803 item.overall_calculation?.industry_classification?.toLowerCase() || ''; 804 const regulatedPatterns = [ 805 'dental', 806 'medical', 807 'clinic', 808 'hospital', 809 'pharmacy', 810 'veterinary', 811 'financial advis', 812 'mortgage broker', 813 'accounting', 814 ]; 815 const isRegulated = regulatedPatterns.some(p => industryClassification.includes(p)); 816 if (isRegulated) { 817 setScoreJson(siteId, scoreJsonStr); 818 await client.query( 819 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 820 [`Ignored: Regulated industry (${industryClassification})`, siteId] 821 ); 822 logger.info( 823 `Score: ${site.domain} is regulated industry (${industryClassification}) — ignored` 824 ); 825 return siteId; 826 } 827 828 // Error page detection 829 const isErrorPage = item.overall_calculation?.is_error_page; 830 const errorType = item.overall_calculation?.error_type; 831 const errorDescription = item.overall_calculation?.error_description; 832 if (isErrorPage && errorType) { 833 const permanentErrors = ['404', '403', '410']; 834 if (permanentErrors.includes(errorType)) { 835 setScoreJson(siteId, scoreJsonStr); 836 await client.query( 837 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 838 [errorDescription || `Permanent error: ${errorType}`, siteId] 839 ); 840 logger.info(`Score: ${site.domain} is a ${errorType} error page — ignored`); 841 return siteId; 842 } 843 // Temporary errors — keep at assets_captured for retry 844 const temporaryErrors = ['5xx', 'maintenance', 'redirect']; 845 if (temporaryErrors.includes(errorType)) { 846 setScoreJson(siteId, scoreJsonStr); 847 await client.query( 848 `UPDATE sites SET status = 'assets_captured', error_message = $1 WHERE id = $2`, 849 [errorDescription || `Temporary error: ${errorType}`, siteId] 850 ); 851 logger.info(`Score: ${site.domain} has temporary error ${errorType} — will retry`); 852 return siteId; 853 } 854 } 855 856 // Broken site detection 857 const isBrokenSite = item.overall_calculation?.is_broken_site || false; 858 const brokenSiteDetails = item.overall_calculation?.broken_site_details || []; 859 if (isBrokenSite) { 860 const siteRowResult = await client.query( 861 'SELECT recapture_count FROM sites WHERE id = $1', 862 [siteId] 863 ); 864 const recaptureCount = (siteRowResult.rows[0]?.recapture_count || 0) + 1; 865 setScoreJson(siteId, scoreJsonStr); 866 if (recaptureCount > 3) { 867 await client.query( 868 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 869 [`Max recapture attempts reached (3) - Issues: ${brokenSiteDetails.join(', ')}`, siteId] 870 ); 871 return siteId; 872 } 873 await client.query( 874 `UPDATE sites SET status = 'assets_captured', error_message = $1, recapture_count = $2, 875 recapture_at = NOW() + INTERVAL '7 days' WHERE id = $3`, 876 [ 877 `Broken site detected (attempt ${recaptureCount}/3) - Issues: ${brokenSiteDetails.join(', ')}`, 878 recaptureCount, 879 siteId, 880 ] 881 ); 882 return siteId; 883 } 884 885 // Extract location 886 const city = item.overall_calculation?.city || null; 887 const countryCode = item.overall_calculation?.country_code || null; 888 const state = item.overall_calculation?.state || null; 889 890 // Determine target status (same logic as scoring.js) 891 const scoreThreshold = parseInt(process.env.LOW_SCORE_CUTOFF || '82', 10); 892 // HTML-only mode: skip straight to semantic_scored (rescoring is a no-op without screenshots) 893 const enableVision = process.env.ENABLE_VISION !== 'false'; 894 const newStatus = 895 score !== null && score > scoreThreshold 896 ? 'high_score' 897 : enableVision 898 ? 'prog_scored' 899 : 'semantic_scored'; 900 901 // When vision disabled, save contact_details to contacts_json filesystem 902 if (!enableVision && item.contact_details) { 903 setContactsJson(siteId, JSON.stringify(item.contact_details)); 904 } 905 906 // Write score to filesystem; DB no longer stores score_json/contacts_json blobs 907 setScoreJson(siteId, scoreJsonStr); 908 909 await client.query( 910 `UPDATE sites SET 911 grade = $1, score = $2, 912 city = $3, country_code = $4, state = $5, 913 status = $6, 914 scored_at = CURRENT_TIMESTAMP, 915 rescored_at = CASE WHEN $7 = 'semantic_scored' THEN CURRENT_TIMESTAMP ELSE rescored_at END, 916 error_message = NULL 917 WHERE id = $8`, 918 [grade, score, city, countryCode, state, newStatus, newStatus, siteId] 919 ); 920 921 await resetRetries(siteId); 922 923 logger.info(`Score stored: site ${siteId} (${site.domain}) → ${grade} (${score}) → ${newStatus}`); 924 return siteId; 925 } 926 927 /** 928 * Clean phone numbers from LLM enrichment output — normalizes to E.164 format. 929 * @param {Array<Object|string>} phones - Phone numbers from LLM 930 * @returns {Array<Object>} Normalized phone numbers 931 */ 932 function cleanEnrichPhones(phones) { 933 if (!Array.isArray(phones)) return []; 934 return phones 935 .map(phone => { 936 if (typeof phone === 'string') { 937 return { number: normalizePhoneNumber(phone), label: '' }; 938 } 939 if (phone && phone.number) { 940 return { number: normalizePhoneNumber(phone.number), label: phone.label || '' }; 941 } 942 return null; 943 }) 944 .filter(Boolean); 945 } 946 947 /** 948 * Store enrichment result from the orchestrator's enrich_sites batch. 949 * This handles extractInitialContacts-equivalent: sets/updates contacts_json from HTML analysis. 950 * The pipeline's enrich stage still handles browser navigation for key_pages. 951 * item = { site_id, domain, business_name, city, country_code, state, email_addresses, 952 * phone_numbers, social_profiles, key_pages, primary_contact_form } 953 */ 954 async function storeEnrichResult(client, item) { 955 const siteId = item.site_id; 956 if (!siteId) { 957 logger.warn('Enrich result missing site_id'); 958 return null; 959 } 960 961 const siteResult = await client.query( 962 'SELECT country_code, status FROM sites WHERE id = $1', 963 [siteId] 964 ); 965 const site = siteResult.rows[0]; 966 if (!site) { 967 logger.warn(`Enrich result: site ${siteId} not found in DB`); 968 return null; 969 } 970 971 // Build enriched contacts_json from LLM result (filesystem first) 972 const contactsRaw = getContactsJsonWithFallback(siteId); 973 let contactsJson = contactsRaw ? JSON.parse(contactsRaw) : {}; 974 975 // Merge LLM-extracted data into existing contacts_json 976 // Prefer LLM values for business_name/city/country_code/state (previously missing) 977 if (item.business_name && !contactsJson.business_name) { 978 contactsJson.business_name = item.business_name; 979 } 980 if (item.city && !contactsJson.city) { 981 contactsJson.city = item.city; 982 } 983 if (item.country_code && !contactsJson.country_code) { 984 contactsJson.country_code = item.country_code; 985 } 986 if (item.state && !contactsJson.state) { 987 contactsJson.state = item.state; 988 } 989 990 // Merge arrays (avoid duplicates by URI/email/number) 991 if (Array.isArray(item.email_addresses)) { 992 const existing = new Set( 993 (contactsJson.email_addresses || []).map(e => (typeof e === 'string' ? e : e.email)) 994 ); 995 const newEmails = item.email_addresses.filter(e => { 996 const addr = typeof e === 'string' ? e : e.email; 997 return addr && !existing.has(addr); 998 }); 999 contactsJson.email_addresses = [...(contactsJson.email_addresses || []), ...newEmails]; 1000 } 1001 1002 if (Array.isArray(item.phone_numbers)) { 1003 const cleaned = cleanEnrichPhones(item.phone_numbers); 1004 const existing = new Set( 1005 (contactsJson.phone_numbers || []).map(p => (typeof p === 'string' ? p : p.number)) 1006 ); 1007 const newPhones = cleaned.filter(p => p.number && !existing.has(p.number)); 1008 contactsJson.phone_numbers = [...(contactsJson.phone_numbers || []), ...newPhones]; 1009 } 1010 1011 if (Array.isArray(item.social_profiles)) { 1012 const existing = new Set( 1013 (contactsJson.social_profiles || []).map(s => (typeof s === 'string' ? s : s.url)) 1014 ); 1015 const newSocial = item.social_profiles.filter(s => { 1016 const url = typeof s === 'string' ? s : s.url; 1017 return url && !existing.has(url); 1018 }); 1019 contactsJson.social_profiles = [...(contactsJson.social_profiles || []), ...newSocial]; 1020 } 1021 1022 if (Array.isArray(item.key_pages)) { 1023 const existing = new Set(contactsJson.key_pages || []); 1024 const newPages = item.key_pages.filter(p => p && !existing.has(p)); 1025 contactsJson.key_pages = [...(contactsJson.key_pages || []), ...newPages]; 1026 } 1027 1028 if (item.primary_contact_form && !contactsJson.primary_contact_form) { 1029 contactsJson.primary_contact_form = item.primary_contact_form; 1030 } 1031 1032 // Clean invalid social links 1033 contactsJson = cleanInvalidSocialLinks(contactsJson); 1034 1035 const city = contactsJson.city || null; 1036 const countryCode = contactsJson.country_code || null; 1037 const state = contactsJson.state || null; 1038 1039 // enrichment_pass is set in the input batch but not echoed back in the LLM output JSON. 1040 // Infer from DB status when the field is absent (enriched_regex = LLM pass pending). 1041 const isLlmPass = 1042 item.enrichment_pass === 'llm' || 1043 (item.enrichment_pass === undefined && site.status === 'enriched_regex'); 1044 1045 // Write contacts to filesystem; store sentinel in DB to preserve IS NOT NULL gating 1046 setContactsJson(siteId, JSON.stringify(contactsJson)); 1047 1048 if (isLlmPass) { 1049 // LLM pass over key_pages_html — set enriched_llm, clear key_pages_html to reclaim space 1050 deleteKeyPagesHtml(siteId); 1051 await client.query( 1052 `UPDATE sites SET 1053 city = COALESCE($1, city), 1054 country_code = COALESCE($2, country_code), 1055 state = COALESCE($3, state), 1056 status = 'enriched_llm', 1057 enriched_at = CURRENT_TIMESTAMP, 1058 key_pages_html = NULL, 1059 updated_at = CURRENT_TIMESTAMP 1060 WHERE id = $4`, 1061 [city, countryCode, state, siteId] 1062 ); 1063 } else { 1064 // Initial pass from html_dom — status stays at 'semantic_scored'/'vision_scored' so the 1065 // pipeline enrich stage can still run browser navigation on key_pages 1066 await client.query( 1067 `UPDATE sites SET 1068 city = COALESCE($1, city), 1069 country_code = COALESCE($2, country_code), 1070 state = COALESCE($3, state), 1071 updated_at = CURRENT_TIMESTAMP 1072 WHERE id = $4`, 1073 [city, countryCode, state, siteId] 1074 ); 1075 } 1076 1077 logger.info( 1078 `Enrich stored: site ${siteId} (${item.domain}) — emails:${contactsJson.email_addresses?.length || 0} phones:${contactsJson.phone_numbers?.length || 0} social:${contactsJson.social_profiles?.length || 0}` 1079 ); 1080 return siteId; 1081 } 1082 1083 /** 1084 * Store monitor_health result — logs findings, no DB writes. 1085 * item = { severity, summary, findings[], recommended_actions[] } 1086 */ 1087 async function storeMonitorHealthResult(client, item) { 1088 if (!item.summary) { 1089 logger.warn('monitor_health result missing summary'); 1090 return null; 1091 } 1092 logger.info(`Health check [${item.severity || 'info'}]: ${item.summary}`); 1093 for (const f of item.findings || []) { 1094 logger.info(` [${f.severity}] ${f.component}: ${f.issue}`); 1095 } 1096 for (const a of item.recommended_actions || []) { 1097 logger.info(` ACTION: ${a}`); 1098 } 1099 return item.summary; 1100 } 1101 1102 /** 1103 * Store triage_errors result — creates fix tasks for critical/high errors. 1104 * item = { error_message, type, severity, action, summary, suggested_fix } 1105 */ 1106 async function storeTriageResult(client, item) { 1107 if (!item.error_message || !item.type) { 1108 logger.warn('triage_errors result missing required fields'); 1109 return null; 1110 } 1111 1112 logger.info(`Triage [${item.severity}/${item.type}]: ${item.summary}`); 1113 1114 if (item.action !== 'create_fix_task') return item.error_message; 1115 1116 // Only create tasks for high/critical 1117 if (!['critical', 'high'].includes(item.severity)) return item.error_message; 1118 1119 const priority = item.severity === 'critical' ? 9 : 7; 1120 await client.query( 1121 `INSERT INTO tel.agent_tasks 1122 (task_type, assigned_to, created_by, priority, status, context_json) 1123 VALUES ('triage_fix', 'developer', 'incident_commander', $1, 'pending', $2)`, 1124 [ 1125 priority, 1126 JSON.stringify({ 1127 error_message: item.error_message, 1128 error_type: item.type, 1129 severity: item.severity, 1130 summary: item.summary, 1131 suggested_fix: item.suggested_fix || null, 1132 }), 1133 ] 1134 ); 1135 1136 return item.error_message; 1137 } 1138 1139 /** 1140 * Store check_docs result — logs stale docs findings, no DB writes. 1141 * item = { file_path, stale_docs[], summary } 1142 */ 1143 async function storeCheckDocsResult(client, item) { 1144 if (!item.file_path) { 1145 logger.warn('check_docs result missing file_path'); 1146 return null; 1147 } 1148 const count = Array.isArray(item.stale_docs) ? item.stale_docs.length : 0; 1149 if (count === 0) { 1150 logger.info(`check_docs: ${item.file_path} — docs current`); 1151 return item.file_path; 1152 } 1153 logger.info(`check_docs: ${item.file_path} — ${count} stale doc(s)`); 1154 for (const d of item.stale_docs) { 1155 logger.info(` [${d.location}] ${d.issue}`); 1156 } 1157 return item.file_path; 1158 } 1159 1160 /** 1161 * Store code review findings as agent_tasks for later fixing. 1162 * item = { file_path, findings: [{severity, category, line, description, suggestion}], summary } 1163 * 1164 * Each finding with severity >= 7 becomes a code_review_fix task. 1165 * Findings with severity <= 6 are stored but skipped (too low priority to auto-fix). 1166 */ 1167 async function storeCodeReviewFindings(client, item) { 1168 if (!item.file_path) { 1169 logger.warn('Code review result missing file_path'); 1170 return null; 1171 } 1172 1173 const findings = Array.isArray(item.findings) ? item.findings : []; 1174 1175 if (findings.length === 0) { 1176 logger.info(`Code review: ${item.file_path} — no findings`); 1177 return item.file_path; // Counts as stored (reviewed with no issues) 1178 } 1179 1180 let tasksCreated = 0; 1181 1182 for (const finding of findings) { 1183 const severity = parseInt(finding.severity || '5', 10); 1184 1185 // Only create fix tasks for bugs and security issues (severity >= 7) 1186 if (severity < 7) { 1187 logger.info( 1188 `Code review: ${item.file_path}:${finding.line || '?'} severity=${severity} — logged, not queued` 1189 ); 1190 continue; 1191 } 1192 1193 await client.query( 1194 `INSERT INTO tel.agent_tasks 1195 (task_type, assigned_to, created_by, priority, status, context_json) 1196 VALUES ('code_review_fix', 'developer', 'code_reviewer', $1, 'pending', $2)`, 1197 [ 1198 severity, 1199 JSON.stringify({ 1200 file_path: item.file_path, 1201 line: finding.line || null, 1202 category: finding.category || 'unknown', 1203 description: finding.description, 1204 suggestion: finding.suggestion || null, 1205 severity, 1206 }), 1207 ] 1208 ); 1209 tasksCreated++; 1210 } 1211 1212 logger.info( 1213 `Code review: ${item.file_path} — ${findings.length} findings, ${tasksCreated} fix tasks created` 1214 ); 1215 return item.file_path; 1216 } 1217 1218 /** 1219 * Store the Haiku evidence-merge result. 1220 * 1221 * item = { 1222 * site_id, 1223 * revised_scores: { factor: { original, revised, reason } }, 1224 * additional_findings: [{ factor, finding, severity }], 1225 * evidence_summary: string 1226 * } 1227 * 1228 * Writes the full merged object to sites.evidence_json. 1229 * Also patches individual factor scores in sites.score_json where revisions exist. 1230 */ 1231 async function storeEvidenceMerge(client, item) { 1232 if (!item.site_id) { 1233 logger.warn('Evidence merge result missing site_id'); 1234 return null; 1235 } 1236 1237 const siteExistsResult = await client.query( 1238 'SELECT id FROM sites WHERE id = $1', 1239 [item.site_id] 1240 ); 1241 if (!siteExistsResult.rows[0]) { 1242 logger.warn(`Evidence merge: site ${item.site_id} not found`); 1243 return null; 1244 } 1245 1246 // Patch score_json with revised scores (filesystem) 1247 const revisedScores = item.revised_scores || {}; 1248 const revisionCount = Object.keys(revisedScores).length; 1249 const scoreRaw = getScoreJsonWithFallback(item.site_id); 1250 1251 if (revisionCount > 0 && scoreRaw) { 1252 try { 1253 const scoreData = JSON.parse(scoreRaw); 1254 for (const [factor, revision] of Object.entries(revisedScores)) { 1255 if (scoreData.factor_scores && scoreData.factor_scores[factor] !== undefined) { 1256 scoreData.factor_scores[factor].score = revision.revised; 1257 scoreData.factor_scores[factor].evidence_revised = true; 1258 scoreData.factor_scores[factor].evidence_reason = revision.reason; 1259 } 1260 } 1261 // Write patched score to filesystem only; DB no longer stores score_json 1262 setScoreJson(item.site_id, JSON.stringify(scoreData)); 1263 } catch (e) { 1264 logger.warn( 1265 `Evidence merge: failed to patch score_json for site ${item.site_id}: ${e.message}` 1266 ); 1267 } 1268 } 1269 1270 // Write full evidence_json 1271 await client.query( 1272 'UPDATE sites SET evidence_json = $1 WHERE id = $2', 1273 [ 1274 JSON.stringify({ 1275 revised_scores: revisedScores, 1276 additional_findings: item.additional_findings || [], 1277 evidence_summary: item.evidence_summary || '', 1278 merged_at: new Date().toISOString(), 1279 }), 1280 item.site_id, 1281 ] 1282 ); 1283 1284 const additionalCount = (item.additional_findings || []).length; 1285 logger.info( 1286 `Evidence merge: site ${item.site_id} — ${revisionCount} score revisions, ${additionalCount} additional findings` 1287 ); 1288 if (item.evidence_summary) { 1289 logger.info(`Evidence summary: ${item.evidence_summary}`); 1290 } 1291 1292 return item.site_id; 1293 } 1294 1295 /** 1296 * Store a follow-up message generated by the LLM. 1297 * Each result item contains: site_id, contact_method, contact_uri, message_body, 1298 * subject_line (email only), sequence_step, country_code. 1299 * 1300 * Similar to storeProposal but uses message_type='followupN' and includes sequence_step. 1301 */ 1302 async function storeFollowupGenerate(client, item) { 1303 const contactMethod = item.contact_method || 'email'; 1304 let contactUri = item.contact_uri || ''; 1305 const sequenceStep = item.sequence_step || item.next_step || 2; 1306 1307 if (!contactUri || !item.site_id) { 1308 logger.warn(`Followup missing contact_uri or site_id`); 1309 return null; 1310 } 1311 1312 // Normalize phone numbers 1313 if (contactMethod === 'sms' && item.country_code) { 1314 contactUri = addCountryCode(contactUri, item.country_code); 1315 } 1316 1317 // GDPR check 1318 let approvalStatus = 'pending'; 1319 if (contactMethod === 'email' && item.country_code) { 1320 const country = getCountryByCode(item.country_code); 1321 if (country?.requiresGDPRCheck) { 1322 approvalStatus = 'gdpr_blocked'; 1323 } 1324 } 1325 1326 // Skip blocked emails 1327 if (contactMethod === 'email') { 1328 if (isGovernmentEmail(contactUri) || isEducationEmail(contactUri) || isDemoEmail(contactUri)) { 1329 logger.info(`Skipping blocked email for followup: ${contactUri}`); 1330 return null; 1331 } 1332 } 1333 1334 // Resolve spintax 1335 const messageBody = spin(item.message_body || ''); 1336 const subjectLine = spin(item.subject_line || ''); 1337 1338 if (!messageBody || messageBody.length < 10) { 1339 logger.warn(`Followup message too short for site ${item.site_id} step ${sequenceStep}`); 1340 return null; 1341 } 1342 1343 // SMS length check (more lenient for follow-ups — up to 320 chars / 2 segments) 1344 if (contactMethod === 'sms' && messageBody.length > 320) { 1345 logger.warn( 1346 `SMS followup too long (${messageBody.length}/320) for site ${item.site_id} step ${sequenceStep}` 1347 ); 1348 return null; 1349 } 1350 1351 const messageType = `followup${sequenceStep}`; 1352 1353 // Check for existing followup at this step for this site+method+uri 1354 const existingResult = await client.query( 1355 `SELECT 1 FROM messages 1356 WHERE site_id = $1 AND direction = 'outbound' 1357 AND contact_method = $2 AND contact_uri = $3 1358 AND sequence_step = $4`, 1359 [item.site_id, contactMethod, contactUri, sequenceStep] 1360 ); 1361 1362 if (existingResult.rows[0]) { 1363 logger.info(`Followup step ${sequenceStep} already exists for site ${item.site_id} ${contactMethod}`); 1364 return null; 1365 } 1366 1367 const result = await client.query( 1368 `INSERT INTO messages (site_id, direction, message_body, subject_line, contact_method, contact_uri, approval_status, message_type, sequence_step) 1369 VALUES ($1, 'outbound', $2, $3, $4, $5, $6, $7, $8) 1370 RETURNING id`, 1371 [item.site_id, messageBody, subjectLine || null, contactMethod, contactUri, approvalStatus, messageType, sequenceStep] 1372 ); 1373 1374 // Update next_followup_at for the next step 1375 const TOUCH_CADENCE = { 2: 3, 3: 7, 4: 14, 5: 21, 6: 28, 7: 35, 8: 42 }; 1376 const nextNextStep = sequenceStep + 1; 1377 if (nextNextStep <= 8 && TOUCH_CADENCE[nextNextStep]) { 1378 const daysUntilNext = TOUCH_CADENCE[nextNextStep] - TOUCH_CADENCE[sequenceStep]; 1379 await client.query( 1380 `UPDATE sites 1381 SET next_followup_at = NOW() + make_interval(days => $2), 1382 updated_at = CURRENT_TIMESTAMP 1383 WHERE id = $1`, 1384 [item.site_id, daysUntilNext] 1385 ); 1386 } else { 1387 // All follow-ups generated — clear next_followup_at 1388 await client.query( 1389 `UPDATE sites 1390 SET next_followup_at = NULL, 1391 updated_at = CURRENT_TIMESTAMP 1392 WHERE id = $1`, 1393 [item.site_id] 1394 ); 1395 } 1396 1397 logger.info(`Stored followup step ${sequenceStep} for site ${item.site_id} (${contactMethod} → ${contactUri})`); 1398 return result.rows[0]?.id; 1399 } 1400 1401 // Main 1402 const handlers = { 1403 proposals_email: storeProposal, 1404 proposals_sms: storeProposal, 1405 proofread: storeProofreadDecision, 1406 classify_replies: storeClassification, 1407 extract_names: storeName, 1408 reply_responses: storeReplyResponse, 1409 oversee: storeOverseerResult, 1410 classify_errors: storeErrorClassification, 1411 score_semantic: storeSemanticScore, 1412 score_sites: storeScoreResult, 1413 enrich_sites: storeEnrichResult, 1414 code_review: storeCodeReviewFindings, 1415 monitor_health: storeMonitorHealthResult, 1416 triage_errors: storeTriageResult, 1417 check_docs: storeCheckDocsResult, 1418 evidence_merge: storeEvidenceMerge, 1419 followup_generate: storeFollowupGenerate, 1420 }; 1421 1422 async function main() { 1423 const raw = await readStdin(); 1424 1425 let input; 1426 try { 1427 input = JSON.parse(raw); 1428 } catch { 1429 console.error('Invalid JSON on stdin'); 1430 process.exit(1); 1431 } 1432 1433 const { batch_type, results } = input; 1434 if (!batch_type || !Array.isArray(results)) { 1435 console.error('Expected { batch_type, results: [...] }'); 1436 process.exit(1); 1437 } 1438 1439 const handler = handlers[batch_type]; 1440 if (!handler) { 1441 console.error(`Unknown batch type: ${batch_type}`); 1442 process.exit(1); 1443 } 1444 1445 let stored = 0; 1446 let skipped = 0; 1447 let errors = 0; 1448 1449 try { 1450 await withTransaction(async (client) => { 1451 for (const item of results) { 1452 try { 1453 await client.query('SAVEPOINT item_save'); 1454 const id = await handler(client, item); 1455 await client.query('RELEASE SAVEPOINT item_save'); 1456 if (id) { 1457 stored++; 1458 } else { 1459 skipped++; 1460 } 1461 } catch (err) { 1462 await client.query('ROLLBACK TO SAVEPOINT item_save'); 1463 errors++; 1464 logger.error(`Failed to store item: ${err.message}`); 1465 } 1466 } 1467 }); 1468 const summary = { batch_type, total: results.length, stored, skipped, errors }; 1469 console.log(JSON.stringify(summary)); 1470 } finally { 1471 await closePool(); 1472 } 1473 } 1474 1475 // Only run main() when this script is executed directly (not imported by tests) 1476 if (import.meta.url === `file://${process.argv[1]}`) { 1477 main().catch(err => { 1478 console.error(err.message); 1479 process.exit(1); 1480 }); 1481 } 1482 1483 // Named exports for unit testing (guards prevent main() from running on import) 1484 export { storeSemanticScore, handlers };