/ scripts / claude-store.js
claude-store.js
   1  #!/usr/bin/env node
   2  
   3  /**
   4   * Claude Store — Accept Claude-generated results on stdin, write to DB.
   5   *
   6   * Usage:
   7   *   echo '{"batch_type":"classify_replies","results":[...]}' | node scripts/claude-store.js
   8   *
   9   * Or pipe from claude:
  10   *   claude -p "..." | node scripts/claude-store.js
  11   *
  12   * Expected input format (JSON on stdin):
  13   *   {
  14   *     "batch_type": "proposals_email|proposals_sms|classify_replies|extract_names|reply_responses|score_semantic|enrich_sites|proofread",
  15   *     "results": [ ... ]
  16   *   }
  17   *
  18   * Each batch type expects different result item shapes — see handlers below.
  19   */
  20  
  21  import { execSync } from 'child_process';
  22  import { appendFileSync } from 'fs';
  23  import { join } from 'path';
  24  import { run, getOne, getAll, withTransaction, closePool } from '../src/utils/db.js';
  25  import { getCountryByCode } from '../src/config/countries.js';
  26  import { computeGrade, FACTOR_WEIGHTS, computeScoreFromFactors } from '../src/score.js';
  27  import { computeWeightedScore } from '../src/utils/programmatic-scorer.js';
  28  import {
  29    isGovernmentEmail,
  30    isEducationEmail,
  31    isDemoEmail,
  32    checkBlocklist,
  33    classifyIndustry,
  34  } from '../src/utils/site-filters.js';
  35  import { addCountryCode, normalizePhoneNumber } from '../src/utils/phone-normalizer.js';
  36  import { spin } from '../src/utils/spintax.js';
  37  import { cleanInvalidSocialLinks } from '../src/contacts/prioritize.js';
  38  import { resetRetries, recordFailure } from '../src/utils/retry-handler.js';
  39  import Logger from '../src/utils/logger.js';
  40  import { deleteHtmlDom, deleteKeyPagesHtml } from '../src/utils/html-storage.js';
  41  import { setScoreJson, getScoreJsonWithFallback } from '../src/utils/score-storage.js';
  42  import { setContactsJson, getContactsJsonWithFallback } from '../src/utils/contacts-storage.js';
  43  import '../src/utils/load-env.js';
  44  
  45  const logger = new Logger('ClaudeStore');
  46  
  47  async function readStdin() {
  48    const chunks = [];
  49    for await (const chunk of process.stdin) {
  50      chunks.push(chunk);
  51    }
  52    return Buffer.concat(chunks).toString('utf8');
  53  }
  54  
  55  async function storeProposal(client, item) {
  56    const contactMethod = item.contact_method || 'email';
  57    let contactUri = item.contact_uri || 'PENDING_CONTACT_EXTRACTION';
  58  
  59    // Normalize phone numbers
  60    if (contactMethod === 'sms' && contactUri !== 'PENDING_CONTACT_EXTRACTION' && item.country_code) {
  61      contactUri = addCountryCode(contactUri, item.country_code);
  62    }
  63  
  64    // GDPR check
  65    let approvalStatus = 'pending';
  66    if (contactMethod === 'email' && item.country_code) {
  67      const country = getCountryByCode(item.country_code);
  68      if (country?.requiresGDPRCheck) {
  69        approvalStatus = 'gdpr_blocked';
  70      }
  71    }
  72  
  73    // Skip blocked emails
  74    if (contactMethod === 'email') {
  75      if (isGovernmentEmail(contactUri) || isEducationEmail(contactUri) || isDemoEmail(contactUri)) {
  76        logger.info(`Skipping blocked email: ${contactUri}`);
  77        return null;
  78      }
  79    }
  80  
  81    // Resolve spintax
  82    let messageBody = spin(item.message_body || item.proposal_text || '');
  83    let subjectLine = spin(item.subject_line || '');
  84  
  85    // Reject broken templates
  86    if (messageBody.startsWith('{') || messageBody.startsWith(', ')) {
  87      logger.warn(`Broken template for site ${item.site_id} — skipping`);
  88      return null;
  89    }
  90  
  91    // ── Auto-fix known LLM proposal defects ─────────────────────────────────
  92    // These catch the exact issues that cause pending↔rework cycles in proofread.
  93    // Fix at generation time so proofread can focus on genuine quality issues.
  94  
  95    // 1. Unresolved template variables — LLM sometimes leaves [name], [score] etc.
  96    messageBody = messageBody.replace(/\[name\]/gi, 'Marcus');
  97    messageBody = messageBody.replace(/\[score\]/gi, String(item.score ?? ''));
  98    messageBody = messageBody.replace(/\[grade\]/gi, item.grade ?? '');
  99    subjectLine = subjectLine.replace(/\[name\]/gi, 'Marcus');
 100  
 101    // 2. Sender identity + opt-out — appended dynamically based on country and channel.
 102    //    SMS: dynamic signature that fits remaining char budget.
 103    //    Email/form: full identity always appended.
 104    const hasIdentity = /Marcus|Audit&Fix|AuditFix|auditandfix/i.test(messageBody);
 105    if (contactMethod === 'sms') {
 106      const cc = (item.country_code || '').toUpperCase();
 107      const needsStop = cc === 'US' || cc === 'CA';
 108      // Strip any LLM-generated STOP text or signature — we control these
 109      messageBody = messageBody
 110        .replace(/\s*Reply STOP to opt out\.?\s*/gi, '')
 111        .replace(/\s*-\s*Marcus\s*(Webb)?\s*,?\s*(Audit\s*&\s*Fix)?\s*$/i, '')
 112        .trimEnd();
 113      const stopText = needsStop ? ' Reply STOP to opt out.' : '';
 114      // Signature tiers: try longest first, fall back to shorter
 115      const sigTiers = needsStop
 116        ? [' - Marcus, Audit&Fix', ' - Audit&Fix'] // legal minimum is company name
 117        : [' - Marcus', ''];                        // no legal requirement, personal touch if room
 118      let bestSig = sigTiers[sigTiers.length - 1]; // fallback = shortest
 119      for (const sig of sigTiers) {
 120        if (messageBody.length + stopText.length + sig.length <= 160) {
 121          bestSig = sig;
 122          break;
 123        }
 124      }
 125      messageBody = `${messageBody}${stopText}${bestSig}`;
 126    } else if (!hasIdentity) {
 127      messageBody = `${messageBody.trimEnd()}\n\nMarcus Webb, Audit&Fix (auditandfix.com)`;
 128    }
 129  
 130    // 3. Score/grade mismatch — LLM sometimes hallucinates a different score.
 131    //    Replace any "scored X/100" or "X/100 score" with the actual DB value.
 132    if (item.score !== null && item.score !== undefined) {
 133      const actualScore = Math.round(item.score);
 134      messageBody = messageBody.replace(
 135        /\b\d{1,3}\/100\b/g,
 136        `${actualScore}/100`
 137      );
 138    }
 139  
 140    // Channel character limits
 141    const limits = { sms: 160, x: 280 };
 142    const maxChars = limits[contactMethod];
 143    if (maxChars && messageBody.length > maxChars) {
 144      logger.warn(
 145        `${contactMethod} too long (${messageBody.length}/${maxChars}) for site ${item.site_id}`
 146      );
 147      return null;
 148    }
 149  
 150    const result = await client.query(
 151      `INSERT INTO messages (site_id, direction, message_body, subject_line, contact_method, contact_uri, approval_status, message_type)
 152       VALUES ($1, 'outbound', $2, $3, $4, $5, $6, 'outreach')
 153       RETURNING id`,
 154      [item.site_id, messageBody, subjectLine, contactMethod, contactUri, approvalStatus]
 155    );
 156  
 157    return result.rows[0]?.id;
 158  }
 159  
 160  async function storeProofreadDecision(client, item) {
 161    if (!item.message_id) {
 162      logger.warn('Proofread result missing message_id');
 163      return null;
 164    }
 165  
 166    const { decision } = item;
 167    if (!['approve', 'rework', 'reject'].includes(decision)) {
 168      logger.warn(`Invalid proofread decision "${decision}" for message ${item.message_id}`);
 169      return null;
 170    }
 171  
 172    // Guard: never change approval_status on already-sent messages (race condition protection)
 173    if (decision === 'approve') {
 174      const r = await client.query(
 175        "UPDATE messages SET approval_status = 'approved' WHERE id = $1 AND sent_at IS NULL",
 176        [item.message_id]
 177      );
 178      if (r.rowCount === 0) {
 179        logger.warn(`Skipped approve for message ${item.message_id}: already sent`);
 180        return null;
 181      }
 182      logger.info(`Approved message ${item.message_id}`);
 183    } else if (decision === 'rework') {
 184      // Circuit breaker: after 3 rework cycles, park the message instead of looping.
 185      // Prevents pending↔rework cycles from blocking Gate 2 indefinitely.
 186      const MAX_PROOFREAD_ATTEMPTS = 3;
 187      const current = await client.query(
 188        'SELECT proofread_attempts FROM messages WHERE id = $1',
 189        [item.message_id]
 190      );
 191      const attempts = (current.rows[0]?.proofread_attempts || 0) + 1;
 192  
 193      if (attempts >= MAX_PROOFREAD_ATTEMPTS) {
 194        await client.query(
 195          "UPDATE messages SET approval_status = 'parked', proofread_attempts = $1, rework_instructions = $2 WHERE id = $3 AND sent_at IS NULL",
 196          [attempts, `Auto-parked after ${attempts} rework cycles: ${item.rework_instructions || ''}`, item.message_id]
 197        );
 198        logger.warn(`Parked message ${item.message_id} after ${attempts} rework cycles (circuit breaker)`);
 199        return { stored: true, parked: true };
 200      }
 201  
 202      const r = await client.query(
 203        "UPDATE messages SET approval_status = 'rework', proofread_attempts = $1, rework_instructions = $2 WHERE id = $3 AND sent_at IS NULL",
 204        [attempts, item.rework_instructions || 'QA: needs rework', item.message_id]
 205      );
 206      if (r.rowCount === 0) {
 207        logger.warn(`Skipped rework for message ${item.message_id}: already sent`);
 208        return null;
 209      }
 210      logger.info(`Rework message ${item.message_id} (attempt ${attempts}/${MAX_PROOFREAD_ATTEMPTS}): ${item.rework_instructions}`);
 211    } else if (decision === 'reject') {
 212      const r = await client.query(
 213        "UPDATE messages SET approval_status = 'rejected', rework_instructions = $1 WHERE id = $2 AND sent_at IS NULL",
 214        [item.reject_reason || 'QA: rejected', item.message_id]
 215      );
 216      if (r.rowCount === 0) {
 217        logger.warn(`Skipped reject for message ${item.message_id}: already sent`);
 218        return null;
 219      }
 220      logger.info(`Rejected message ${item.message_id}: ${item.reject_reason}`);
 221    }
 222  
 223    return item.message_id;
 224  }
 225  
 226  async function storeClassification(client, item) {
 227    if (!item.message_id) {
 228      logger.warn('Classification missing message_id');
 229      return null;
 230    }
 231  
 232    const validIntents = [
 233      'inquiry',
 234      'opt-out',
 235      'interested',
 236      'not-interested',
 237      'pricing',
 238      'schedule',
 239      'unknown',
 240      'autoresponder',
 241    ];
 242    const validSentiments = ['positive', 'neutral', 'negative', 'objection'];
 243  
 244    const intent = validIntents.includes(item.intent) ? item.intent : 'unknown';
 245    const sentiment = validSentiments.includes(item.sentiment) ? item.sentiment : 'neutral';
 246  
 247    await client.query(
 248      'UPDATE messages SET intent = $1, sentiment = $2 WHERE id = $3',
 249      [intent, sentiment, item.message_id]
 250    );
 251  
 252    // If opt-out, mark in opt_outs table
 253    if (intent === 'opt-out') {
 254      const msgResult = await client.query(
 255        'SELECT contact_uri, contact_method FROM messages WHERE id = $1',
 256        [item.message_id]
 257      );
 258      const msg = msgResult.rows[0];
 259      if (msg) {
 260        await client.query(
 261          `INSERT INTO opt_outs (phone, email, method, opted_out_at, source)
 262           VALUES (
 263             CASE WHEN $2 = 'sms' THEN $1 ELSE NULL END,
 264             CASE WHEN $2 = 'email' THEN $1 ELSE NULL END,
 265             $2, NOW(), 'llm_classify'
 266           )
 267           ON CONFLICT DO NOTHING`,
 268          [msg.contact_uri, msg.contact_method]
 269        );
 270      }
 271    }
 272  
 273    return item.message_id;
 274  }
 275  
 276  async function storeName(client, item) {
 277    if (!item.site_id || !item.contacts) {
 278      logger.warn('Name extraction missing site_id or contacts');
 279      return null;
 280    }
 281  
 282    // Update contacts_json with extracted names (filesystem)
 283    const contactsRaw = getContactsJsonWithFallback(item.site_id);
 284    if (!contactsRaw) return null;
 285  
 286    const contactsData = JSON.parse(contactsRaw);
 287    let updated = false;
 288  
 289    for (const extracted of item.contacts) {
 290      if (!extracted.name || !extracted.uri) continue;
 291      const match = (contactsData.contacts || []).find(
 292        c => (c.uri || c.value) === extracted.uri && !c.name
 293      );
 294      if (match) {
 295        match.name = extracted.name;
 296        updated = true;
 297      }
 298    }
 299  
 300    if (updated) {
 301      setContactsJson(item.site_id, JSON.stringify(contactsData));
 302    }
 303  
 304    return item.site_id;
 305  }
 306  
 307  async function storeReplyResponse(client, item) {
 308    if (!item.site_id) {
 309      logger.warn('Reply response missing site_id');
 310      return null;
 311    }
 312  
 313    // Don't respond to opt-outs or not-interested, but mark the inbound as handled
 314    // so it doesn't recur in every batch (NOT EXISTS check would never clear it otherwise)
 315    if (item.skip) {
 316      logger.info(`Skipping response for site ${item.site_id}: ${item.skip_reason || 'opted out'}`);
 317      const inboundResult = await client.query(
 318        `SELECT contact_method, contact_uri FROM messages WHERE site_id=$1 AND direction='inbound' ORDER BY created_at DESC LIMIT 1`,
 319        [item.site_id]
 320      );
 321      const inboundRow = inboundResult.rows[0];
 322      const skipChannel = item.channel || item.contact_method || inboundRow?.contact_method || 'sms';
 323      const skipUri = item.contact_uri || inboundRow?.contact_uri || '';
 324      await client.query(
 325        `INSERT INTO messages (site_id, direction, message_body, message_type, contact_method, contact_uri)
 326         VALUES ($1, 'outbound', $2, 'reply_skipped', $3, $4)`,
 327        [item.site_id, `[skipped: ${item.skip_reason || 'not interested'}]`, skipChannel, skipUri]
 328      );
 329      return null;
 330    }
 331  
 332    if (!item.message_body) {
 333      logger.warn(`Reply response missing message_body for site ${item.site_id}`);
 334      return null;
 335    }
 336  
 337    // channel/contact_uri come from the LLM echoing back the inbound data.
 338    // Fall back to the most recent inbound for this site if LLM omits them.
 339    let contactMethod = item.channel || item.contact_method;
 340    let contactUri = item.contact_uri || '';
 341    if (!contactMethod) {
 342      const inboundResult = await client.query(
 343        `SELECT contact_method, contact_uri FROM messages
 344         WHERE site_id = $1 AND direction = 'inbound'
 345         ORDER BY created_at DESC LIMIT 1`,
 346        [item.site_id]
 347      );
 348      const inbound = inboundResult.rows[0];
 349      contactMethod = inbound?.contact_method || 'sms';
 350      if (!contactUri) contactUri = inbound?.contact_uri || '';
 351    }
 352  
 353    if (contactMethod === 'sms' && item.country_code) {
 354      contactUri = addCountryCode(contactUri, item.country_code);
 355    }
 356  
 357    const result = await client.query(
 358      `INSERT INTO messages (site_id, direction, message_body, subject_line, contact_method, contact_uri, message_type, approval_status)
 359       VALUES ($1, 'outbound', $2, $3, $4, $5, 'reply', 'approved')
 360       RETURNING id`,
 361      [item.site_id, item.message_body, item.subject_line || null, contactMethod, contactUri]
 362    );
 363  
 364    return result.rows[0]?.id;
 365  }
 366  
 367  const PROJECT_ROOT = join(import.meta.dirname || process.cwd(), '..');
 368  const OVERSEER_LOG = '/tmp/sonnet-overseer.log';
 369  const STATUS_FILE = join(PROJECT_ROOT, 'logs/pipeline-status.txt');
 370  
 371  function overseerLog(msg) {
 372    const line = `[${new Date().toISOString()}] [SonnetOverseer] ${msg}`;
 373    logger.info(msg);
 374    try {
 375      appendFileSync(OVERSEER_LOG, `${line}\n`);
 376    } catch {
 377      /* ignore */
 378    }
 379    try {
 380      appendFileSync(STATUS_FILE, `${line}\n`);
 381    } catch {
 382      /* ignore */
 383    }
 384  }
 385  
 386  async function executeOverseerAction(action, client) {
 387    const { code, description, params = {} } = action;
 388    overseerLog(`ACTION: ${code} — ${description}`);
 389  
 390    switch (code) {
 391      case 'RESTART_PIPELINE': {
 392        try {
 393          execSync('systemctl --user restart 333method-pipeline --no-block', { timeout: 30000 });
 394          overseerLog('Pipeline service restart requested');
 395          await client.query(
 396            `INSERT INTO tel.system_health (check_type, status, details, action_taken)
 397             VALUES ('sonnet_overseer', 'ok', $1, 'restart_pipeline')`,
 398            [JSON.stringify({ reason: description })]
 399          );
 400        } catch (err) {
 401          overseerLog(`ERROR: Failed to restart pipeline: ${err.message}`);
 402        }
 403        break;
 404      }
 405      case 'CLEAR_STALE_TASKS': {
 406        const minAgeHours = params.min_age_hours || 2;
 407        const cutoff = new Date(Date.now() - minAgeHours * 60 * 60 * 1000).toISOString();
 408        const res = await client.query(
 409          `UPDATE tel.agent_tasks SET status='failed', completed_at=CURRENT_TIMESTAMP,
 410           result_json='{"reason":"cleared by sonnet overseer - stale blocked/pending task"}'
 411           WHERE status IN ('blocked','pending') AND created_at < $1`,
 412          [cutoff]
 413        );
 414        const runningCutoff = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString();
 415        const res2 = await client.query(
 416          `UPDATE tel.agent_tasks SET status='failed', completed_at=CURRENT_TIMESTAMP,
 417           result_json='{"reason":"cleared by sonnet overseer - stuck in running state >2h"}'
 418           WHERE status='running' AND started_at < $1`,
 419          [runningCutoff]
 420        );
 421        overseerLog(
 422          `Cleared ${res.rowCount} stale blocked/pending + ${res2.rowCount} stuck-running agent tasks`
 423        );
 424        await client.query(
 425          `INSERT INTO tel.system_health (check_type, status, details, action_taken)
 426           VALUES ('sonnet_overseer', 'ok', $1, 'clear_stale_tasks')`,
 427          [JSON.stringify({ cleared: res.rowCount, min_age_hours: minAgeHours })]
 428        );
 429        break;
 430      }
 431      case 'RESET_STUCK_SITES': {
 432        const { stage } = params;
 433        const MIN_HOURS_FLOOR = 4;
 434        const minHours = Math.max(params.min_stuck_hours ?? MIN_HOURS_FLOOR, MIN_HOURS_FLOOR);
 435        if (!stage) {
 436          overseerLog('RESET_STUCK_SITES: missing stage param, skipping');
 437          break;
 438        }
 439        const priorStage = {
 440          assets_captured: 'found',
 441          prog_scored: 'assets_captured',
 442          semantic_scored: 'prog_scored',
 443          vision_scored: 'prog_scored',
 444          // enriched intentionally omitted — enriched is a deliberate queue for evidence
 445          // collection (gather_evidence cron). Resetting to semantic_scored causes loop.
 446        };
 447        const prevStage = priorStage[stage];
 448        if (!prevStage) {
 449          overseerLog(`RESET_STUCK_SITES: unknown stage '${stage}', skipping`);
 450          break;
 451        }
 452        const cutoff = new Date(Date.now() - minHours * 60 * 60 * 1000).toISOString();
 453        const excludedResult = await client.query(
 454          `SELECT id FROM sites WHERE recapture_count >= 2 AND status = $1`,
 455          [stage]
 456        );
 457        const excludeIds = excludedResult.rows.map(r => r.id);
 458        const clearAssets = stage === 'assets_captured';
 459  
 460        // When clearing assets, also delete HTML files from filesystem
 461        const errorMsg = `reset by sonnet overseer - stuck >${minHours}h`;
 462        if (clearAssets) {
 463          let affectedResult;
 464          if (excludeIds.length > 0) {
 465            affectedResult = await client.query(
 466              `SELECT id FROM sites WHERE status=$1 AND updated_at < $2 AND NOT (id = ANY($3))`,
 467              [stage, cutoff, excludeIds]
 468            );
 469          } else {
 470            affectedResult = await client.query(
 471              `SELECT id FROM sites WHERE status=$1 AND updated_at < $2`,
 472              [stage, cutoff]
 473            );
 474          }
 475          for (const row of affectedResult.rows) {
 476            deleteHtmlDom(row.id);
 477          }
 478        }
 479  
 480        let result;
 481        if (excludeIds.length > 0) {
 482          if (clearAssets) {
 483            result = await client.query(
 484              `UPDATE sites SET status=$1, updated_at=NOW(),
 485               error_message=$2,
 486               recapture_count=recapture_count+1, html_dom=NULL, screenshot_path=NULL
 487               WHERE status=$3 AND updated_at < $4 AND NOT (id = ANY($5))`,
 488              [prevStage, errorMsg, stage, cutoff, excludeIds]
 489            );
 490          } else {
 491            result = await client.query(
 492              `UPDATE sites SET status=$1, updated_at=NOW(),
 493               error_message=$2,
 494               recapture_count=recapture_count+1
 495               WHERE status=$3 AND updated_at < $4 AND NOT (id = ANY($5))`,
 496              [prevStage, errorMsg, stage, cutoff, excludeIds]
 497            );
 498          }
 499        } else {
 500          if (clearAssets) {
 501            result = await client.query(
 502              `UPDATE sites SET status=$1, updated_at=NOW(),
 503               error_message=$2,
 504               recapture_count=recapture_count+1, html_dom=NULL, screenshot_path=NULL
 505               WHERE status=$3 AND updated_at < $4`,
 506              [prevStage, errorMsg, stage, cutoff]
 507            );
 508          } else {
 509            result = await client.query(
 510              `UPDATE sites SET status=$1, updated_at=NOW(),
 511               error_message=$2,
 512               recapture_count=recapture_count+1
 513               WHERE status=$3 AND updated_at < $4`,
 514              [prevStage, errorMsg, stage, cutoff]
 515            );
 516          }
 517        }
 518        overseerLog(
 519          `Reset ${result.rowCount} sites stuck at '${stage}' → '${prevStage}' (skipped ${excludeIds.length} anti-loop)`
 520        );
 521        await client.query(
 522          `INSERT INTO tel.system_health (check_type, status, details, action_taken)
 523           VALUES ('sonnet_overseer', 'ok', $1, 'reset_stuck_sites')`,
 524          [JSON.stringify({
 525            stage,
 526            prev_stage: prevStage,
 527            reset: result.rowCount,
 528            skipped_anti_loop: excludeIds.length,
 529          })]
 530        );
 531        break;
 532      }
 533      case 'LOG_ONLY':
 534        overseerLog(`NOTE: ${params.note || description}`);
 535        break;
 536      default:
 537        overseerLog(`WARN: Unknown action code '${code}', skipping`);
 538    }
 539  }
 540  
 541  async function storeOverseerResult(client, item) {
 542    // item = { summary, severity, findings, actions }
 543    if (!item.summary) {
 544      logger.warn('Overseer result missing summary');
 545      return null;
 546    }
 547  
 548    overseerLog(`Analysis: severity=${item.severity} | ${item.summary}`);
 549    overseerLog(`Findings: ${item.findings?.length || 0}, Actions: ${item.actions?.length || 0}`);
 550  
 551    let actionsTaken = 0;
 552    for (const action of item.actions || []) {
 553      try {
 554        await executeOverseerAction(action, client);
 555        if (action.code !== 'LOG_ONLY') actionsTaken++;
 556      } catch (err) {
 557        overseerLog(`ERROR: Action ${action.code} failed: ${err.message}`);
 558      }
 559    }
 560  
 561    // Write report to status file
 562    const report = [
 563      `\n${'='.repeat(70)}`,
 564      `SONNET OVERSEER — ${new Date().toISOString()} [${(item.severity || 'unknown').toUpperCase()}]`,
 565      `Summary: ${item.summary}`,
 566      '',
 567      `Findings: ${(item.findings || []).map(f => `[${f.severity}] ${f.issue}`).join(' | ') || 'none'}`,
 568      `Actions taken: ${actionsTaken}`,
 569      `${'='.repeat(70)}`,
 570    ].join('\n');
 571    try {
 572      appendFileSync(STATUS_FILE, `${report}\n`);
 573    } catch {
 574      /* ignore */
 575    }
 576  
 577    return actionsTaken;
 578  }
 579  
 580  async function storeErrorClassification(client, item) {
 581    // item = { pattern, label, group, context, example_errors, occurrence_count }
 582    const { pattern, label, group, context, example_errors, occurrence_count } = item;
 583  
 584    if (
 585      !pattern ||
 586      !label ||
 587      !['terminal', 'retriable'].includes(group) ||
 588      !['site', 'outreach'].includes(context)
 589    ) {
 590      logger.warn(`Skipping invalid error proposal: ${JSON.stringify(item)}`);
 591      return null;
 592    }
 593  
 594    try {
 595      new RegExp(pattern);
 596    } catch {
 597      logger.warn(`Skipping invalid regex "${pattern}"`);
 598      return null;
 599    }
 600  
 601    // Avoid duplicates
 602    const existing = await client.query(
 603      `SELECT id FROM error_pattern_proposals WHERE pattern = $1 AND context = $2 AND status = 'pending'`,
 604      [pattern, context]
 605    );
 606    if (existing.rows[0]) return null;
 607  
 608    const examples = Array.isArray(example_errors) ? example_errors.slice(0, 5) : [];
 609    const result = await client.query(
 610      `INSERT INTO error_pattern_proposals (pattern, label, group_name, context, example_errors, occurrence_count)
 611       VALUES ($1, $2, $3, $4, $5, $6)
 612       RETURNING id`,
 613      [pattern, label, group, context, JSON.stringify(examples), occurrence_count || 0]
 614    );
 615  
 616    return result.rows[0]?.id;
 617  }
 618  
 619  /**
 620   * Store semantic scoring results from Haiku.
 621   * Merges LLM-assessed semantic factor scores (headline, value prop, USP)
 622   * into the existing programmatic score_json and recomputes the final score.
 623   */
 624  async function storeSemanticScore(client, item) {
 625    const siteId = item.site_id;
 626    if (!siteId) return null;
 627  
 628    const siteResult = await client.query('SELECT score FROM sites WHERE id = $1', [siteId]);
 629    const site = siteResult.rows[0];
 630    const scoreRaw = getScoreJsonWithFallback(siteId);
 631    if (!scoreRaw) {
 632      // score_json lost in migration — advance semantic_scored → enriched using existing DB score
 633      const existingScore = site?.score;
 634      if (existingScore === null || existingScore === undefined) return null;
 635      const scoreThreshold = parseInt(process.env.LOW_SCORE_CUTOFF || '82', 10);
 636      const isHighScore = existingScore > scoreThreshold;
 637      await client.query(
 638        `UPDATE sites SET
 639          status = CASE WHEN status = 'semantic_scored' THEN $1 ELSE status END,
 640          updated_at = CURRENT_TIMESTAMP
 641         WHERE id = $2`,
 642        [isHighScore ? 'high_score' : 'enriched', siteId]
 643      );
 644      return siteId;
 645    }
 646  
 647    const scoreJson = JSON.parse(scoreRaw);
 648    if (!scoreJson.factor_scores) return null;
 649  
 650    // Merge Haiku's semantic scores into factor_scores
 651    const semanticFactors = ['headline_quality', 'value_proposition', 'unique_selling_proposition'];
 652    for (const factor of semanticFactors) {
 653      if (item[factor] && typeof item[factor].score === 'number') {
 654        scoreJson.factor_scores[factor] = {
 655          score: Math.min(Math.max(item[factor].score, 0), 10),
 656          reasoning: item[factor].reasoning || 'LLM semantic assessment',
 657          evidence: scoreJson.factor_scores[factor]?.evidence || '',
 658          source: 'haiku_semantic',
 659        };
 660      }
 661    }
 662  
 663    // Recompute weighted score with vision-aware weights
 664    const visionEnabled = process.env.ENABLE_VISION !== 'false';
 665    const newScore = computeWeightedScore(scoreJson.factor_scores, visionEnabled);
 666    const newGrade = computeGrade(newScore);
 667  
 668    // Mark as semantically scored
 669    scoreJson.semantic_scored = true;
 670    scoreJson.scoring_method = 'hybrid';
 671    scoreJson.conversion_score = newScore;
 672    scoreJson.letter_grade = newGrade;
 673  
 674    // Advance status:
 675    //   prog_scored     → semantic_scored (or high_score) — vision-on path
 676    //   semantic_scored → enriched         (or high_score) — vision-off path (LLM overlay done)
 677    const scoreThreshold = parseInt(process.env.LOW_SCORE_CUTOFF || '82', 10);
 678    const isHighScore = newScore !== null && newScore > scoreThreshold;
 679    const newStatusFromProg = isHighScore ? 'high_score' : 'semantic_scored';
 680    const newStatusFromSemantic = isHighScore ? 'high_score' : 'enriched';
 681  
 682    setScoreJson(siteId, scoreJson);
 683    await client.query(
 684      `UPDATE sites
 685       SET score = $1, grade = $2,
 686           status = CASE
 687             WHEN status = 'prog_scored'     THEN $3
 688             WHEN status = 'semantic_scored' THEN $4
 689             ELSE status
 690           END,
 691           rescored_at = CURRENT_TIMESTAMP,
 692           updated_at = CURRENT_TIMESTAMP
 693       WHERE id = $5`,
 694      [newScore, newGrade, newStatusFromProg, newStatusFromSemantic, siteId]
 695    );
 696  
 697    return siteId;
 698  }
 699  
 700  /**
 701   * Store scoring result from the orchestrator's score_sites batch.
 702   * Replicates the site-classification and DB-update logic from src/stages/scoring.js:scoreSite().
 703   * item = { site_id, domain, factor_scores, overall_calculation, contact_details, ... }
 704   */
 705  async function storeScoreResult(client, item) {
 706    const siteId = item.site_id;
 707    if (!siteId) {
 708      logger.warn('Score result missing site_id');
 709      return null;
 710    }
 711  
 712    // Validate factor_scores are present — incomplete responses are unusable
 713    if (!item.factor_scores) {
 714      logger.warn(`Score result for site ${siteId} missing factor_scores — skipping`);
 715      return null;
 716    }
 717  
 718    // Compute score and grade programmatically from factor scores (same as scoreWebsite())
 719    const computedScore = computeScoreFromFactors(item.factor_scores);
 720    const computedGrade = computeGrade(computedScore);
 721  
 722    // Ensure overall_calculation has the computed values
 723    if (!item.overall_calculation) item.overall_calculation = {};
 724    item.overall_calculation.conversion_score = computedScore;
 725    item.overall_calculation.letter_grade = computedGrade;
 726  
 727    const grade = computedGrade;
 728    const score = computedScore;
 729  
 730    // Domain-level blocklist check (same logic as scoring stage)
 731    const siteResult = await client.query(
 732      'SELECT domain, country_code FROM sites WHERE id = $1',
 733      [siteId]
 734    );
 735    const site = siteResult.rows[0];
 736    if (!site) {
 737      logger.warn(`Score result: site ${siteId} not found in DB`);
 738      return null;
 739    }
 740  
 741    const blocked = checkBlocklist(site.domain, site.country_code);
 742    if (blocked) {
 743      await client.query(
 744        `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
 745        [blocked.reason, siteId]
 746      );
 747      logger.info(`Score: ${site.domain} blocklisted — ${blocked.reason}`);
 748      return siteId;
 749    }
 750  
 751    const industry = classifyIndustry(site.domain);
 752    if (industry) {
 753      const reason =
 754        industry.type === 'legal'
 755          ? `Ignored: Legal site detected from domain (${industry.reason})`
 756          : `Ignored: Regulated industry (${industry.type}) detected from domain (${industry.reason})`;
 757      await client.query(
 758        `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
 759        [reason, siteId]
 760      );
 761      logger.info(`Score: ${site.domain} regulated industry — ${reason}`);
 762      return siteId;
 763    }
 764  
 765    // Helper: dual-write score_json to both DB and filesystem
 766    const scoreJsonStr = JSON.stringify(item);
 767  
 768    // LLM-detected classifications (directory, local business, law firm, regulated industry)
 769    const isDirectory = item.overall_calculation?.is_business_directory;
 770    if (isDirectory) {
 771      setScoreJson(siteId, scoreJsonStr);
 772      await client.query(
 773        `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
 774        ['Ignored: Business directory (LLM detected)', siteId]
 775      );
 776      logger.info(`Score: ${site.domain} is a business directory — ignored`);
 777      return siteId;
 778    }
 779  
 780    const isLocalBusiness = item.overall_calculation?.is_local_business;
 781    if (isLocalBusiness === false) {
 782      setScoreJson(siteId, scoreJsonStr);
 783      await client.query(
 784        `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
 785        ['Ignored: Not a local business (LLM detected)', siteId]
 786      );
 787      logger.info(`Score: ${site.domain} is not a local business — ignored`);
 788      return siteId;
 789    }
 790  
 791    const isLawFirm = item.overall_calculation?.is_law_firm;
 792    if (isLawFirm) {
 793      setScoreJson(siteId, scoreJsonStr);
 794      await client.query(
 795        `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
 796        ['Ignored: Law firm (LLM detected)', siteId]
 797      );
 798      logger.info(`Score: ${site.domain} is a law firm — ignored`);
 799      return siteId;
 800    }
 801  
 802    const industryClassification =
 803      item.overall_calculation?.industry_classification?.toLowerCase() || '';
 804    const regulatedPatterns = [
 805      'dental',
 806      'medical',
 807      'clinic',
 808      'hospital',
 809      'pharmacy',
 810      'veterinary',
 811      'financial advis',
 812      'mortgage broker',
 813      'accounting',
 814    ];
 815    const isRegulated = regulatedPatterns.some(p => industryClassification.includes(p));
 816    if (isRegulated) {
 817      setScoreJson(siteId, scoreJsonStr);
 818      await client.query(
 819        `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
 820        [`Ignored: Regulated industry (${industryClassification})`, siteId]
 821      );
 822      logger.info(
 823        `Score: ${site.domain} is regulated industry (${industryClassification}) — ignored`
 824      );
 825      return siteId;
 826    }
 827  
 828    // Error page detection
 829    const isErrorPage = item.overall_calculation?.is_error_page;
 830    const errorType = item.overall_calculation?.error_type;
 831    const errorDescription = item.overall_calculation?.error_description;
 832    if (isErrorPage && errorType) {
 833      const permanentErrors = ['404', '403', '410'];
 834      if (permanentErrors.includes(errorType)) {
 835        setScoreJson(siteId, scoreJsonStr);
 836        await client.query(
 837          `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
 838          [errorDescription || `Permanent error: ${errorType}`, siteId]
 839        );
 840        logger.info(`Score: ${site.domain} is a ${errorType} error page — ignored`);
 841        return siteId;
 842      }
 843      // Temporary errors — keep at assets_captured for retry
 844      const temporaryErrors = ['5xx', 'maintenance', 'redirect'];
 845      if (temporaryErrors.includes(errorType)) {
 846        setScoreJson(siteId, scoreJsonStr);
 847        await client.query(
 848          `UPDATE sites SET status = 'assets_captured', error_message = $1 WHERE id = $2`,
 849          [errorDescription || `Temporary error: ${errorType}`, siteId]
 850        );
 851        logger.info(`Score: ${site.domain} has temporary error ${errorType} — will retry`);
 852        return siteId;
 853      }
 854    }
 855  
 856    // Broken site detection
 857    const isBrokenSite = item.overall_calculation?.is_broken_site || false;
 858    const brokenSiteDetails = item.overall_calculation?.broken_site_details || [];
 859    if (isBrokenSite) {
 860      const siteRowResult = await client.query(
 861        'SELECT recapture_count FROM sites WHERE id = $1',
 862        [siteId]
 863      );
 864      const recaptureCount = (siteRowResult.rows[0]?.recapture_count || 0) + 1;
 865      setScoreJson(siteId, scoreJsonStr);
 866      if (recaptureCount > 3) {
 867        await client.query(
 868          `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
 869          [`Max recapture attempts reached (3) - Issues: ${brokenSiteDetails.join(', ')}`, siteId]
 870        );
 871        return siteId;
 872      }
 873      await client.query(
 874        `UPDATE sites SET status = 'assets_captured', error_message = $1, recapture_count = $2,
 875         recapture_at = NOW() + INTERVAL '7 days' WHERE id = $3`,
 876        [
 877          `Broken site detected (attempt ${recaptureCount}/3) - Issues: ${brokenSiteDetails.join(', ')}`,
 878          recaptureCount,
 879          siteId,
 880        ]
 881      );
 882      return siteId;
 883    }
 884  
 885    // Extract location
 886    const city = item.overall_calculation?.city || null;
 887    const countryCode = item.overall_calculation?.country_code || null;
 888    const state = item.overall_calculation?.state || null;
 889  
 890    // Determine target status (same logic as scoring.js)
 891    const scoreThreshold = parseInt(process.env.LOW_SCORE_CUTOFF || '82', 10);
 892    // HTML-only mode: skip straight to semantic_scored (rescoring is a no-op without screenshots)
 893    const enableVision = process.env.ENABLE_VISION !== 'false';
 894    const newStatus =
 895      score !== null && score > scoreThreshold
 896        ? 'high_score'
 897        : enableVision
 898          ? 'prog_scored'
 899          : 'semantic_scored';
 900  
 901    // When vision disabled, save contact_details to contacts_json filesystem
 902    if (!enableVision && item.contact_details) {
 903      setContactsJson(siteId, JSON.stringify(item.contact_details));
 904    }
 905  
 906    // Write score to filesystem; DB no longer stores score_json/contacts_json blobs
 907    setScoreJson(siteId, scoreJsonStr);
 908  
 909    await client.query(
 910      `UPDATE sites SET
 911         grade = $1, score = $2,
 912         city = $3, country_code = $4, state = $5,
 913         status = $6,
 914         scored_at = CURRENT_TIMESTAMP,
 915         rescored_at = CASE WHEN $7 = 'semantic_scored' THEN CURRENT_TIMESTAMP ELSE rescored_at END,
 916         error_message = NULL
 917       WHERE id = $8`,
 918      [grade, score, city, countryCode, state, newStatus, newStatus, siteId]
 919    );
 920  
 921    await resetRetries(siteId);
 922  
 923    logger.info(`Score stored: site ${siteId} (${site.domain}) → ${grade} (${score}) → ${newStatus}`);
 924    return siteId;
 925  }
 926  
 927  /**
 928   * Clean phone numbers from LLM enrichment output — normalizes to E.164 format.
 929   * @param {Array<Object|string>} phones - Phone numbers from LLM
 930   * @returns {Array<Object>} Normalized phone numbers
 931   */
 932  function cleanEnrichPhones(phones) {
 933    if (!Array.isArray(phones)) return [];
 934    return phones
 935      .map(phone => {
 936        if (typeof phone === 'string') {
 937          return { number: normalizePhoneNumber(phone), label: '' };
 938        }
 939        if (phone && phone.number) {
 940          return { number: normalizePhoneNumber(phone.number), label: phone.label || '' };
 941        }
 942        return null;
 943      })
 944      .filter(Boolean);
 945  }
 946  
 947  /**
 948   * Store enrichment result from the orchestrator's enrich_sites batch.
 949   * This handles extractInitialContacts-equivalent: sets/updates contacts_json from HTML analysis.
 950   * The pipeline's enrich stage still handles browser navigation for key_pages.
 951   * item = { site_id, domain, business_name, city, country_code, state, email_addresses,
 952   *          phone_numbers, social_profiles, key_pages, primary_contact_form }
 953   */
 954  async function storeEnrichResult(client, item) {
 955    const siteId = item.site_id;
 956    if (!siteId) {
 957      logger.warn('Enrich result missing site_id');
 958      return null;
 959    }
 960  
 961    const siteResult = await client.query(
 962      'SELECT country_code, status FROM sites WHERE id = $1',
 963      [siteId]
 964    );
 965    const site = siteResult.rows[0];
 966    if (!site) {
 967      logger.warn(`Enrich result: site ${siteId} not found in DB`);
 968      return null;
 969    }
 970  
 971    // Build enriched contacts_json from LLM result (filesystem first)
 972    const contactsRaw = getContactsJsonWithFallback(siteId);
 973    let contactsJson = contactsRaw ? JSON.parse(contactsRaw) : {};
 974  
 975    // Merge LLM-extracted data into existing contacts_json
 976    // Prefer LLM values for business_name/city/country_code/state (previously missing)
 977    if (item.business_name && !contactsJson.business_name) {
 978      contactsJson.business_name = item.business_name;
 979    }
 980    if (item.city && !contactsJson.city) {
 981      contactsJson.city = item.city;
 982    }
 983    if (item.country_code && !contactsJson.country_code) {
 984      contactsJson.country_code = item.country_code;
 985    }
 986    if (item.state && !contactsJson.state) {
 987      contactsJson.state = item.state;
 988    }
 989  
 990    // Merge arrays (avoid duplicates by URI/email/number)
 991    if (Array.isArray(item.email_addresses)) {
 992      const existing = new Set(
 993        (contactsJson.email_addresses || []).map(e => (typeof e === 'string' ? e : e.email))
 994      );
 995      const newEmails = item.email_addresses.filter(e => {
 996        const addr = typeof e === 'string' ? e : e.email;
 997        return addr && !existing.has(addr);
 998      });
 999      contactsJson.email_addresses = [...(contactsJson.email_addresses || []), ...newEmails];
1000    }
1001  
1002    if (Array.isArray(item.phone_numbers)) {
1003      const cleaned = cleanEnrichPhones(item.phone_numbers);
1004      const existing = new Set(
1005        (contactsJson.phone_numbers || []).map(p => (typeof p === 'string' ? p : p.number))
1006      );
1007      const newPhones = cleaned.filter(p => p.number && !existing.has(p.number));
1008      contactsJson.phone_numbers = [...(contactsJson.phone_numbers || []), ...newPhones];
1009    }
1010  
1011    if (Array.isArray(item.social_profiles)) {
1012      const existing = new Set(
1013        (contactsJson.social_profiles || []).map(s => (typeof s === 'string' ? s : s.url))
1014      );
1015      const newSocial = item.social_profiles.filter(s => {
1016        const url = typeof s === 'string' ? s : s.url;
1017        return url && !existing.has(url);
1018      });
1019      contactsJson.social_profiles = [...(contactsJson.social_profiles || []), ...newSocial];
1020    }
1021  
1022    if (Array.isArray(item.key_pages)) {
1023      const existing = new Set(contactsJson.key_pages || []);
1024      const newPages = item.key_pages.filter(p => p && !existing.has(p));
1025      contactsJson.key_pages = [...(contactsJson.key_pages || []), ...newPages];
1026    }
1027  
1028    if (item.primary_contact_form && !contactsJson.primary_contact_form) {
1029      contactsJson.primary_contact_form = item.primary_contact_form;
1030    }
1031  
1032    // Clean invalid social links
1033    contactsJson = cleanInvalidSocialLinks(contactsJson);
1034  
1035    const city = contactsJson.city || null;
1036    const countryCode = contactsJson.country_code || null;
1037    const state = contactsJson.state || null;
1038  
1039    // enrichment_pass is set in the input batch but not echoed back in the LLM output JSON.
1040    // Infer from DB status when the field is absent (enriched_regex = LLM pass pending).
1041    const isLlmPass =
1042      item.enrichment_pass === 'llm' ||
1043      (item.enrichment_pass === undefined && site.status === 'enriched_regex');
1044  
1045    // Write contacts to filesystem; store sentinel in DB to preserve IS NOT NULL gating
1046    setContactsJson(siteId, JSON.stringify(contactsJson));
1047  
1048    if (isLlmPass) {
1049      // LLM pass over key_pages_html — set enriched_llm, clear key_pages_html to reclaim space
1050      deleteKeyPagesHtml(siteId);
1051      await client.query(
1052        `UPDATE sites SET
1053           city = COALESCE($1, city),
1054           country_code = COALESCE($2, country_code),
1055           state = COALESCE($3, state),
1056           status = 'enriched_llm',
1057           enriched_at = CURRENT_TIMESTAMP,
1058           key_pages_html = NULL,
1059           updated_at = CURRENT_TIMESTAMP
1060         WHERE id = $4`,
1061        [city, countryCode, state, siteId]
1062      );
1063    } else {
1064      // Initial pass from html_dom — status stays at 'semantic_scored'/'vision_scored' so the
1065      // pipeline enrich stage can still run browser navigation on key_pages
1066      await client.query(
1067        `UPDATE sites SET
1068           city = COALESCE($1, city),
1069           country_code = COALESCE($2, country_code),
1070           state = COALESCE($3, state),
1071           updated_at = CURRENT_TIMESTAMP
1072         WHERE id = $4`,
1073        [city, countryCode, state, siteId]
1074      );
1075    }
1076  
1077    logger.info(
1078      `Enrich stored: site ${siteId} (${item.domain}) — emails:${contactsJson.email_addresses?.length || 0} phones:${contactsJson.phone_numbers?.length || 0} social:${contactsJson.social_profiles?.length || 0}`
1079    );
1080    return siteId;
1081  }
1082  
1083  /**
1084   * Store monitor_health result — logs findings, no DB writes.
1085   * item = { severity, summary, findings[], recommended_actions[] }
1086   */
1087  async function storeMonitorHealthResult(client, item) {
1088    if (!item.summary) {
1089      logger.warn('monitor_health result missing summary');
1090      return null;
1091    }
1092    logger.info(`Health check [${item.severity || 'info'}]: ${item.summary}`);
1093    for (const f of item.findings || []) {
1094      logger.info(`  [${f.severity}] ${f.component}: ${f.issue}`);
1095    }
1096    for (const a of item.recommended_actions || []) {
1097      logger.info(`  ACTION: ${a}`);
1098    }
1099    return item.summary;
1100  }
1101  
1102  /**
1103   * Store triage_errors result — creates fix tasks for critical/high errors.
1104   * item = { error_message, type, severity, action, summary, suggested_fix }
1105   */
1106  async function storeTriageResult(client, item) {
1107    if (!item.error_message || !item.type) {
1108      logger.warn('triage_errors result missing required fields');
1109      return null;
1110    }
1111  
1112    logger.info(`Triage [${item.severity}/${item.type}]: ${item.summary}`);
1113  
1114    if (item.action !== 'create_fix_task') return item.error_message;
1115  
1116    // Only create tasks for high/critical
1117    if (!['critical', 'high'].includes(item.severity)) return item.error_message;
1118  
1119    const priority = item.severity === 'critical' ? 9 : 7;
1120    await client.query(
1121      `INSERT INTO tel.agent_tasks
1122         (task_type, assigned_to, created_by, priority, status, context_json)
1123       VALUES ('triage_fix', 'developer', 'incident_commander', $1, 'pending', $2)`,
1124      [
1125        priority,
1126        JSON.stringify({
1127          error_message: item.error_message,
1128          error_type: item.type,
1129          severity: item.severity,
1130          summary: item.summary,
1131          suggested_fix: item.suggested_fix || null,
1132        }),
1133      ]
1134    );
1135  
1136    return item.error_message;
1137  }
1138  
1139  /**
1140   * Store check_docs result — logs stale docs findings, no DB writes.
1141   * item = { file_path, stale_docs[], summary }
1142   */
1143  async function storeCheckDocsResult(client, item) {
1144    if (!item.file_path) {
1145      logger.warn('check_docs result missing file_path');
1146      return null;
1147    }
1148    const count = Array.isArray(item.stale_docs) ? item.stale_docs.length : 0;
1149    if (count === 0) {
1150      logger.info(`check_docs: ${item.file_path} — docs current`);
1151      return item.file_path;
1152    }
1153    logger.info(`check_docs: ${item.file_path} — ${count} stale doc(s)`);
1154    for (const d of item.stale_docs) {
1155      logger.info(`  [${d.location}] ${d.issue}`);
1156    }
1157    return item.file_path;
1158  }
1159  
1160  /**
1161   * Store code review findings as agent_tasks for later fixing.
1162   * item = { file_path, findings: [{severity, category, line, description, suggestion}], summary }
1163   *
1164   * Each finding with severity >= 7 becomes a code_review_fix task.
1165   * Findings with severity <= 6 are stored but skipped (too low priority to auto-fix).
1166   */
1167  async function storeCodeReviewFindings(client, item) {
1168    if (!item.file_path) {
1169      logger.warn('Code review result missing file_path');
1170      return null;
1171    }
1172  
1173    const findings = Array.isArray(item.findings) ? item.findings : [];
1174  
1175    if (findings.length === 0) {
1176      logger.info(`Code review: ${item.file_path} — no findings`);
1177      return item.file_path; // Counts as stored (reviewed with no issues)
1178    }
1179  
1180    let tasksCreated = 0;
1181  
1182    for (const finding of findings) {
1183      const severity = parseInt(finding.severity || '5', 10);
1184  
1185      // Only create fix tasks for bugs and security issues (severity >= 7)
1186      if (severity < 7) {
1187        logger.info(
1188          `Code review: ${item.file_path}:${finding.line || '?'} severity=${severity} — logged, not queued`
1189        );
1190        continue;
1191      }
1192  
1193      await client.query(
1194        `INSERT INTO tel.agent_tasks
1195           (task_type, assigned_to, created_by, priority, status, context_json)
1196         VALUES ('code_review_fix', 'developer', 'code_reviewer', $1, 'pending', $2)`,
1197        [
1198          severity,
1199          JSON.stringify({
1200            file_path: item.file_path,
1201            line: finding.line || null,
1202            category: finding.category || 'unknown',
1203            description: finding.description,
1204            suggestion: finding.suggestion || null,
1205            severity,
1206          }),
1207        ]
1208      );
1209      tasksCreated++;
1210    }
1211  
1212    logger.info(
1213      `Code review: ${item.file_path} — ${findings.length} findings, ${tasksCreated} fix tasks created`
1214    );
1215    return item.file_path;
1216  }
1217  
1218  /**
1219   * Store the Haiku evidence-merge result.
1220   *
1221   * item = {
1222   *   site_id,
1223   *   revised_scores: { factor: { original, revised, reason } },
1224   *   additional_findings: [{ factor, finding, severity }],
1225   *   evidence_summary: string
1226   * }
1227   *
1228   * Writes the full merged object to sites.evidence_json.
1229   * Also patches individual factor scores in sites.score_json where revisions exist.
1230   */
1231  async function storeEvidenceMerge(client, item) {
1232    if (!item.site_id) {
1233      logger.warn('Evidence merge result missing site_id');
1234      return null;
1235    }
1236  
1237    const siteExistsResult = await client.query(
1238      'SELECT id FROM sites WHERE id = $1',
1239      [item.site_id]
1240    );
1241    if (!siteExistsResult.rows[0]) {
1242      logger.warn(`Evidence merge: site ${item.site_id} not found`);
1243      return null;
1244    }
1245  
1246    // Patch score_json with revised scores (filesystem)
1247    const revisedScores = item.revised_scores || {};
1248    const revisionCount = Object.keys(revisedScores).length;
1249    const scoreRaw = getScoreJsonWithFallback(item.site_id);
1250  
1251    if (revisionCount > 0 && scoreRaw) {
1252      try {
1253        const scoreData = JSON.parse(scoreRaw);
1254        for (const [factor, revision] of Object.entries(revisedScores)) {
1255          if (scoreData.factor_scores && scoreData.factor_scores[factor] !== undefined) {
1256            scoreData.factor_scores[factor].score = revision.revised;
1257            scoreData.factor_scores[factor].evidence_revised = true;
1258            scoreData.factor_scores[factor].evidence_reason = revision.reason;
1259          }
1260        }
1261        // Write patched score to filesystem only; DB no longer stores score_json
1262        setScoreJson(item.site_id, JSON.stringify(scoreData));
1263      } catch (e) {
1264        logger.warn(
1265          `Evidence merge: failed to patch score_json for site ${item.site_id}: ${e.message}`
1266        );
1267      }
1268    }
1269  
1270    // Write full evidence_json
1271    await client.query(
1272      'UPDATE sites SET evidence_json = $1 WHERE id = $2',
1273      [
1274        JSON.stringify({
1275          revised_scores: revisedScores,
1276          additional_findings: item.additional_findings || [],
1277          evidence_summary: item.evidence_summary || '',
1278          merged_at: new Date().toISOString(),
1279        }),
1280        item.site_id,
1281      ]
1282    );
1283  
1284    const additionalCount = (item.additional_findings || []).length;
1285    logger.info(
1286      `Evidence merge: site ${item.site_id} — ${revisionCount} score revisions, ${additionalCount} additional findings`
1287    );
1288    if (item.evidence_summary) {
1289      logger.info(`Evidence summary: ${item.evidence_summary}`);
1290    }
1291  
1292    return item.site_id;
1293  }
1294  
1295  /**
1296   * Store a follow-up message generated by the LLM.
1297   * Each result item contains: site_id, contact_method, contact_uri, message_body,
1298   * subject_line (email only), sequence_step, country_code.
1299   *
1300   * Similar to storeProposal but uses message_type='followupN' and includes sequence_step.
1301   */
1302  async function storeFollowupGenerate(client, item) {
1303    const contactMethod = item.contact_method || 'email';
1304    let contactUri = item.contact_uri || '';
1305    const sequenceStep = item.sequence_step || item.next_step || 2;
1306  
1307    if (!contactUri || !item.site_id) {
1308      logger.warn(`Followup missing contact_uri or site_id`);
1309      return null;
1310    }
1311  
1312    // Normalize phone numbers
1313    if (contactMethod === 'sms' && item.country_code) {
1314      contactUri = addCountryCode(contactUri, item.country_code);
1315    }
1316  
1317    // GDPR check
1318    let approvalStatus = 'pending';
1319    if (contactMethod === 'email' && item.country_code) {
1320      const country = getCountryByCode(item.country_code);
1321      if (country?.requiresGDPRCheck) {
1322        approvalStatus = 'gdpr_blocked';
1323      }
1324    }
1325  
1326    // Skip blocked emails
1327    if (contactMethod === 'email') {
1328      if (isGovernmentEmail(contactUri) || isEducationEmail(contactUri) || isDemoEmail(contactUri)) {
1329        logger.info(`Skipping blocked email for followup: ${contactUri}`);
1330        return null;
1331      }
1332    }
1333  
1334    // Resolve spintax
1335    const messageBody = spin(item.message_body || '');
1336    const subjectLine = spin(item.subject_line || '');
1337  
1338    if (!messageBody || messageBody.length < 10) {
1339      logger.warn(`Followup message too short for site ${item.site_id} step ${sequenceStep}`);
1340      return null;
1341    }
1342  
1343    // SMS length check (more lenient for follow-ups — up to 320 chars / 2 segments)
1344    if (contactMethod === 'sms' && messageBody.length > 320) {
1345      logger.warn(
1346        `SMS followup too long (${messageBody.length}/320) for site ${item.site_id} step ${sequenceStep}`
1347      );
1348      return null;
1349    }
1350  
1351    const messageType = `followup${sequenceStep}`;
1352  
1353    // Check for existing followup at this step for this site+method+uri
1354    const existingResult = await client.query(
1355      `SELECT 1 FROM messages
1356       WHERE site_id = $1 AND direction = 'outbound'
1357         AND contact_method = $2 AND contact_uri = $3
1358         AND sequence_step = $4`,
1359      [item.site_id, contactMethod, contactUri, sequenceStep]
1360    );
1361  
1362    if (existingResult.rows[0]) {
1363      logger.info(`Followup step ${sequenceStep} already exists for site ${item.site_id} ${contactMethod}`);
1364      return null;
1365    }
1366  
1367    const result = await client.query(
1368      `INSERT INTO messages (site_id, direction, message_body, subject_line, contact_method, contact_uri, approval_status, message_type, sequence_step)
1369       VALUES ($1, 'outbound', $2, $3, $4, $5, $6, $7, $8)
1370       RETURNING id`,
1371      [item.site_id, messageBody, subjectLine || null, contactMethod, contactUri, approvalStatus, messageType, sequenceStep]
1372    );
1373  
1374    // Update next_followup_at for the next step
1375    const TOUCH_CADENCE = { 2: 3, 3: 7, 4: 14, 5: 21, 6: 28, 7: 35, 8: 42 };
1376    const nextNextStep = sequenceStep + 1;
1377    if (nextNextStep <= 8 && TOUCH_CADENCE[nextNextStep]) {
1378      const daysUntilNext = TOUCH_CADENCE[nextNextStep] - TOUCH_CADENCE[sequenceStep];
1379      await client.query(
1380        `UPDATE sites
1381         SET next_followup_at = NOW() + make_interval(days => $2),
1382             updated_at = CURRENT_TIMESTAMP
1383         WHERE id = $1`,
1384        [item.site_id, daysUntilNext]
1385      );
1386    } else {
1387      // All follow-ups generated — clear next_followup_at
1388      await client.query(
1389        `UPDATE sites
1390         SET next_followup_at = NULL,
1391             updated_at = CURRENT_TIMESTAMP
1392         WHERE id = $1`,
1393        [item.site_id]
1394      );
1395    }
1396  
1397    logger.info(`Stored followup step ${sequenceStep} for site ${item.site_id} (${contactMethod} → ${contactUri})`);
1398    return result.rows[0]?.id;
1399  }
1400  
1401  // Main
1402  const handlers = {
1403    proposals_email: storeProposal,
1404    proposals_sms: storeProposal,
1405    proofread: storeProofreadDecision,
1406    classify_replies: storeClassification,
1407    extract_names: storeName,
1408    reply_responses: storeReplyResponse,
1409    oversee: storeOverseerResult,
1410    classify_errors: storeErrorClassification,
1411    score_semantic: storeSemanticScore,
1412    score_sites: storeScoreResult,
1413    enrich_sites: storeEnrichResult,
1414    code_review: storeCodeReviewFindings,
1415    monitor_health: storeMonitorHealthResult,
1416    triage_errors: storeTriageResult,
1417    check_docs: storeCheckDocsResult,
1418    evidence_merge: storeEvidenceMerge,
1419    followup_generate: storeFollowupGenerate,
1420  };
1421  
1422  async function main() {
1423    const raw = await readStdin();
1424  
1425    let input;
1426    try {
1427      input = JSON.parse(raw);
1428    } catch {
1429      console.error('Invalid JSON on stdin');
1430      process.exit(1);
1431    }
1432  
1433    const { batch_type, results } = input;
1434    if (!batch_type || !Array.isArray(results)) {
1435      console.error('Expected { batch_type, results: [...] }');
1436      process.exit(1);
1437    }
1438  
1439    const handler = handlers[batch_type];
1440    if (!handler) {
1441      console.error(`Unknown batch type: ${batch_type}`);
1442      process.exit(1);
1443    }
1444  
1445    let stored = 0;
1446    let skipped = 0;
1447    let errors = 0;
1448  
1449    try {
1450      await withTransaction(async (client) => {
1451        for (const item of results) {
1452          try {
1453            await client.query('SAVEPOINT item_save');
1454            const id = await handler(client, item);
1455            await client.query('RELEASE SAVEPOINT item_save');
1456            if (id) {
1457              stored++;
1458            } else {
1459              skipped++;
1460            }
1461          } catch (err) {
1462            await client.query('ROLLBACK TO SAVEPOINT item_save');
1463            errors++;
1464            logger.error(`Failed to store item: ${err.message}`);
1465          }
1466        }
1467      });
1468      const summary = { batch_type, total: results.length, stored, skipped, errors };
1469      console.log(JSON.stringify(summary));
1470    } finally {
1471      await closePool();
1472    }
1473  }
1474  
1475  // Only run main() when this script is executed directly (not imported by tests)
1476  if (import.meta.url === `file://${process.argv[1]}`) {
1477    main().catch(err => {
1478      console.error(err.message);
1479      process.exit(1);
1480    });
1481  }
1482  
1483  // Named exports for unit testing (guards prevent main() from running on import)
1484  export { storeSemanticScore, handlers };