claude-store.js
1 #!/usr/bin/env node 2 3 /** 4 * Claude Store — Accept Claude-generated results on stdin, write to DB. 5 * 6 * Usage: 7 * echo '{"batch_type":"classify_replies","results":[...]}' | node scripts/claude-store.js 8 * 9 * Or pipe from claude: 10 * claude -p "..." | node scripts/claude-store.js 11 * 12 * Expected input format (JSON on stdin): 13 * { 14 * "batch_type": "proposals_email|proposals_sms|classify_replies|extract_names|reply_responses|score_semantic|enrich_sites|proofread", 15 * "results": [ ... ] 16 * } 17 * 18 * Each batch type expects different result item shapes — see handlers below. 19 */ 20 21 import { execSync } from 'child_process'; 22 import { appendFileSync } from 'fs'; 23 import { join } from 'path'; 24 import { run, getOne, getAll, withTransaction, closePool } from '../src/utils/db.js'; 25 import { getCountryByCode } from '../src/config/countries.js'; 26 import { computeGrade, FACTOR_WEIGHTS, computeScoreFromFactors } from '../src/score.js'; 27 import { computeWeightedScore } from '../src/utils/programmatic-scorer.js'; 28 import { 29 isGovernmentEmail, 30 isEducationEmail, 31 isDemoEmail, 32 checkBlocklist, 33 classifyIndustry, 34 } from '../src/utils/site-filters.js'; 35 import { addCountryCode, normalizePhoneNumber } from '../src/utils/phone-normalizer.js'; 36 import { spin } from '../src/utils/spintax.js'; 37 import { cleanInvalidSocialLinks } from '../src/contacts/prioritize.js'; 38 import { resetRetries, recordFailure } from '../src/utils/retry-handler.js'; 39 import Logger from '../src/utils/logger.js'; 40 import { deleteHtmlDom, deleteKeyPagesHtml } from '../src/utils/html-storage.js'; 41 import { setScoreJson, getScoreJsonWithFallback } from '../src/utils/score-storage.js'; 42 import { setContactsJson, getContactsJsonWithFallback } from '../src/utils/contacts-storage.js'; 43 import '../src/utils/load-env.js'; 44 45 const logger = new Logger('ClaudeStore'); 46 47 async function readStdin() { 48 const chunks = []; 49 for await (const chunk 
of process.stdin) { 50 chunks.push(chunk); 51 } 52 return Buffer.concat(chunks).toString('utf8'); 53 } 54 55 async function storeProposal(client, item) { 56 const contactMethod = item.contact_method || 'email'; 57 let contactUri = item.contact_uri || 'PENDING_CONTACT_EXTRACTION'; 58 59 // Normalize phone numbers 60 if (contactMethod === 'sms' && contactUri !== 'PENDING_CONTACT_EXTRACTION' && item.country_code) { 61 contactUri = addCountryCode(contactUri, item.country_code); 62 } 63 64 // GDPR check 65 let approvalStatus = 'pending'; 66 if (contactMethod === 'email' && item.country_code) { 67 const country = getCountryByCode(item.country_code); 68 if (country?.requiresGDPRCheck) { 69 approvalStatus = 'gdpr_blocked'; 70 } 71 } 72 73 // Skip blocked emails 74 if (contactMethod === 'email') { 75 if (isGovernmentEmail(contactUri) || isEducationEmail(contactUri) || isDemoEmail(contactUri)) { 76 logger.info(`Skipping blocked email: ${contactUri}`); 77 return null; 78 } 79 } 80 81 // Resolve spintax 82 let messageBody = spin(item.message_body || item.proposal_text || ''); 83 let subjectLine = spin(item.subject_line || ''); 84 85 // Reject broken templates 86 if (messageBody.startsWith('{') || messageBody.startsWith(', ')) { 87 logger.warn(`Broken template for site ${item.site_id} — skipping`); 88 return null; 89 } 90 91 // ── Auto-fix known LLM proposal defects ───────────────────────────────── 92 // These catch the exact issues that cause pending↔rework cycles in proofread. 93 // Fix at generation time so proofread can focus on genuine quality issues. 94 95 // 1. Unresolved template variables — LLM sometimes leaves [name], [score] etc. 
96 const personaFirstName = process.env.PERSONA_FIRST_NAME; 97 const personaName = process.env.PERSONA_NAME; 98 const brandName = process.env.BRAND_NAME; 99 const brandDomain = process.env.BRAND_DOMAIN; 100 if (!personaFirstName) throw new Error('PERSONA_FIRST_NAME env var required'); 101 if (!personaName) throw new Error('PERSONA_NAME env var required'); 102 if (!brandName) throw new Error('BRAND_NAME env var required'); 103 if (!brandDomain) throw new Error('BRAND_DOMAIN env var required'); 104 105 messageBody = messageBody.replace(/\[name\]/gi, personaFirstName); 106 messageBody = messageBody.replace(/\[score\]/gi, String(item.score ?? '')); 107 messageBody = messageBody.replace(/\[grade\]/gi, item.grade ?? ''); 108 subjectLine = subjectLine.replace(/\[name\]/gi, personaFirstName); 109 110 // 2. Sender identity + opt-out — appended dynamically based on country and channel. 111 // SMS: dynamic signature that fits remaining char budget. 112 // Email/form: full identity always appended. 113 const identityPattern = new RegExp(`${personaFirstName}|${brandName.replace(/[&]/g, '\\$&')}|${brandDomain.replace(/\./g, '\\.')}`, 'i'); 114 const hasIdentity = identityPattern.test(messageBody); 115 if (contactMethod === 'sms') { 116 const cc = (item.country_code || '').toUpperCase(); 117 const needsStop = cc === 'US' || cc === 'CA'; 118 // Strip any LLM-generated STOP text or signature — we control these 119 const sigStripPattern = new RegExp(`\\s*-\\s*${personaFirstName}\\s*(${personaName.split(' ').slice(1).join(' ')})?\\s*,?\\s*(${brandName.replace(/[&]/g, '\\s*&\\s*')})?\\s*$`, 'i'); 120 messageBody = messageBody 121 .replace(/\s*Reply STOP to opt out\.?\s*/gi, '') 122 .replace(sigStripPattern, '') 123 .trimEnd(); 124 const stopText = needsStop ? ' Reply STOP to opt out.' : ''; 125 // Signature tiers: try longest first, fall back to shorter 126 const sigTiers = needsStop 127 ? 
[` - ${personaFirstName}, ${brandName}`, ` - ${brandName}`] // legal minimum is company name 128 : [` - ${personaFirstName}`, '']; // no legal requirement, personal touch if room 129 let bestSig = sigTiers[sigTiers.length - 1]; // fallback = shortest 130 for (const sig of sigTiers) { 131 if (messageBody.length + stopText.length + sig.length <= 160) { 132 bestSig = sig; 133 break; 134 } 135 } 136 messageBody = `${messageBody}${stopText}${bestSig}`; 137 } else if (!hasIdentity) { 138 messageBody = `${messageBody.trimEnd()}\n\n${personaName}, ${brandName} (${brandDomain})`; 139 } 140 141 // 3. Score/grade mismatch — LLM sometimes hallucinates a different score. 142 // Replace any "scored X/100" or "X/100 score" with the actual DB value. 143 if (item.score !== null && item.score !== undefined) { 144 const actualScore = Math.round(item.score); 145 messageBody = messageBody.replace( 146 /\b\d{1,3}\/100\b/g, 147 `${actualScore}/100` 148 ); 149 } 150 151 // Channel character limits 152 const limits = { sms: 160, x: 280 }; 153 const maxChars = limits[contactMethod]; 154 if (maxChars && messageBody.length > maxChars) { 155 logger.warn( 156 `${contactMethod} too long (${messageBody.length}/${maxChars}) for site ${item.site_id}` 157 ); 158 return null; 159 } 160 161 const result = await client.query( 162 `INSERT INTO messages (site_id, direction, message_body, subject_line, contact_method, contact_uri, approval_status, message_type) 163 VALUES ($1, 'outbound', $2, $3, $4, $5, $6, 'outreach') 164 RETURNING id`, 165 [item.site_id, messageBody, subjectLine, contactMethod, contactUri, approvalStatus] 166 ); 167 168 return result.rows[0]?.id; 169 } 170 171 async function storeProofreadDecision(client, item) { 172 if (!item.message_id) { 173 logger.warn('Proofread result missing message_id'); 174 return null; 175 } 176 177 const { decision } = item; 178 if (!['approve', 'rework', 'reject'].includes(decision)) { 179 logger.warn(`Invalid proofread decision "${decision}" for message 
${item.message_id}`); 180 return null; 181 } 182 183 // Guard: never change approval_status on already-sent messages (race condition protection) 184 if (decision === 'approve') { 185 const r = await client.query( 186 "UPDATE messages SET approval_status = 'approved' WHERE id = $1 AND sent_at IS NULL", 187 [item.message_id] 188 ); 189 if (r.rowCount === 0) { 190 logger.warn(`Skipped approve for message ${item.message_id}: already sent`); 191 return null; 192 } 193 logger.info(`Approved message ${item.message_id}`); 194 } else if (decision === 'rework') { 195 // Circuit breaker: after 3 rework cycles, park the message instead of looping. 196 // Prevents pending↔rework cycles from blocking Gate 2 indefinitely. 197 const MAX_PROOFREAD_ATTEMPTS = 3; 198 const current = await client.query( 199 'SELECT proofread_attempts FROM messages WHERE id = $1', 200 [item.message_id] 201 ); 202 const attempts = (current.rows[0]?.proofread_attempts || 0) + 1; 203 204 if (attempts >= MAX_PROOFREAD_ATTEMPTS) { 205 await client.query( 206 "UPDATE messages SET approval_status = 'parked', proofread_attempts = $1, rework_instructions = $2 WHERE id = $3 AND sent_at IS NULL", 207 [attempts, `Auto-parked after ${attempts} rework cycles: ${item.rework_instructions || ''}`, item.message_id] 208 ); 209 logger.warn(`Parked message ${item.message_id} after ${attempts} rework cycles (circuit breaker)`); 210 return { stored: true, parked: true }; 211 } 212 213 const r = await client.query( 214 "UPDATE messages SET approval_status = 'rework', proofread_attempts = $1, rework_instructions = $2 WHERE id = $3 AND sent_at IS NULL", 215 [attempts, item.rework_instructions || 'QA: needs rework', item.message_id] 216 ); 217 if (r.rowCount === 0) { 218 logger.warn(`Skipped rework for message ${item.message_id}: already sent`); 219 return null; 220 } 221 logger.info(`Rework message ${item.message_id} (attempt ${attempts}/${MAX_PROOFREAD_ATTEMPTS}): ${item.rework_instructions}`); 222 } else if (decision === 
'reject') { 223 const r = await client.query( 224 "UPDATE messages SET approval_status = 'rejected', rework_instructions = $1 WHERE id = $2 AND sent_at IS NULL", 225 [item.reject_reason || 'QA: rejected', item.message_id] 226 ); 227 if (r.rowCount === 0) { 228 logger.warn(`Skipped reject for message ${item.message_id}: already sent`); 229 return null; 230 } 231 logger.info(`Rejected message ${item.message_id}: ${item.reject_reason}`); 232 } 233 234 return item.message_id; 235 } 236 237 async function storeClassification(client, item) { 238 if (!item.message_id) { 239 logger.warn('Classification missing message_id'); 240 return null; 241 } 242 243 const validIntents = [ 244 'inquiry', 245 'opt-out', 246 'interested', 247 'not-interested', 248 'pricing', 249 'schedule', 250 'unknown', 251 'autoresponder', 252 ]; 253 const validSentiments = ['positive', 'neutral', 'negative', 'objection']; 254 255 const intent = validIntents.includes(item.intent) ? item.intent : 'unknown'; 256 const sentiment = validSentiments.includes(item.sentiment) ? 
item.sentiment : 'neutral'; 257 258 await client.query( 259 'UPDATE messages SET intent = $1, sentiment = $2 WHERE id = $3', 260 [intent, sentiment, item.message_id] 261 ); 262 263 // If opt-out, mark in opt_outs table 264 if (intent === 'opt-out') { 265 const msgResult = await client.query( 266 'SELECT contact_uri, contact_method FROM messages WHERE id = $1', 267 [item.message_id] 268 ); 269 const msg = msgResult.rows[0]; 270 if (msg) { 271 await client.query( 272 `INSERT INTO opt_outs (phone, email, method, opted_out_at, source) 273 VALUES ( 274 CASE WHEN $2 = 'sms' THEN $1 ELSE NULL END, 275 CASE WHEN $2 = 'email' THEN $1 ELSE NULL END, 276 $2, NOW(), 'llm_classify' 277 ) 278 ON CONFLICT DO NOTHING`, 279 [msg.contact_uri, msg.contact_method] 280 ); 281 } 282 } 283 284 return item.message_id; 285 } 286 287 async function storeName(client, item) { 288 if (!item.site_id || !item.contacts) { 289 logger.warn('Name extraction missing site_id or contacts'); 290 return null; 291 } 292 293 // Update contacts_json with extracted names (filesystem) 294 const contactsRaw = getContactsJsonWithFallback(item.site_id); 295 if (!contactsRaw) return null; 296 297 const contactsData = JSON.parse(contactsRaw); 298 let updated = false; 299 300 for (const extracted of item.contacts) { 301 if (!extracted.name || !extracted.uri) continue; 302 const match = (contactsData.contacts || []).find( 303 c => (c.uri || c.value) === extracted.uri && !c.name 304 ); 305 if (match) { 306 match.name = extracted.name; 307 updated = true; 308 } 309 } 310 311 if (updated) { 312 setContactsJson(item.site_id, JSON.stringify(contactsData)); 313 } 314 315 return item.site_id; 316 } 317 318 async function storeReplyResponse(client, item) { 319 if (!item.site_id) { 320 logger.warn('Reply response missing site_id'); 321 return null; 322 } 323 324 // Don't respond to opt-outs or not-interested, but mark the inbound as handled 325 // so it doesn't recur in every batch (NOT EXISTS check would never clear it 
otherwise) 326 if (item.skip) { 327 logger.info(`Skipping response for site ${item.site_id}: ${item.skip_reason || 'opted out'}`); 328 const inboundResult = await client.query( 329 `SELECT contact_method, contact_uri FROM messages WHERE site_id=$1 AND direction='inbound' ORDER BY created_at DESC LIMIT 1`, 330 [item.site_id] 331 ); 332 const inboundRow = inboundResult.rows[0]; 333 const skipChannel = item.channel || item.contact_method || inboundRow?.contact_method || 'sms'; 334 const skipUri = item.contact_uri || inboundRow?.contact_uri || ''; 335 await client.query( 336 `INSERT INTO messages (site_id, direction, message_body, message_type, contact_method, contact_uri) 337 VALUES ($1, 'outbound', $2, 'reply_skipped', $3, $4)`, 338 [item.site_id, `[skipped: ${item.skip_reason || 'not interested'}]`, skipChannel, skipUri] 339 ); 340 return null; 341 } 342 343 if (!item.message_body) { 344 logger.warn(`Reply response missing message_body for site ${item.site_id}`); 345 return null; 346 } 347 348 // channel/contact_uri come from the LLM echoing back the inbound data. 349 // Fall back to the most recent inbound for this site if LLM omits them. 
350 let contactMethod = item.channel || item.contact_method; 351 let contactUri = item.contact_uri || ''; 352 if (!contactMethod) { 353 const inboundResult = await client.query( 354 `SELECT contact_method, contact_uri FROM messages 355 WHERE site_id = $1 AND direction = 'inbound' 356 ORDER BY created_at DESC LIMIT 1`, 357 [item.site_id] 358 ); 359 const inbound = inboundResult.rows[0]; 360 contactMethod = inbound?.contact_method || 'sms'; 361 if (!contactUri) contactUri = inbound?.contact_uri || ''; 362 } 363 364 if (contactMethod === 'sms' && item.country_code) { 365 contactUri = addCountryCode(contactUri, item.country_code); 366 } 367 368 const result = await client.query( 369 `INSERT INTO messages (site_id, direction, message_body, subject_line, contact_method, contact_uri, message_type, approval_status) 370 VALUES ($1, 'outbound', $2, $3, $4, $5, 'reply', 'approved') 371 RETURNING id`, 372 [item.site_id, item.message_body, item.subject_line || null, contactMethod, contactUri] 373 ); 374 375 return result.rows[0]?.id; 376 } 377 378 const PROJECT_ROOT = join(import.meta.dirname || process.cwd(), '..'); 379 const OVERSEER_LOG = '/tmp/sonnet-overseer.log'; 380 const STATUS_FILE = join(PROJECT_ROOT, 'logs/pipeline-status.txt'); 381 382 function overseerLog(msg) { 383 const line = `[${new Date().toISOString()}] [SonnetOverseer] ${msg}`; 384 logger.info(msg); 385 try { 386 appendFileSync(OVERSEER_LOG, `${line}\n`); 387 } catch { 388 /* ignore */ 389 } 390 try { 391 appendFileSync(STATUS_FILE, `${line}\n`); 392 } catch { 393 /* ignore */ 394 } 395 } 396 397 async function executeOverseerAction(action, client) { 398 const { code, description, params = {} } = action; 399 overseerLog(`ACTION: ${code} — ${description}`); 400 401 switch (code) { 402 case 'RESTART_PIPELINE': { 403 try { 404 execSync('systemctl --user restart 333method-pipeline --no-block', { timeout: 30000 }); 405 overseerLog('Pipeline service restart requested'); 406 await client.query( 407 `INSERT INTO 
tel.system_health (check_type, status, details, action_taken) 408 VALUES ('sonnet_overseer', 'ok', $1, 'restart_pipeline')`, 409 [JSON.stringify({ reason: description })] 410 ); 411 } catch (err) { 412 overseerLog(`ERROR: Failed to restart pipeline: ${err.message}`); 413 } 414 break; 415 } 416 case 'CLEAR_STALE_TASKS': { 417 const minAgeHours = params.min_age_hours || 2; 418 const cutoff = new Date(Date.now() - minAgeHours * 60 * 60 * 1000).toISOString(); 419 const res = await client.query( 420 `UPDATE tel.agent_tasks SET status='failed', completed_at=CURRENT_TIMESTAMP, 421 result_json='{"reason":"cleared by sonnet overseer - stale blocked/pending task"}' 422 WHERE status IN ('blocked','pending') AND created_at < $1`, 423 [cutoff] 424 ); 425 const runningCutoff = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString(); 426 const res2 = await client.query( 427 `UPDATE tel.agent_tasks SET status='failed', completed_at=CURRENT_TIMESTAMP, 428 result_json='{"reason":"cleared by sonnet overseer - stuck in running state >2h"}' 429 WHERE status='running' AND started_at < $1`, 430 [runningCutoff] 431 ); 432 overseerLog( 433 `Cleared ${res.rowCount} stale blocked/pending + ${res2.rowCount} stuck-running agent tasks` 434 ); 435 await client.query( 436 `INSERT INTO tel.system_health (check_type, status, details, action_taken) 437 VALUES ('sonnet_overseer', 'ok', $1, 'clear_stale_tasks')`, 438 [JSON.stringify({ cleared: res.rowCount, min_age_hours: minAgeHours })] 439 ); 440 break; 441 } 442 case 'RESET_STUCK_SITES': { 443 const { stage } = params; 444 const MIN_HOURS_FLOOR = 4; 445 const minHours = Math.max(params.min_stuck_hours ?? 
MIN_HOURS_FLOOR, MIN_HOURS_FLOOR); 446 if (!stage) { 447 overseerLog('RESET_STUCK_SITES: missing stage param, skipping'); 448 break; 449 } 450 const priorStage = { 451 assets_captured: 'found', 452 prog_scored: 'assets_captured', 453 semantic_scored: 'prog_scored', 454 vision_scored: 'prog_scored', 455 // enriched intentionally omitted — enriched is a deliberate queue for evidence 456 // collection (gather_evidence cron). Resetting to semantic_scored causes loop. 457 }; 458 const prevStage = priorStage[stage]; 459 if (!prevStage) { 460 overseerLog(`RESET_STUCK_SITES: unknown stage '${stage}', skipping`); 461 break; 462 } 463 const cutoff = new Date(Date.now() - minHours * 60 * 60 * 1000).toISOString(); 464 const excludedResult = await client.query( 465 `SELECT id FROM sites WHERE recapture_count >= 2 AND status = $1`, 466 [stage] 467 ); 468 const excludeIds = excludedResult.rows.map(r => r.id); 469 const clearAssets = stage === 'assets_captured'; 470 471 // When clearing assets, also delete HTML files from filesystem 472 const errorMsg = `reset by sonnet overseer - stuck >${minHours}h`; 473 if (clearAssets) { 474 let affectedResult; 475 if (excludeIds.length > 0) { 476 affectedResult = await client.query( 477 `SELECT id FROM sites WHERE status=$1 AND updated_at < $2 AND NOT (id = ANY($3))`, 478 [stage, cutoff, excludeIds] 479 ); 480 } else { 481 affectedResult = await client.query( 482 `SELECT id FROM sites WHERE status=$1 AND updated_at < $2`, 483 [stage, cutoff] 484 ); 485 } 486 for (const row of affectedResult.rows) { 487 deleteHtmlDom(row.id); 488 } 489 } 490 491 let result; 492 if (excludeIds.length > 0) { 493 if (clearAssets) { 494 result = await client.query( 495 `UPDATE sites SET status=$1, updated_at=NOW(), 496 error_message=$2, 497 recapture_count=recapture_count+1, html_dom=NULL, screenshot_path=NULL 498 WHERE status=$3 AND updated_at < $4 AND NOT (id = ANY($5))`, 499 [prevStage, errorMsg, stage, cutoff, excludeIds] 500 ); 501 } else { 502 result = await 
client.query( 503 `UPDATE sites SET status=$1, updated_at=NOW(), 504 error_message=$2, 505 recapture_count=recapture_count+1 506 WHERE status=$3 AND updated_at < $4 AND NOT (id = ANY($5))`, 507 [prevStage, errorMsg, stage, cutoff, excludeIds] 508 ); 509 } 510 } else { 511 if (clearAssets) { 512 result = await client.query( 513 `UPDATE sites SET status=$1, updated_at=NOW(), 514 error_message=$2, 515 recapture_count=recapture_count+1, html_dom=NULL, screenshot_path=NULL 516 WHERE status=$3 AND updated_at < $4`, 517 [prevStage, errorMsg, stage, cutoff] 518 ); 519 } else { 520 result = await client.query( 521 `UPDATE sites SET status=$1, updated_at=NOW(), 522 error_message=$2, 523 recapture_count=recapture_count+1 524 WHERE status=$3 AND updated_at < $4`, 525 [prevStage, errorMsg, stage, cutoff] 526 ); 527 } 528 } 529 overseerLog( 530 `Reset ${result.rowCount} sites stuck at '${stage}' → '${prevStage}' (skipped ${excludeIds.length} anti-loop)` 531 ); 532 await client.query( 533 `INSERT INTO tel.system_health (check_type, status, details, action_taken) 534 VALUES ('sonnet_overseer', 'ok', $1, 'reset_stuck_sites')`, 535 [JSON.stringify({ 536 stage, 537 prev_stage: prevStage, 538 reset: result.rowCount, 539 skipped_anti_loop: excludeIds.length, 540 })] 541 ); 542 break; 543 } 544 case 'LOG_ONLY': 545 overseerLog(`NOTE: ${params.note || description}`); 546 break; 547 default: 548 overseerLog(`WARN: Unknown action code '${code}', skipping`); 549 } 550 } 551 552 async function storeOverseerResult(client, item) { 553 // item = { summary, severity, findings, actions } 554 if (!item.summary) { 555 logger.warn('Overseer result missing summary'); 556 return null; 557 } 558 559 overseerLog(`Analysis: severity=${item.severity} | ${item.summary}`); 560 overseerLog(`Findings: ${item.findings?.length || 0}, Actions: ${item.actions?.length || 0}`); 561 562 let actionsTaken = 0; 563 for (const action of item.actions || []) { 564 try { 565 await executeOverseerAction(action, client); 566 
if (action.code !== 'LOG_ONLY') actionsTaken++; 567 } catch (err) { 568 overseerLog(`ERROR: Action ${action.code} failed: ${err.message}`); 569 } 570 } 571 572 // Write report to status file 573 const report = [ 574 `\n${'='.repeat(70)}`, 575 `SONNET OVERSEER — ${new Date().toISOString()} [${(item.severity || 'unknown').toUpperCase()}]`, 576 `Summary: ${item.summary}`, 577 '', 578 `Findings: ${(item.findings || []).map(f => `[${f.severity}] ${f.issue}`).join(' | ') || 'none'}`, 579 `Actions taken: ${actionsTaken}`, 580 `${'='.repeat(70)}`, 581 ].join('\n'); 582 try { 583 appendFileSync(STATUS_FILE, `${report}\n`); 584 } catch { 585 /* ignore */ 586 } 587 588 return actionsTaken; 589 } 590 591 async function storeErrorClassification(client, item) { 592 // item = { pattern, label, group, context, example_errors, occurrence_count } 593 const { pattern, label, group, context, example_errors, occurrence_count } = item; 594 595 if ( 596 !pattern || 597 !label || 598 !['terminal', 'retriable'].includes(group) || 599 !['site', 'outreach'].includes(context) 600 ) { 601 logger.warn(`Skipping invalid error proposal: ${JSON.stringify(item)}`); 602 return null; 603 } 604 605 try { 606 new RegExp(pattern); 607 } catch { 608 logger.warn(`Skipping invalid regex "${pattern}"`); 609 return null; 610 } 611 612 // Avoid duplicates 613 const existing = await client.query( 614 `SELECT id FROM error_pattern_proposals WHERE pattern = $1 AND context = $2 AND status = 'pending'`, 615 [pattern, context] 616 ); 617 if (existing.rows[0]) return null; 618 619 const examples = Array.isArray(example_errors) ? 
example_errors.slice(0, 5) : []; 620 const result = await client.query( 621 `INSERT INTO error_pattern_proposals (pattern, label, group_name, context, example_errors, occurrence_count) 622 VALUES ($1, $2, $3, $4, $5, $6) 623 RETURNING id`, 624 [pattern, label, group, context, JSON.stringify(examples), occurrence_count || 0] 625 ); 626 627 return result.rows[0]?.id; 628 } 629 630 /** 631 * Store semantic scoring results from Haiku. 632 * Merges LLM-assessed semantic factor scores (headline, value prop, USP) 633 * into the existing programmatic score_json and recomputes the final score. 634 */ 635 async function storeSemanticScore(client, item) { 636 const siteId = item.site_id; 637 if (!siteId) return null; 638 639 const siteResult = await client.query('SELECT score FROM sites WHERE id = $1', [siteId]); 640 const site = siteResult.rows[0]; 641 const scoreRaw = getScoreJsonWithFallback(siteId); 642 if (!scoreRaw) { 643 // score_json lost in migration — advance semantic_scored → enriched using existing DB score 644 const existingScore = site?.score; 645 if (existingScore === null || existingScore === undefined) return null; 646 const scoreThreshold = parseInt(process.env.LOW_SCORE_CUTOFF || '82', 10); 647 const isHighScore = existingScore > scoreThreshold; 648 await client.query( 649 `UPDATE sites SET 650 status = CASE WHEN status = 'semantic_scored' THEN $1 ELSE status END, 651 updated_at = CURRENT_TIMESTAMP 652 WHERE id = $2`, 653 [isHighScore ? 
'high_score' : 'enriched', siteId] 654 ); 655 return siteId; 656 } 657 658 const scoreJson = JSON.parse(scoreRaw); 659 if (!scoreJson.factor_scores) return null; 660 661 // Merge Haiku's semantic scores into factor_scores 662 const semanticFactors = ['headline_quality', 'value_proposition', 'unique_selling_proposition']; 663 for (const factor of semanticFactors) { 664 if (item[factor] && typeof item[factor].score === 'number') { 665 scoreJson.factor_scores[factor] = { 666 score: Math.min(Math.max(item[factor].score, 0), 10), 667 reasoning: item[factor].reasoning || 'LLM semantic assessment', 668 evidence: scoreJson.factor_scores[factor]?.evidence || '', 669 source: 'haiku_semantic', 670 }; 671 } 672 } 673 674 // Recompute weighted score with vision-aware weights 675 const visionEnabled = process.env.ENABLE_VISION !== 'false'; 676 const newScore = computeWeightedScore(scoreJson.factor_scores, visionEnabled); 677 const newGrade = computeGrade(newScore); 678 679 // Mark as semantically scored 680 scoreJson.semantic_scored = true; 681 scoreJson.scoring_method = 'hybrid'; 682 scoreJson.conversion_score = newScore; 683 scoreJson.letter_grade = newGrade; 684 685 // Advance status: 686 // prog_scored → semantic_scored (or high_score) — vision-on path 687 // semantic_scored → enriched (or high_score) — vision-off path (LLM overlay done) 688 const scoreThreshold = parseInt(process.env.LOW_SCORE_CUTOFF || '82', 10); 689 const isHighScore = newScore !== null && newScore > scoreThreshold; 690 const newStatusFromProg = isHighScore ? 'high_score' : 'semantic_scored'; 691 const newStatusFromSemantic = isHighScore ? 
'high_score' : 'enriched'; 692 693 setScoreJson(siteId, scoreJson); 694 await client.query( 695 `UPDATE sites 696 SET score = $1, grade = $2, 697 status = CASE 698 WHEN status = 'prog_scored' THEN $3 699 WHEN status = 'semantic_scored' THEN $4 700 ELSE status 701 END, 702 rescored_at = CURRENT_TIMESTAMP, 703 updated_at = CURRENT_TIMESTAMP 704 WHERE id = $5`, 705 [newScore, newGrade, newStatusFromProg, newStatusFromSemantic, siteId] 706 ); 707 708 return siteId; 709 } 710 711 /** 712 * Store scoring result from the orchestrator's score_sites batch. 713 * Replicates the site-classification and DB-update logic from src/stages/scoring.js:scoreSite(). 714 * item = { site_id, domain, factor_scores, overall_calculation, contact_details, ... } 715 */ 716 async function storeScoreResult(client, item) { 717 const siteId = item.site_id; 718 if (!siteId) { 719 logger.warn('Score result missing site_id'); 720 return null; 721 } 722 723 // Validate factor_scores are present — incomplete responses are unusable 724 if (!item.factor_scores) { 725 logger.warn(`Score result for site ${siteId} missing factor_scores — skipping`); 726 return null; 727 } 728 729 // Compute score and grade programmatically from factor scores (same as scoreWebsite()) 730 const computedScore = computeScoreFromFactors(item.factor_scores); 731 const computedGrade = computeGrade(computedScore); 732 733 // Ensure overall_calculation has the computed values 734 if (!item.overall_calculation) item.overall_calculation = {}; 735 item.overall_calculation.conversion_score = computedScore; 736 item.overall_calculation.letter_grade = computedGrade; 737 738 const grade = computedGrade; 739 const score = computedScore; 740 741 // Domain-level blocklist check (same logic as scoring stage) 742 const siteResult = await client.query( 743 'SELECT domain, country_code FROM sites WHERE id = $1', 744 [siteId] 745 ); 746 const site = siteResult.rows[0]; 747 if (!site) { 748 logger.warn(`Score result: site ${siteId} not found in 
DB`); 749 return null; 750 } 751 752 const blocked = checkBlocklist(site.domain, site.country_code); 753 if (blocked) { 754 await client.query( 755 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 756 [blocked.reason, siteId] 757 ); 758 logger.info(`Score: ${site.domain} blocklisted — ${blocked.reason}`); 759 return siteId; 760 } 761 762 const industry = classifyIndustry(site.domain); 763 if (industry) { 764 const reason = 765 industry.type === 'legal' 766 ? `Ignored: Legal site detected from domain (${industry.reason})` 767 : `Ignored: Regulated industry (${industry.type}) detected from domain (${industry.reason})`; 768 await client.query( 769 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 770 [reason, siteId] 771 ); 772 logger.info(`Score: ${site.domain} regulated industry — ${reason}`); 773 return siteId; 774 } 775 776 // Helper: dual-write score_json to both DB and filesystem 777 const scoreJsonStr = JSON.stringify(item); 778 779 // LLM-detected classifications (directory, local business, law firm, regulated industry) 780 const isDirectory = item.overall_calculation?.is_business_directory; 781 if (isDirectory) { 782 setScoreJson(siteId, scoreJsonStr); 783 await client.query( 784 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 785 ['Ignored: Business directory (LLM detected)', siteId] 786 ); 787 logger.info(`Score: ${site.domain} is a business directory — ignored`); 788 return siteId; 789 } 790 791 const isLocalBusiness = item.overall_calculation?.is_local_business; 792 if (isLocalBusiness === false) { 793 setScoreJson(siteId, scoreJsonStr); 794 await client.query( 795 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 796 ['Ignored: Not a local business (LLM detected)', siteId] 797 ); 798 logger.info(`Score: ${site.domain} is not a local business — ignored`); 799 return siteId; 800 } 801 802 const isLawFirm = item.overall_calculation?.is_law_firm; 803 if (isLawFirm) 
{ 804 setScoreJson(siteId, scoreJsonStr); 805 await client.query( 806 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 807 ['Ignored: Law firm (LLM detected)', siteId] 808 ); 809 logger.info(`Score: ${site.domain} is a law firm — ignored`); 810 return siteId; 811 } 812 813 const industryClassification = 814 item.overall_calculation?.industry_classification?.toLowerCase() || ''; 815 const regulatedPatterns = [ 816 'dental', 817 'medical', 818 'clinic', 819 'hospital', 820 'pharmacy', 821 'veterinary', 822 'financial advis', 823 'mortgage broker', 824 'accounting', 825 ]; 826 const isRegulated = regulatedPatterns.some(p => industryClassification.includes(p)); 827 if (isRegulated) { 828 setScoreJson(siteId, scoreJsonStr); 829 await client.query( 830 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 831 [`Ignored: Regulated industry (${industryClassification})`, siteId] 832 ); 833 logger.info( 834 `Score: ${site.domain} is regulated industry (${industryClassification}) — ignored` 835 ); 836 return siteId; 837 } 838 839 // Error page detection 840 const isErrorPage = item.overall_calculation?.is_error_page; 841 const errorType = item.overall_calculation?.error_type; 842 const errorDescription = item.overall_calculation?.error_description; 843 if (isErrorPage && errorType) { 844 const permanentErrors = ['404', '403', '410']; 845 if (permanentErrors.includes(errorType)) { 846 setScoreJson(siteId, scoreJsonStr); 847 await client.query( 848 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 849 [errorDescription || `Permanent error: ${errorType}`, siteId] 850 ); 851 logger.info(`Score: ${site.domain} is a ${errorType} error page — ignored`); 852 return siteId; 853 } 854 // Temporary errors — keep at assets_captured for retry 855 const temporaryErrors = ['5xx', 'maintenance', 'redirect']; 856 if (temporaryErrors.includes(errorType)) { 857 setScoreJson(siteId, scoreJsonStr); 858 await client.query( 859 
`UPDATE sites SET status = 'assets_captured', error_message = $1 WHERE id = $2`, 860 [errorDescription || `Temporary error: ${errorType}`, siteId] 861 ); 862 logger.info(`Score: ${site.domain} has temporary error ${errorType} — will retry`); 863 return siteId; 864 } 865 } 866 867 // Broken site detection 868 const isBrokenSite = item.overall_calculation?.is_broken_site || false; 869 const brokenSiteDetails = item.overall_calculation?.broken_site_details || []; 870 if (isBrokenSite) { 871 const siteRowResult = await client.query( 872 'SELECT recapture_count FROM sites WHERE id = $1', 873 [siteId] 874 ); 875 const recaptureCount = (siteRowResult.rows[0]?.recapture_count || 0) + 1; 876 setScoreJson(siteId, scoreJsonStr); 877 if (recaptureCount > 3) { 878 await client.query( 879 `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, 880 [`Max recapture attempts reached (3) - Issues: ${brokenSiteDetails.join(', ')}`, siteId] 881 ); 882 return siteId; 883 } 884 await client.query( 885 `UPDATE sites SET status = 'assets_captured', error_message = $1, recapture_count = $2, 886 recapture_at = NOW() + INTERVAL '7 days' WHERE id = $3`, 887 [ 888 `Broken site detected (attempt ${recaptureCount}/3) - Issues: ${brokenSiteDetails.join(', ')}`, 889 recaptureCount, 890 siteId, 891 ] 892 ); 893 return siteId; 894 } 895 896 // Extract location 897 const city = item.overall_calculation?.city || null; 898 const countryCode = item.overall_calculation?.country_code || null; 899 const state = item.overall_calculation?.state || null; 900 901 // Determine target status (same logic as scoring.js) 902 const scoreThreshold = parseInt(process.env.LOW_SCORE_CUTOFF || '82', 10); 903 // HTML-only mode: skip straight to semantic_scored (rescoring is a no-op without screenshots) 904 const enableVision = process.env.ENABLE_VISION !== 'false'; 905 const newStatus = 906 score !== null && score > scoreThreshold 907 ? 'high_score' 908 : enableVision 909 ? 
'prog_scored'
          : 'semantic_scored';

  // When vision disabled, save contact_details to contacts_json filesystem
  if (!enableVision && item.contact_details) {
    setContactsJson(siteId, JSON.stringify(item.contact_details));
  }

  // Write score to filesystem; DB no longer stores score_json/contacts_json blobs
  setScoreJson(siteId, scoreJsonStr);

  // $6 is the new status; $7 repeats the same value so the CASE expression can
  // stamp rescored_at only when the site lands in 'semantic_scored'.
  await client.query(
    `UPDATE sites SET
       grade = $1, score = $2,
       city = $3, country_code = $4, state = $5,
       status = $6,
       scored_at = CURRENT_TIMESTAMP,
       rescored_at = CASE WHEN $7 = 'semantic_scored' THEN CURRENT_TIMESTAMP ELSE rescored_at END,
       error_message = NULL
     WHERE id = $8`,
    [grade, score, city, countryCode, state, newStatus, newStatus, siteId]
  );

  // Clear the retry counter — this site completed the stage successfully.
  await resetRetries(siteId);

  logger.info(`Score stored: site ${siteId} (${site.domain}) → ${grade} (${score}) → ${newStatus}`);
  return siteId;
}

/**
 * Clean phone numbers from LLM enrichment output — normalizes to E.164 format.
 * Accepts both bare strings and {number, label} objects; anything else is dropped.
 * @param {Array<Object|string>} phones - Phone numbers from LLM
 * @returns {Array<Object>} Normalized phone numbers
 */
function cleanEnrichPhones(phones) {
  if (!Array.isArray(phones)) return [];
  return phones
    .map(phone => {
      if (typeof phone === 'string') {
        return { number: normalizePhoneNumber(phone), label: '' };
      }
      if (phone && phone.number) {
        return { number: normalizePhoneNumber(phone.number), label: phone.label || '' };
      }
      // Unrecognized shape (null, number, object without .number) — discard.
      return null;
    })
    // NOTE(review): normalizePhoneNumber may presumably return null/'' for
    // unparseable input; callers filter on p.number, so only the object-level
    // nulls are removed here — confirm against phone-normalizer.js.
    .filter(Boolean);
}

/**
 * Store enrichment result from the orchestrator's enrich_sites batch.
 * This handles extractInitialContacts-equivalent: sets/updates contacts_json from HTML analysis.
 * The pipeline's enrich stage still handles browser navigation for key_pages.
962 * item = { site_id, domain, business_name, city, country_code, state, email_addresses, 963 * phone_numbers, social_profiles, key_pages, primary_contact_form } 964 */ 965 async function storeEnrichResult(client, item) { 966 const siteId = item.site_id; 967 if (!siteId) { 968 logger.warn('Enrich result missing site_id'); 969 return null; 970 } 971 972 const siteResult = await client.query( 973 'SELECT country_code, status FROM sites WHERE id = $1', 974 [siteId] 975 ); 976 const site = siteResult.rows[0]; 977 if (!site) { 978 logger.warn(`Enrich result: site ${siteId} not found in DB`); 979 return null; 980 } 981 982 // Build enriched contacts_json from LLM result (filesystem first) 983 const contactsRaw = getContactsJsonWithFallback(siteId); 984 let contactsJson = contactsRaw ? JSON.parse(contactsRaw) : {}; 985 986 // Merge LLM-extracted data into existing contacts_json 987 // Prefer LLM values for business_name/city/country_code/state (previously missing) 988 if (item.business_name && !contactsJson.business_name) { 989 contactsJson.business_name = item.business_name; 990 } 991 if (item.city && !contactsJson.city) { 992 contactsJson.city = item.city; 993 } 994 if (item.country_code && !contactsJson.country_code) { 995 contactsJson.country_code = item.country_code; 996 } 997 if (item.state && !contactsJson.state) { 998 contactsJson.state = item.state; 999 } 1000 1001 // Merge arrays (avoid duplicates by URI/email/number) 1002 if (Array.isArray(item.email_addresses)) { 1003 const existing = new Set( 1004 (contactsJson.email_addresses || []).map(e => (typeof e === 'string' ? e : e.email)) 1005 ); 1006 const newEmails = item.email_addresses.filter(e => { 1007 const addr = typeof e === 'string' ? 
e : e.email; 1008 return addr && !existing.has(addr); 1009 }); 1010 contactsJson.email_addresses = [...(contactsJson.email_addresses || []), ...newEmails]; 1011 } 1012 1013 if (Array.isArray(item.phone_numbers)) { 1014 const cleaned = cleanEnrichPhones(item.phone_numbers); 1015 const existing = new Set( 1016 (contactsJson.phone_numbers || []).map(p => (typeof p === 'string' ? p : p.number)) 1017 ); 1018 const newPhones = cleaned.filter(p => p.number && !existing.has(p.number)); 1019 contactsJson.phone_numbers = [...(contactsJson.phone_numbers || []), ...newPhones]; 1020 } 1021 1022 if (Array.isArray(item.social_profiles)) { 1023 const existing = new Set( 1024 (contactsJson.social_profiles || []).map(s => (typeof s === 'string' ? s : s.url)) 1025 ); 1026 const newSocial = item.social_profiles.filter(s => { 1027 const url = typeof s === 'string' ? s : s.url; 1028 return url && !existing.has(url); 1029 }); 1030 contactsJson.social_profiles = [...(contactsJson.social_profiles || []), ...newSocial]; 1031 } 1032 1033 if (Array.isArray(item.key_pages)) { 1034 const existing = new Set(contactsJson.key_pages || []); 1035 const newPages = item.key_pages.filter(p => p && !existing.has(p)); 1036 contactsJson.key_pages = [...(contactsJson.key_pages || []), ...newPages]; 1037 } 1038 1039 if (item.primary_contact_form && !contactsJson.primary_contact_form) { 1040 contactsJson.primary_contact_form = item.primary_contact_form; 1041 } 1042 1043 // Clean invalid social links 1044 contactsJson = cleanInvalidSocialLinks(contactsJson); 1045 1046 const city = contactsJson.city || null; 1047 const countryCode = contactsJson.country_code || null; 1048 const state = contactsJson.state || null; 1049 1050 // enrichment_pass is set in the input batch but not echoed back in the LLM output JSON. 1051 // Infer from DB status when the field is absent (enriched_regex = LLM pass pending). 
1052 const isLlmPass = 1053 item.enrichment_pass === 'llm' || 1054 (item.enrichment_pass === undefined && site.status === 'enriched_regex'); 1055 1056 // Write contacts to filesystem; store sentinel in DB to preserve IS NOT NULL gating 1057 setContactsJson(siteId, JSON.stringify(contactsJson)); 1058 1059 if (isLlmPass) { 1060 // LLM pass over key_pages_html — set enriched_llm, clear key_pages_html to reclaim space 1061 deleteKeyPagesHtml(siteId); 1062 await client.query( 1063 `UPDATE sites SET 1064 city = COALESCE($1, city), 1065 country_code = COALESCE($2, country_code), 1066 state = COALESCE($3, state), 1067 status = 'enriched_llm', 1068 enriched_at = CURRENT_TIMESTAMP, 1069 key_pages_html = NULL, 1070 updated_at = CURRENT_TIMESTAMP 1071 WHERE id = $4`, 1072 [city, countryCode, state, siteId] 1073 ); 1074 } else { 1075 // Initial pass from html_dom — status stays at 'semantic_scored'/'vision_scored' so the 1076 // pipeline enrich stage can still run browser navigation on key_pages 1077 await client.query( 1078 `UPDATE sites SET 1079 city = COALESCE($1, city), 1080 country_code = COALESCE($2, country_code), 1081 state = COALESCE($3, state), 1082 updated_at = CURRENT_TIMESTAMP 1083 WHERE id = $4`, 1084 [city, countryCode, state, siteId] 1085 ); 1086 } 1087 1088 logger.info( 1089 `Enrich stored: site ${siteId} (${item.domain}) — emails:${contactsJson.email_addresses?.length || 0} phones:${contactsJson.phone_numbers?.length || 0} social:${contactsJson.social_profiles?.length || 0}` 1090 ); 1091 return siteId; 1092 } 1093 1094 /** 1095 * Store monitor_health result — logs findings, no DB writes. 
1096 * item = { severity, summary, findings[], recommended_actions[] } 1097 */ 1098 async function storeMonitorHealthResult(client, item) { 1099 if (!item.summary) { 1100 logger.warn('monitor_health result missing summary'); 1101 return null; 1102 } 1103 logger.info(`Health check [${item.severity || 'info'}]: ${item.summary}`); 1104 for (const f of item.findings || []) { 1105 logger.info(` [${f.severity}] ${f.component}: ${f.issue}`); 1106 } 1107 for (const a of item.recommended_actions || []) { 1108 logger.info(` ACTION: ${a}`); 1109 } 1110 return item.summary; 1111 } 1112 1113 /** 1114 * Store triage_errors result — creates fix tasks for critical/high errors. 1115 * item = { error_message, type, severity, action, summary, suggested_fix } 1116 */ 1117 async function storeTriageResult(client, item) { 1118 if (!item.error_message || !item.type) { 1119 logger.warn('triage_errors result missing required fields'); 1120 return null; 1121 } 1122 1123 logger.info(`Triage [${item.severity}/${item.type}]: ${item.summary}`); 1124 1125 if (item.action !== 'create_fix_task') return item.error_message; 1126 1127 // Only create tasks for high/critical 1128 if (!['critical', 'high'].includes(item.severity)) return item.error_message; 1129 1130 const priority = item.severity === 'critical' ? 9 : 7; 1131 await client.query( 1132 `INSERT INTO tel.agent_tasks 1133 (task_type, assigned_to, created_by, priority, status, context_json) 1134 VALUES ('triage_fix', 'developer', 'incident_commander', $1, 'pending', $2)`, 1135 [ 1136 priority, 1137 JSON.stringify({ 1138 error_message: item.error_message, 1139 error_type: item.type, 1140 severity: item.severity, 1141 summary: item.summary, 1142 suggested_fix: item.suggested_fix || null, 1143 }), 1144 ] 1145 ); 1146 1147 return item.error_message; 1148 } 1149 1150 /** 1151 * Store check_docs result — logs stale docs findings, no DB writes. 
1152 * item = { file_path, stale_docs[], summary } 1153 */ 1154 async function storeCheckDocsResult(client, item) { 1155 if (!item.file_path) { 1156 logger.warn('check_docs result missing file_path'); 1157 return null; 1158 } 1159 const count = Array.isArray(item.stale_docs) ? item.stale_docs.length : 0; 1160 if (count === 0) { 1161 logger.info(`check_docs: ${item.file_path} — docs current`); 1162 return item.file_path; 1163 } 1164 logger.info(`check_docs: ${item.file_path} — ${count} stale doc(s)`); 1165 for (const d of item.stale_docs) { 1166 logger.info(` [${d.location}] ${d.issue}`); 1167 } 1168 return item.file_path; 1169 } 1170 1171 /** 1172 * Store code review findings as agent_tasks for later fixing. 1173 * item = { file_path, findings: [{severity, category, line, description, suggestion}], summary } 1174 * 1175 * Each finding with severity >= 7 becomes a code_review_fix task. 1176 * Findings with severity <= 6 are stored but skipped (too low priority to auto-fix). 1177 */ 1178 async function storeCodeReviewFindings(client, item) { 1179 if (!item.file_path) { 1180 logger.warn('Code review result missing file_path'); 1181 return null; 1182 } 1183 1184 const findings = Array.isArray(item.findings) ? 
item.findings : []; 1185 1186 if (findings.length === 0) { 1187 logger.info(`Code review: ${item.file_path} — no findings`); 1188 return item.file_path; // Counts as stored (reviewed with no issues) 1189 } 1190 1191 let tasksCreated = 0; 1192 1193 for (const finding of findings) { 1194 const severity = parseInt(finding.severity || '5', 10); 1195 1196 // Only create fix tasks for bugs and security issues (severity >= 7) 1197 if (severity < 7) { 1198 logger.info( 1199 `Code review: ${item.file_path}:${finding.line || '?'} severity=${severity} — logged, not queued` 1200 ); 1201 continue; 1202 } 1203 1204 await client.query( 1205 `INSERT INTO tel.agent_tasks 1206 (task_type, assigned_to, created_by, priority, status, context_json) 1207 VALUES ('code_review_fix', 'developer', 'code_reviewer', $1, 'pending', $2)`, 1208 [ 1209 severity, 1210 JSON.stringify({ 1211 file_path: item.file_path, 1212 line: finding.line || null, 1213 category: finding.category || 'unknown', 1214 description: finding.description, 1215 suggestion: finding.suggestion || null, 1216 severity, 1217 }), 1218 ] 1219 ); 1220 tasksCreated++; 1221 } 1222 1223 logger.info( 1224 `Code review: ${item.file_path} — ${findings.length} findings, ${tasksCreated} fix tasks created` 1225 ); 1226 return item.file_path; 1227 } 1228 1229 /** 1230 * Store the Haiku evidence-merge result. 1231 * 1232 * item = { 1233 * site_id, 1234 * revised_scores: { factor: { original, revised, reason } }, 1235 * additional_findings: [{ factor, finding, severity }], 1236 * evidence_summary: string 1237 * } 1238 * 1239 * Writes the full merged object to sites.evidence_json. 1240 * Also patches individual factor scores in sites.score_json where revisions exist. 
1241 */ 1242 async function storeEvidenceMerge(client, item) { 1243 if (!item.site_id) { 1244 logger.warn('Evidence merge result missing site_id'); 1245 return null; 1246 } 1247 1248 const siteExistsResult = await client.query( 1249 'SELECT id FROM sites WHERE id = $1', 1250 [item.site_id] 1251 ); 1252 if (!siteExistsResult.rows[0]) { 1253 logger.warn(`Evidence merge: site ${item.site_id} not found`); 1254 return null; 1255 } 1256 1257 // Patch score_json with revised scores (filesystem) 1258 const revisedScores = item.revised_scores || {}; 1259 const revisionCount = Object.keys(revisedScores).length; 1260 const scoreRaw = getScoreJsonWithFallback(item.site_id); 1261 1262 if (revisionCount > 0 && scoreRaw) { 1263 try { 1264 const scoreData = JSON.parse(scoreRaw); 1265 for (const [factor, revision] of Object.entries(revisedScores)) { 1266 if (scoreData.factor_scores && scoreData.factor_scores[factor] !== undefined) { 1267 scoreData.factor_scores[factor].score = revision.revised; 1268 scoreData.factor_scores[factor].evidence_revised = true; 1269 scoreData.factor_scores[factor].evidence_reason = revision.reason; 1270 } 1271 } 1272 // Write patched score to filesystem only; DB no longer stores score_json 1273 setScoreJson(item.site_id, JSON.stringify(scoreData)); 1274 } catch (e) { 1275 logger.warn( 1276 `Evidence merge: failed to patch score_json for site ${item.site_id}: ${e.message}` 1277 ); 1278 } 1279 } 1280 1281 // Write full evidence_json 1282 await client.query( 1283 'UPDATE sites SET evidence_json = $1 WHERE id = $2', 1284 [ 1285 JSON.stringify({ 1286 revised_scores: revisedScores, 1287 additional_findings: item.additional_findings || [], 1288 evidence_summary: item.evidence_summary || '', 1289 merged_at: new Date().toISOString(), 1290 }), 1291 item.site_id, 1292 ] 1293 ); 1294 1295 const additionalCount = (item.additional_findings || []).length; 1296 logger.info( 1297 `Evidence merge: site ${item.site_id} — ${revisionCount} score revisions, ${additionalCount} 
additional findings` 1298 ); 1299 if (item.evidence_summary) { 1300 logger.info(`Evidence summary: ${item.evidence_summary}`); 1301 } 1302 1303 return item.site_id; 1304 } 1305 1306 /** 1307 * Store a follow-up message generated by the LLM. 1308 * Each result item contains: site_id, contact_method, contact_uri, message_body, 1309 * subject_line (email only), sequence_step, country_code. 1310 * 1311 * Similar to storeProposal but uses message_type='followupN' and includes sequence_step. 1312 */ 1313 async function storeFollowupGenerate(client, item) { 1314 const contactMethod = item.contact_method || 'email'; 1315 let contactUri = item.contact_uri || ''; 1316 const sequenceStep = item.sequence_step || item.next_step || 2; 1317 1318 if (!contactUri || !item.site_id) { 1319 logger.warn(`Followup missing contact_uri or site_id`); 1320 return null; 1321 } 1322 1323 // Normalize phone numbers 1324 if (contactMethod === 'sms' && item.country_code) { 1325 contactUri = addCountryCode(contactUri, item.country_code); 1326 } 1327 1328 // GDPR check 1329 let approvalStatus = 'pending'; 1330 if (contactMethod === 'email' && item.country_code) { 1331 const country = getCountryByCode(item.country_code); 1332 if (country?.requiresGDPRCheck) { 1333 approvalStatus = 'gdpr_blocked'; 1334 } 1335 } 1336 1337 // Skip blocked emails 1338 if (contactMethod === 'email') { 1339 if (isGovernmentEmail(contactUri) || isEducationEmail(contactUri) || isDemoEmail(contactUri)) { 1340 logger.info(`Skipping blocked email for followup: ${contactUri}`); 1341 return null; 1342 } 1343 } 1344 1345 // Resolve spintax 1346 const messageBody = spin(item.message_body || ''); 1347 const subjectLine = spin(item.subject_line || ''); 1348 1349 if (!messageBody || messageBody.length < 10) { 1350 logger.warn(`Followup message too short for site ${item.site_id} step ${sequenceStep}`); 1351 return null; 1352 } 1353 1354 // SMS length check (more lenient for follow-ups — up to 320 chars / 2 segments) 1355 if 
(contactMethod === 'sms' && messageBody.length > 320) { 1356 logger.warn( 1357 `SMS followup too long (${messageBody.length}/320) for site ${item.site_id} step ${sequenceStep}` 1358 ); 1359 return null; 1360 } 1361 1362 const messageType = `followup${sequenceStep}`; 1363 1364 // Check for existing followup at this step for this site+method+uri 1365 const existingResult = await client.query( 1366 `SELECT 1 FROM messages 1367 WHERE site_id = $1 AND direction = 'outbound' 1368 AND contact_method = $2 AND contact_uri = $3 1369 AND sequence_step = $4`, 1370 [item.site_id, contactMethod, contactUri, sequenceStep] 1371 ); 1372 1373 if (existingResult.rows[0]) { 1374 logger.info(`Followup step ${sequenceStep} already exists for site ${item.site_id} ${contactMethod}`); 1375 return null; 1376 } 1377 1378 const result = await client.query( 1379 `INSERT INTO messages (site_id, direction, message_body, subject_line, contact_method, contact_uri, approval_status, message_type, sequence_step) 1380 VALUES ($1, 'outbound', $2, $3, $4, $5, $6, $7, $8) 1381 RETURNING id`, 1382 [item.site_id, messageBody, subjectLine || null, contactMethod, contactUri, approvalStatus, messageType, sequenceStep] 1383 ); 1384 1385 // Update next_followup_at for the next step 1386 const TOUCH_CADENCE = { 2: 3, 3: 7, 4: 14, 5: 21, 6: 28, 7: 35, 8: 42 }; 1387 const nextNextStep = sequenceStep + 1; 1388 if (nextNextStep <= 8 && TOUCH_CADENCE[nextNextStep]) { 1389 const daysUntilNext = TOUCH_CADENCE[nextNextStep] - TOUCH_CADENCE[sequenceStep]; 1390 await client.query( 1391 `UPDATE sites 1392 SET next_followup_at = NOW() + make_interval(days => $2), 1393 updated_at = CURRENT_TIMESTAMP 1394 WHERE id = $1`, 1395 [item.site_id, daysUntilNext] 1396 ); 1397 } else { 1398 // All follow-ups generated — clear next_followup_at 1399 await client.query( 1400 `UPDATE sites 1401 SET next_followup_at = NULL, 1402 updated_at = CURRENT_TIMESTAMP 1403 WHERE id = $1`, 1404 [item.site_id] 1405 ); 1406 } 1407 1408 
logger.info(`Stored followup step ${sequenceStep} for site ${item.site_id} (${contactMethod} → ${contactUri})`); 1409 return result.rows[0]?.id; 1410 } 1411 1412 // Main 1413 const handlers = { 1414 proposals_email: storeProposal, 1415 proposals_sms: storeProposal, 1416 proofread: storeProofreadDecision, 1417 classify_replies: storeClassification, 1418 extract_names: storeName, 1419 reply_responses: storeReplyResponse, 1420 oversee: storeOverseerResult, 1421 classify_errors: storeErrorClassification, 1422 score_semantic: storeSemanticScore, 1423 score_sites: storeScoreResult, 1424 enrich_sites: storeEnrichResult, 1425 code_review: storeCodeReviewFindings, 1426 monitor_health: storeMonitorHealthResult, 1427 triage_errors: storeTriageResult, 1428 check_docs: storeCheckDocsResult, 1429 evidence_merge: storeEvidenceMerge, 1430 followup_generate: storeFollowupGenerate, 1431 }; 1432 1433 async function main() { 1434 const raw = await readStdin(); 1435 1436 let input; 1437 try { 1438 input = JSON.parse(raw); 1439 } catch { 1440 console.error('Invalid JSON on stdin'); 1441 process.exit(1); 1442 } 1443 1444 const { batch_type, results } = input; 1445 if (!batch_type || !Array.isArray(results)) { 1446 console.error('Expected { batch_type, results: [...] 
}'); 1447 process.exit(1); 1448 } 1449 1450 const handler = handlers[batch_type]; 1451 if (!handler) { 1452 console.error(`Unknown batch type: ${batch_type}`); 1453 process.exit(1); 1454 } 1455 1456 let stored = 0; 1457 let skipped = 0; 1458 let errors = 0; 1459 1460 try { 1461 await withTransaction(async (client) => { 1462 for (const item of results) { 1463 try { 1464 await client.query('SAVEPOINT item_save'); 1465 const id = await handler(client, item); 1466 await client.query('RELEASE SAVEPOINT item_save'); 1467 if (id) { 1468 stored++; 1469 } else { 1470 skipped++; 1471 } 1472 } catch (err) { 1473 await client.query('ROLLBACK TO SAVEPOINT item_save'); 1474 errors++; 1475 logger.error(`Failed to store item: ${err.message}`); 1476 } 1477 } 1478 }); 1479 const summary = { batch_type, total: results.length, stored, skipped, errors }; 1480 console.log(JSON.stringify(summary)); 1481 } finally { 1482 await closePool(); 1483 } 1484 } 1485 1486 // Only run main() when this script is executed directly (not imported by tests) 1487 if (import.meta.url === `file://${process.argv[1]}`) { 1488 main().catch(err => { 1489 console.error(err.message); 1490 process.exit(1); 1491 }); 1492 } 1493 1494 // Named exports for unit testing (guards prevent main() from running on import) 1495 export { storeSemanticScore, handlers };