/ scripts / claude-batch.js
claude-batch.js
   1  #!/usr/bin/env node
   2  
   3  /**
   4   * Claude Batch — Pull work from DB and output JSON for Claude processing.
   5   *
   6   * Usage:
   7   *   node scripts/claude-batch.js <batch_type> [limit]
   8   *
   9   * Batch types:
  10   *   proposals_email   — Sites needing email proposals
  11   *   proposals_sms     — Sites needing SMS proposals
  12   *   classify_replies  — Inbound messages needing intent classification
  13   *   extract_names     — Contacts missing names
  14   *   reply_responses   — Inbound messages needing a response
  15   *   oversee           — System health snapshot for overseer analysis
  16   *   classify_errors   — Unknown pipeline errors needing pattern proposals
  17   *   score_sites       — Sites needing HTML-only LLM scoring (ENABLE_VISION=false)
  18   *   enrich_sites      — Sites needing LLM contact enrichment from HTML
  19   *
  20   * Outputs JSON to stdout. Load appropriate prompt/context docs separately.
  21   */
  22  
  23  import { execSync } from 'child_process';
  24  import { existsSync, readFileSync, writeFileSync } from 'fs';
  25  import { join } from 'path';
  26  import { getAll, getOne, run, closePool } from '../src/utils/db.js';
  27  import { COUNTRIES } from '../src/config/countries.js';
  28  import { computeGrade } from '../src/score.js';
  29  import { categorizeError } from '../src/utils/error-categories.js';
  30  import { extractScoringText } from '../src/utils/programmatic-scorer.js';
  31  import { stripInjectionMarkers, wrapUntrusted } from '../src/utils/llm-sanitizer.js';
  32  import { safeJsonParse } from '../src/utils/error-handler.js';
  33  import { readHtmlDom, readKeyPagesHtml } from '../src/utils/html-storage.js';
  34  import { getScoreJsonWithFallback, getScoreDataWithFallback } from '../src/utils/score-storage.js';
  35  import { getContactsDataWithFallback } from '../src/utils/contacts-storage.js';
  36  import '../src/utils/load-env.js';
  37  
  38  const [, , batchType, limitArg] = process.argv;
  39  const limit = parseInt(limitArg || '10', 10);
  40  
  41  if (!batchType) {
  42    console.error(
  43      'Usage: node scripts/claude-batch.js <batch_type> [limit]\n' +
  44        'Types: proposals_email, proposals_sms, classify_replies, extract_names, reply_responses, score_sites, score_semantic, enrich_sites, proofread, followup_check, followup_generate'
  45    );
  46    process.exit(1);
  47  }
  48  
  49  function getSameLanguageCodes(countryCode) {
  50    const lang = COUNTRIES[countryCode]?.language;
  51    if (!lang) return [countryCode];
  52    return Object.entries(COUNTRIES)
  53      .filter(([, c]) => c.language === lang)
  54      .map(([code]) => code);
  55  }
  56  
  57  async function getCompetitorBenchmark(siteData) {
  58    const scoreJson = getScoreDataWithFallback(siteData.id, siteData);
  59    const industry = scoreJson?.industry_classification;
  60    if (!industry || !siteData.score) return null;
  61  
  62    const codes = getSameLanguageCodes(siteData.country_code);
  63    const placeholders = codes.map((_, i) => `$${i + 3}`).join(',');
  64  
  65    const competitor = await getOne(
  66      `SELECT domain, score, grade
  67       FROM sites
  68       WHERE industry_classification = $1
  69         AND domain != $2
  70         AND country_code IN (${placeholders})
  71         AND status IN ('prog_scored', 'semantic_scored', 'vision_scored', 'enriched', 'enriched_regex', 'enriched_llm', 'proposals_drafted', 'outreach_partial', 'outreach_sent')
  72         AND score IS NOT NULL
  73         AND score > $${codes.length + 3}
  74       ORDER BY score DESC
  75       LIMIT 1`,
  76      [industry, siteData.domain, ...codes, siteData.score]
  77    );
  78  
  79    if (!competitor) return null;
  80  
  81    return {
  82      domain: competitor.domain,
  83      score: competitor.score,
  84      grade: competitor.grade || computeGrade(competitor.score),
  85      industry,
  86      language_match: true,
  87    };
  88  }
  89  
  90  async function getIndustryBenchmark(siteData) {
  91    const scoreJson = getScoreDataWithFallback(siteData.id, siteData);
  92    const industry = scoreJson?.industry_classification;
  93    if (!industry || !siteData.score) return null;
  94  
  95    const result = await getOne(
  96      `SELECT AVG(score) as avg_score, COUNT(*) as sample_size
  97       FROM sites
  98       WHERE industry_classification = $1
  99         AND country_code = $2
 100         AND score IS NOT NULL
 101         AND domain != $3`,
 102      [industry, siteData.country_code, siteData.domain]
 103    );
 104  
 105    if (!result || !result.sample_size || result.sample_size < 10) return null;
 106  
 107    return {
 108      industry,
 109      country_code: siteData.country_code,
 110      avg_score: Math.round(result.avg_score * 10) / 10,
 111      sample_size: result.sample_size,
 112      site_score: siteData.score,
 113      difference: Math.round((siteData.score - result.avg_score) * 10) / 10,
 114    };
 115  }
 116  
 117  async function getAggregateStats() {
 118    const stats = await getOne(
 119      `SELECT COUNT(*) as total, AVG(score) as avg_score, MAX(score) as best_score
 120       FROM sites WHERE score IS NOT NULL`
 121    );
 122  
 123    return {
 124      total_sites_scored: stats.total,
 125      avg_score: Math.round(stats.avg_score * 10) / 10,
 126      best_score: stats.best_score,
 127    };
 128  }
 129  
 130  // Industry benchmarks for ROI estimation — avg job value ($), typical site conversion rate,
 131  // estimated monthly visitors for a local business website.
 132  const INDUSTRY_ROI_BENCHMARKS = {
 133    plumber:        { avg_job: 350, conversion_rate: 0.03, monthly_visitors: 800 },
 134    electrician:    { avg_job: 400, conversion_rate: 0.03, monthly_visitors: 700 },
 135    roofer:         { avg_job: 500, conversion_rate: 0.025, monthly_visitors: 600 },
 136    hvac:           { avg_job: 450, conversion_rate: 0.03, monthly_visitors: 700 },
 137    painter:        { avg_job: 300, conversion_rate: 0.03, monthly_visitors: 500 },
 138    landscaper:     { avg_job: 250, conversion_rate: 0.035, monthly_visitors: 600 },
 139    pest_control:   { avg_job: 200, conversion_rate: 0.04, monthly_visitors: 700 },
 140    cleaner:        { avg_job: 180, conversion_rate: 0.04, monthly_visitors: 800 },
 141    locksmith:      { avg_job: 200, conversion_rate: 0.035, monthly_visitors: 500 },
 142    mechanic:       { avg_job: 350, conversion_rate: 0.03, monthly_visitors: 600 },
 143    dentist:        { avg_job: 250, conversion_rate: 0.04, monthly_visitors: 900 },
 144    physio:         { avg_job: 100, conversion_rate: 0.04, monthly_visitors: 700 },
 145    chiropractor:   { avg_job: 100, conversion_rate: 0.04, monthly_visitors: 600 },
 146    lawyer:         { avg_job: 1500, conversion_rate: 0.02, monthly_visitors: 500 },
 147    accountant:     { avg_job: 800, conversion_rate: 0.025, monthly_visitors: 400 },
 148    real_estate:    { avg_job: 5000, conversion_rate: 0.015, monthly_visitors: 800 },
 149    restaurant:     { avg_job: 40, conversion_rate: 0.05, monthly_visitors: 1500 },
 150    cafe:           { avg_job: 20, conversion_rate: 0.05, monthly_visitors: 1200 },
 151    gym:            { avg_job: 60, conversion_rate: 0.04, monthly_visitors: 1000 },
 152    salon:          { avg_job: 80, conversion_rate: 0.04, monthly_visitors: 800 },
 153    veterinarian:   { avg_job: 200, conversion_rate: 0.035, monthly_visitors: 600 },
 154    builder:        { avg_job: 2000, conversion_rate: 0.02, monthly_visitors: 500 },
 155  };
 156  // Fallback for industries not in the map
 157  const DEFAULT_ROI_BENCHMARK = { avg_job: 300, conversion_rate: 0.03, monthly_visitors: 600 };
 158  
 159  async function getROIEstimate(siteData) {
 160    const scoreJson = getScoreDataWithFallback(siteData.id, siteData) || {};
 161    const industry = scoreJson.industry_classification;
 162    if (!industry || !siteData.score) return null;
 163  
 164    // Match industry — try exact, then partial match on first word
 165    const industryLower = industry.toLowerCase().replace(/[^a-z_]/g, '');
 166    const benchmark = INDUSTRY_ROI_BENCHMARKS[industryLower]
 167      || Object.entries(INDUSTRY_ROI_BENCHMARKS).find(
 168        ([k]) => industryLower.includes(k) || k.includes(industryLower)
 169      )?.[1]
 170      || DEFAULT_ROI_BENCHMARK;
 171  
 172    // Get audit price for this country
 173    const countryRow = await getOne(
 174      'SELECT price_usd, currency_symbol, currency_code FROM countries WHERE country_code = $1',
 175      [siteData.country_code]
 176    );
 177    const priceFormatted = countryRow
 178      ? `${countryRow.currency_symbol}${Math.round(countryRow.price_usd / 100)} ${countryRow.currency_code}`
 179      : '$297 USD';
 180    const priceValue = countryRow ? countryRow.price_usd / 100 : 297;
 181  
 182    // Estimate: at current score, what % of potential conversions are being missed?
 183    // Simple model: score of 50 = ~50% conversion efficiency, score of 80 = ~80% efficiency
 184    const efficiency = siteData.score / 100;
 185    const potentialMonthlyConversions = benchmark.monthly_visitors * benchmark.conversion_rate;
 186    const actualMonthlyConversions = potentialMonthlyConversions * efficiency;
 187    const missedConversions = potentialMonthlyConversions - actualMonthlyConversions;
 188    const missedMonthlyRevenue = Math.round(missedConversions * benchmark.avg_job);
 189    const missedAnnualRevenue = missedMonthlyRevenue * 12;
 190    const auditFraction = missedAnnualRevenue > 0
 191      ? (priceValue / missedAnnualRevenue * 100).toFixed(1)
 192      : null;
 193  
 194    return {
 195      estimated_monthly_visitors: benchmark.monthly_visitors,
 196      avg_job_value: benchmark.avg_job,
 197      industry_conversion_rate: benchmark.conversion_rate,
 198      estimated_monthly_revenue_at_stake: missedMonthlyRevenue,
 199      audit_price: priceFormatted,
 200      audit_as_fraction_of_annual_loss: auditFraction ? `${auditFraction}%` : null,
 201    };
 202  }
 203  
 204  async function fetchProposalsBatch(channel) {
 205    const contactMethod = channel === 'email' ? 'email' : 'sms';
 206  
 207    // Support both contact schemas:
 208    //   new: contacts_json.contacts[].type = 'email'|'sms'
 209    //   old: contacts_json.email_addresses[] or contacts_json.phone_numbers[]
 210  
 211    // Note: contacts_json column holds sentinel '{"_fs":true}' — real data is on filesystem.
 212    // We cannot use json_extract(contacts_json, ...) for contact-type gating in SQL.
 213    // Instead, fetch a wider pool and filter in JS after loading from filesystem.
 214    const candidateLimit = limit * 4; // Oversample to account for filtered-out sites
 215    const sites = await getAll(
 216      `SELECT s.id, s.domain, s.landing_page_url, s.keyword, s.score, s.grade,
 217              s.evidence_json, s.country_code, s.city, s.state
 218       FROM sites s
 219       WHERE s.status IN ('enriched', 'enriched_llm')
 220         AND s.score IS NOT NULL
 221         AND s.score < 82
 222         AND s.evidence_json IS NOT NULL
 223         AND NOT EXISTS (
 224           SELECT 1 FROM messages m
 225           WHERE m.site_id = s.id
 226             AND m.contact_method = $1
 227             AND m.direction = 'outbound'
 228             AND m.message_type = 'outreach'
 229         )
 230       ORDER BY s.score ASC
 231       LIMIT $2`,
 232      [contactMethod, candidateLimit]
 233    );
 234  
 235    const aggregateStats = await getAggregateStats();
 236  
 237    // Filter to sites that actually have the required contact type (filesystem read)
 238    const filtered = [];
 239    for (const site of sites) {
 240      if (filtered.length >= limit) break;
 241      const contactsJson = getContactsDataWithFallback(site.id, site) || {};
 242      const hasContact =
 243        (Array.isArray(contactsJson.contacts) &&
 244          contactsJson.contacts.some(c => c.type === contactMethod)) ||
 245        (contactMethod === 'email' && (contactsJson.email_addresses?.length ?? 0) > 0) ||
 246        (contactMethod === 'sms' && (contactsJson.phone_numbers?.length ?? 0) > 0);
 247      if (hasContact) filtered.push({ ...site, _contacts: contactsJson });
 248    }
 249  
 250    return Promise.all(filtered.map(async site => {
 251      const scoreJson = getScoreDataWithFallback(site.id, site) || {};
 252      const contactsJson = site._contacts || {};
 253  
 254      // Normalise both schemas into a unified contacts list
 255      let contacts;
 256      if (Array.isArray(contactsJson.contacts)) {
 257        contacts = contactsJson.contacts.filter(c => c.type === contactMethod);
 258      } else if (contactMethod === 'email') {
 259        contacts = (contactsJson.email_addresses || []).map(e => ({
 260          type: 'email',
 261          uri: e.email,
 262          name: e.name || null,
 263        }));
 264      } else {
 265        contacts = (contactsJson.phone_numbers || []).map(p => ({
 266          type: 'sms',
 267          uri: p.number,
 268          name: p.name || null,
 269        }));
 270      }
 271  
 272      return {
 273        site_id: site.id,
 274        domain: site.domain,
 275        landing_page_url: site.landing_page_url,
 276        keyword: site.keyword,
 277        score: site.score,
 278        grade: site.grade || computeGrade(site.score),
 279        industry: scoreJson.industry_classification || null,
 280        country_code: site.country_code,
 281        city: site.city || contactsJson.city || null,
 282        state: site.state || contactsJson.state || null,
 283        business_name: contactsJson.business_name || site.domain,
 284        contacts: contacts.map(c => ({
 285          type: c.type,
 286          uri: c.uri || c.value,
 287          name: c.name || null,
 288        })),
 289        weaknesses: extractWeaknesses(scoreJson),
 290        evidence_summary: site.evidence_json
 291          ? safeJsonParse(site.evidence_json)?.evidence_summary || null
 292          : null,
 293        competitor_benchmark: await getCompetitorBenchmark(site),
 294        industry_benchmark: await getIndustryBenchmark(site),
 295        aggregate_stats: aggregateStats,
 296        roi_estimate: await getROIEstimate(site),
 297      };
 298    }));
 299  }
 300  
 301  function extractWeaknesses(scoreJson) {
 302    if (!scoreJson?.factor_scores) return [];
 303    return Object.entries(scoreJson.factor_scores)
 304      .filter(([, f]) => f.score < 5)
 305      .map(([name, f]) => ({
 306        factor: name.replace(/_/g, ' '),
 307        score: f.score,
 308        reasoning: f.reasoning,
 309      }))
 310      .slice(0, 5);
 311  }
 312  
 313  async function fetchProofread() {
 314    // Pending outbound outreach messages awaiting QA approval
 315    const messages = await getAll(
 316      `SELECT m.id, m.site_id, m.message_body, m.subject_line, m.contact_method,
 317              m.contact_uri, s.domain, s.score, s.grade, s.country_code, s.landing_page_url
 318       FROM messages m
 319       JOIN sites s ON s.id = m.site_id
 320       WHERE m.direction = 'outbound'
 321         AND (m.message_type = 'outreach' OR m.message_type LIKE 'followup%')
 322         AND m.approval_status = 'pending'
 323         AND m.sent_at IS NULL
 324         AND m.delivery_status IS NULL
 325       ORDER BY m.contact_method, m.created_at ASC
 326       LIMIT $1`,
 327      [limit]
 328    );
 329  
 330    return messages.map(msg => ({
 331      message_id: msg.id,
 332      site_id: msg.site_id,
 333      domain: msg.domain,
 334      landing_page_url: msg.landing_page_url,
 335      score: msg.score,
 336      grade: msg.grade || computeGrade(msg.score),
 337      country_code: msg.country_code,
 338      contact_method: msg.contact_method,
 339      contact_uri: msg.contact_uri,
 340      message_body: msg.message_body,
 341      subject_line: msg.subject_line || null,
 342    }));
 343  }
 344  
 345  async function fetchClassifyReplies() {
 346    const replies = await getAll(
 347      `SELECT m.id, m.site_id, m.message_body, m.contact_method, m.contact_uri,
 348              s.domain
 349       FROM messages m
 350       JOIN sites s ON s.id = m.site_id
 351       WHERE m.direction = 'inbound'
 352         AND m.intent IS NULL
 353         AND m.message_body IS NOT NULL
 354       ORDER BY m.created_at ASC
 355       LIMIT $1`,
 356      [limit]
 357    );
 358  
 359    return replies.map(r => ({
 360      message_id: r.id,
 361      site_id: r.site_id,
 362      domain: r.domain,
 363      channel: r.contact_method,
 364      message: r.message_body,
 365    }));
 366  }
 367  
 368  async function fetchExtractNames() {
 369    // contacts_json column holds sentinel — real data is on filesystem.
 370    // Find sites with contacts_json IS NOT NULL (sentinel or real data), then filter in JS.
 371    const candidateLimit = limit * 3;
 372    const sites = await getAll(
 373      `SELECT s.id, s.domain, s.landing_page_url
 374       FROM sites s
 375       ORDER BY s.id DESC
 376       LIMIT $1`,
 377      [candidateLimit]
 378    );
 379  
 380    const results = [];
 381    for (const s of sites) {
 382      if (results.length >= limit) break;
 383      const contactsJson = getContactsDataWithFallback(s.id, s) || {};
 384      const namelessContacts = (contactsJson.contacts || []).filter(c => !c.name);
 385      if (namelessContacts.length === 0) continue;
 386      results.push({
 387        site_id: s.id,
 388        domain: s.domain,
 389        landing_page_url: s.landing_page_url,
 390        contacts: namelessContacts.map(c => ({
 391          type: c.type,
 392          uri: c.uri || c.value,
 393        })),
 394      });
 395    }
 396    return results;
 397  }
 398  
 399  
 400  async function fetchReplyResponses() {
 401    // Inbound messages that have been classified but have no outbound reply yet
 402    const inbounds = await getAll(
 403      `SELECT m.id, m.site_id, m.message_body, m.contact_method, m.contact_uri,
 404              m.intent, m.sentiment,
 405              s.domain, s.score, s.grade, s.country_code
 406       FROM messages m
 407       JOIN sites s ON s.id = m.site_id
 408       WHERE m.direction = 'inbound'
 409         AND m.intent IS NOT NULL
 410         AND m.intent NOT IN ('opt-out', 'autoresponder')
 411         AND NOT EXISTS (
 412           SELECT 1 FROM messages m2
 413           WHERE m2.site_id = m.site_id
 414             AND m2.direction = 'outbound'
 415             AND m2.message_type IN ('reply', 'reply_skipped')
 416             AND m2.created_at > m.created_at
 417         )
 418       ORDER BY m.created_at ASC
 419       LIMIT $1`,
 420      [limit]
 421    );
 422  
 423    return Promise.all(inbounds.map(async msg => {
 424      const scoreJson = getScoreDataWithFallback(msg.site_id, msg) || {};
 425      const contactsJson = getContactsDataWithFallback(msg.site_id, msg) || {};
 426  
 427      // Get full conversation history for this site
 428      const history = await getAll(
 429        `SELECT direction, message_body, contact_method, sent_at, created_at, intent
 430         FROM messages
 431         WHERE site_id = $1
 432         ORDER BY created_at ASC`,
 433        [msg.site_id]
 434      );
 435  
 436      // Get country-specific pricing from DB
 437      const countryRow = await getOne(
 438        'SELECT price_usd, currency_symbol, currency_code FROM countries WHERE country_code = $1',
 439        [msg.country_code]
 440      );
 441      const pricing = countryRow
 442        ? `${countryRow.currency_symbol}${Math.round(countryRow.price_usd / 100)} ${countryRow.currency_code}`
 443        : null;
 444  
 445      return {
 446        inbound_message_id: msg.id,
 447        site_id: msg.site_id,
 448        domain: msg.domain,
 449        score: msg.score,
 450        grade: msg.grade,
 451        country_code: msg.country_code,
 452        business_name: contactsJson.business_name || msg.domain,
 453        intent: msg.intent,
 454        sentiment: msg.sentiment,
 455        inbound_message: wrapUntrusted(
 456          stripInjectionMarkers(msg.message_body || ''),
 457          'prospect_reply'
 458        ),
 459        channel: msg.contact_method,
 460        contact_uri: msg.contact_uri,
 461        pricing,
 462        weaknesses: extractWeaknesses(scoreJson),
 463        conversation_history: history.map(h => ({
 464          direction: h.direction,
 465          message: wrapUntrusted(stripInjectionMarkers(h.message_body || ''), 'conversation_history'),
 466          channel: h.contact_method,
 467          timestamp: h.sent_at || h.created_at,
 468          intent: h.intent || undefined,
 469        })),
 470      };
 471    }));
 472  }
 473  
 474  async function fetchOverseeData() {
 475    // Collect system state for overseer analysis — mirrors sonnet-overseer.js data collection
 476    const services = ['333method-pipeline', 'mmo-cron.timer', '333method-dashboard'];
 477    const serviceStatus = {};
 478    for (const svc of services) {
 479      try {
 480        serviceStatus[svc] = execSync(`systemctl --user is-active ${svc}`, {
 481          encoding: 'utf8',
 482          timeout: 8000,
 483        }).trim();
 484      } catch (err) {
 485        serviceStatus[svc] = (err.stdout || '').trim() || 'inactive';
 486      }
 487    }
 488  
 489    const today = new Date().toISOString().slice(0, 10);
 490    const logFile = join(process.cwd(), `logs/pipeline-${today}.log`);
 491    let recentErrors = [];
 492    if (existsSync(logFile)) {
 493      const cutoff = new Date(Date.now() - 30 * 60 * 1000);
 494      const lines = readFileSync(logFile, 'utf8').split('\n');
 495      const seen = new Map();
 496      for (const line of lines.slice(-400)) {
 497        if (!line.includes('[ERROR]') && !line.includes('[WARN]')) continue;
 498        const tsMatch = line.match(/^(\[[\d\-T:.+Z]+\])/);
 499        if (tsMatch) {
 500          try {
 501            if (new Date(tsMatch[1].slice(1, -1)) < cutoff) continue;
 502          } catch {
 503            /* keep */
 504          }
 505        }
 506        const trimmed = line.slice(0, 200);
 507        const key = trimmed.slice(0, 80);
 508        const count = (seen.get(key) || 0) + 1;
 509        seen.set(key, count);
 510        if (count === 1) recentErrors.push(trimmed);
 511        else if (count <= 3) recentErrors[recentErrors.length - 1] = `${trimmed} (×${count})`;
 512      }
 513      recentErrors = recentErrors.slice(-30);
 514    }
 515  
 516    const now = new Date();
 517    const twoHoursAgo = new Date(now - 2 * 60 * 60 * 1000).toISOString();
 518    const sixHoursAgo = new Date(now - 6 * 60 * 60 * 1000).toISOString();
 519    const fourHoursAgo = new Date(now - 4 * 60 * 60 * 1000).toISOString();
 520    const activeStages = [
 521      'found',
 522      'assets_captured',
 523      'prog_scored',
 524      'semantic_scored',
 525      'vision_scored',
 526      'enriched',
 527    ];
 528  
 529    const agentTasks = {
 530      blocked_sample: await getAll(
 531        `SELECT id, task_type, created_at, substr(context_json::text, 1, 120) as ctx
 532         FROM tel.agent_tasks WHERE status='blocked' ORDER BY created_at ASC LIMIT 20`
 533      ),
 534      stale_blocked_count: (await getOne(
 535        `SELECT COUNT(*) as n FROM tel.agent_tasks WHERE status='blocked' AND created_at < $1`,
 536        [twoHoursAgo]
 537      ))?.n ?? 0,
 538      stale_pending_count: (await getOne(
 539        `SELECT COUNT(*) as n FROM tel.agent_tasks WHERE status='pending' AND created_at < $1`,
 540        [sixHoursAgo]
 541      ))?.n ?? 0,
 542      stale_running_count: (await getOne(
 543        `SELECT COUNT(*) as n FROM tel.agent_tasks WHERE status='running' AND started_at < $1`,
 544        [twoHoursAgo]
 545      ))?.n ?? 0,
 546      recent_failed: await getAll(
 547        `SELECT task_type, COUNT(*) as n FROM tel.agent_tasks
 548         WHERE status='failed' AND completed_at > $1
 549         GROUP BY task_type ORDER BY n DESC LIMIT 10`,
 550        [twoHoursAgo]
 551      ),
 552      running_tasks: await getAll(
 553        `SELECT id, task_type, assigned_to, started_at,
 554         ROUND(EXTRACT(EPOCH FROM (NOW() - started_at)) / 60) as age_mins
 555         FROM tel.agent_tasks WHERE status='running' ORDER BY started_at ASC LIMIT 10`
 556      ),
 557    };
 558  
 559    const stagesPlaceholders = activeStages.map((_, i) => `$${i + 1}`).join(',');
 560    const siteHealth = {
 561      distribution: await getAll(
 562        `SELECT status, COUNT(*) as n FROM sites GROUP BY status ORDER BY n DESC LIMIT 10`
 563      ),
 564      stuck_by_stage: await getAll(
 565        `SELECT status, COUNT(*) as n FROM sites
 566         WHERE status IN (${stagesPlaceholders}) AND updated_at < $${activeStages.length + 1}
 567         GROUP BY status ORDER BY n DESC`,
 568        [...activeStages, fourHoursAgo]
 569      ),
 570      looping_candidates: await getAll(
 571        `SELECT status, COUNT(*) as n FROM sites
 572         WHERE recapture_count > 5 AND status IN (${stagesPlaceholders})
 573         GROUP BY status`,
 574        activeStages
 575      ),
 576      recently_reset_counts: await getAll(
 577        `SELECT status, COUNT(*) as n FROM sites
 578         WHERE error_message LIKE '%reset by sonnet overseer%'
 579         AND updated_at > NOW() - INTERVAL '24 hours'
 580         GROUP BY status`
 581      ),
 582      queue_depths: {
 583        found: (await getOne(`SELECT COUNT(*) as n FROM sites WHERE status='found'`))?.n ?? 0,
 584        semantic_scored: (await getOne(
 585          `SELECT COUNT(*) as n FROM sites WHERE status IN ('semantic_scored','vision_scored')`
 586        ))?.n ?? 0,
 587        enriched: (await getOne(
 588          `SELECT COUNT(*) as n FROM sites WHERE status IN ('enriched','enriched_regex','enriched_llm')`
 589        ))?.n ?? 0,
 590      },
 591    };
 592  
 593    const nowIso = now.toISOString();
 594    const cronHealth = {
 595      overdue_cron_jobs: await getAll(
 596        `SELECT task_key, last_run_at, interval_value, interval_unit,
 597              (last_run_at + (interval_value || ' ' || interval_unit)::interval) AS next_expected_at
 598         FROM ops.cron_jobs
 599         WHERE enabled = 1
 600           AND (last_run_at + (interval_value || ' ' || interval_unit)::interval + INTERVAL '30 seconds') < $1
 601         ORDER BY next_expected_at ASC LIMIT 10`,
 602        [nowIso]
 603      ),
 604      recent_health_checks: await getAll(
 605        `SELECT check_type, status, action_taken, created_at
 606         FROM tel.system_health ORDER BY created_at DESC LIMIT 10`
 607      ),
 608    };
 609  
 610    const backupDbRow = await getOne(
 611      `SELECT last_run_at,
 612         ROUND(EXTRACT(EPOCH FROM (NOW() - last_run_at)) / 3600, 1) AS hours_ago
 613       FROM ops.cron_jobs WHERE task_key='backupDatabase' LIMIT 1`
 614    );
 615  
 616    // Previous overseer log (last 30 lines)
 617    const overseerLog = join(process.cwd(), 'logs/pipeline-status.txt');
 618    let previousLog = '(no previous runs)';
 619    if (existsSync(overseerLog)) {
 620      try {
 621        const lines = readFileSync(overseerLog, 'utf8').split('\n').filter(Boolean);
 622        previousLog = lines.slice(-30).join('\n') || '(no previous entries)';
 623      } catch {
 624        /* ignore */
 625      }
 626    }
 627  
 628    const systemData = {
 629      timestamp: now.toISOString(),
 630      skip_stages: process.env.SKIP_STAGES || 'none',
 631      services: serviceStatus,
 632      recent_errors_last_30min: recentErrors,
 633      agent_tasks: agentTasks,
 634      sites: siteHealth,
 635      cron: cronHealth,
 636      backup_db_status: backupDbRow ?? null,
 637      previous_overseer_findings: previousLog,
 638    };
 639  
 640    // Return as single-item batch (count=1)
 641    return [{ system_data: systemData }];
 642  }
 643  
 644  async function fetchClassifyErrors() {
 645    // Collect unknown error messages for LLM pattern proposal (Phase 2 only)
 646    const MIN_OCCURRENCES = 1;
 647    const MAX_ERRORS = 50;
 648  
 649    const unknowns = [];
 650  
 651    const siteErrors = await getAll(
 652      `SELECT error_message, COUNT(*) AS cnt
 653       FROM sites
 654       WHERE status IN ('failing', 'ignored')
 655         AND error_message IS NOT NULL AND error_message != ''
 656       GROUP BY error_message
 657       HAVING COUNT(*) >= $1
 658       ORDER BY cnt DESC LIMIT $2`,
 659      [MIN_OCCURRENCES, MAX_ERRORS * 2]
 660    );
 661  
 662    for (const row of siteErrors) {
 663      const { group } = categorizeError(row.error_message, 'site');
 664      if (group === 'unknown') {
 665        unknowns.push({ error_message: row.error_message, count: row.cnt, context: 'site' });
 666      }
 667    }
 668  
 669    const outreachErrors = await getAll(
 670      `SELECT error_message, COUNT(*) AS cnt
 671       FROM messages
 672       WHERE direction = 'outbound'
 673         AND delivery_status IN ('failed', 'retry_later')
 674         AND error_message IS NOT NULL AND error_message != ''
 675       GROUP BY error_message
 676       HAVING COUNT(*) >= $1
 677       ORDER BY cnt DESC LIMIT $2`,
 678      [MIN_OCCURRENCES, MAX_ERRORS]
 679    );
 680  
 681    for (const row of outreachErrors) {
 682      const { group } = categorizeError(row.error_message, 'outreach');
 683      if (group === 'unknown') {
 684        unknowns.push({ error_message: row.error_message, count: row.cnt, context: 'outreach' });
 685      }
 686    }
 687  
 688    unknowns.sort((a, b) => b.count - a.count);
 689    const items = unknowns.slice(0, MAX_ERRORS);
 690  
 691    // Return as single-item batch with error list embedded
 692    if (items.length === 0) return [];
 693    return [{ unknown_errors: items }];
 694  }
 695  
 696  /**
 697   * Fetch sites that need HTML-only LLM scoring via the orchestrator.
 698   * Only runs when ENABLE_VISION=false and ENABLE_LLM_SCORING=true.
 699   * Sites remain at status='assets_captured' with score IS NULL until results are stored.
 700   */
 701  async function fetchScoreSites() {
 702    const sites = await getAll(
 703      `SELECT s.id, s.domain, s.landing_page_url, s.http_headers, s.keyword, s.html_dom
 704       FROM sites s
 705       WHERE s.status = 'assets_captured'
 706         AND s.html_dom IS NOT NULL
 707         AND s.score IS NULL
 708       ORDER BY CASE WHEN s.country_code IN ('AU','CA','GB','IE','IN','NZ','US','ZA') THEN 0 ELSE 1 END ASC
 709       LIMIT $1`,
 710      [limit]
 711    );
 712  
 713    return sites.map(site => {
 714      const htmlDom = readHtmlDom(site.id) || site.html_dom;
 715      return {
 716        site_id: site.id,
 717        domain: site.domain,
 718        url: site.landing_page_url,
 719        keyword: site.keyword,
 720        html: htmlDom ? htmlDom.substring(0, 50000) : '',
 721        http_headers: site.http_headers ? JSON.parse(site.http_headers) : null,
 722      };
 723    });
 724  }
 725  
 726  /**
 727   * Fetch sites that need LLM contact enrichment via the orchestrator.
 728   * Two cases:
 729   *   1. 'semantic_scored'/'vision_scored' — no browser pass yet; extract contacts from html_dom
 730   *   2. 'enriched_regex' — browser pass done; LLM pass over key_pages_html + html_dom → enriched_llm
 731   */
 732  async function fetchEnrichSites() {
 733    // Note: contacts_json column holds sentinel '{"_fs":true}' — real data is on filesystem.
 734    // The old check json_extract(contacts_json, '$.business_name') IS NULL cannot be used.
 735    // Gate on status + enriched_at instead (same intent: find unenriched sites).
 736    const sites = await getAll(
 737      `SELECT s.id, s.domain, s.landing_page_url, s.status, s.html_dom
 738       FROM sites s
 739       WHERE (
 740         (
 741           s.status IN ('semantic_scored', 'vision_scored')
 742           AND (s.enriched_at IS NULL OR s.error_message IS NOT NULL)
 743           AND s.html_dom IS NOT NULL
 744         )
 745         OR
 746         (
 747           s.status = 'enriched_regex'
 748         )
 749       )
 750       ORDER BY
 751         CASE s.status WHEN 'enriched_regex' THEN 0 ELSE 1 END ASC,
 752         CASE WHEN s.country_code IN ('AU','CA','GB','IE','IN','NZ','US','ZA') THEN 0 ELSE 1 END ASC
 753       LIMIT $1`,
 754      [limit]
 755    );
 756  
 757    return sites.map(site => {
 758      const contactsJson = getContactsDataWithFallback(site.id);
 759      const htmlDom = readHtmlDom(site.id) || site.html_dom;
 760      const keyPagesHtml = readKeyPagesHtml(site.id);
 761  
 762      // For enriched_regex: combine key_pages_html entries + html_dom as LLM context
 763      // HTML is truncated aggressively to keep per-batch context under 150kB (15 sites × 10kB each)
 764      let html;
 765      if (site.status === 'enriched_regex' && keyPagesHtml) {
 766        const parts = Object.entries(keyPagesHtml)
 767          .map(([url, h]) => `<!-- Page: ${url} -->\n${h}`)
 768          .join('\n\n');
 769        const landingHtml = htmlDom ? `<!-- Landing page -->\n${htmlDom.substring(0, 5000)}` : '';
 770        html = `${landingHtml}\n\n${parts}`.substring(0, 20000);
 771      } else {
 772        html = htmlDom ? htmlDom.substring(0, 15000) : '';
 773      }
 774  
 775      return {
 776        site_id: site.id,
 777        domain: site.domain,
 778        url: site.landing_page_url,
 779        current_contacts: contactsJson,
 780        html,
 781        enrichment_pass: site.status === 'enriched_regex' ? 'llm' : 'initial',
 782      };
 783    });
 784  }
 785  
 786  /**
 787   * Fetch sites that need semantic scoring (headline, value prop, USP).
 788   * These are sites that have been programmatically scored but lack LLM semantic assessment.
 789   * Extracts key text from html_dom for Haiku to evaluate.
 790   */
 791  async function fetchScoreSemantic() {
 792    const sites = await getAll(
 793      `SELECT s.id, s.domain, s.landing_page_url, s.keyword, s.html_dom
 794       FROM sites s
 795       WHERE s.status IN ('prog_scored', 'semantic_scored')
 796         AND s.score IS NOT NULL
 797         AND s.html_dom IS NOT NULL
 798       ORDER BY s.updated_at DESC
 799       LIMIT $1`,
 800      [limit]
 801    );
 802  
 803    return sites
 804      .map(site => {
 805        const extracted = extractScoringText(readHtmlDom(site.id) || site.html_dom);
 806        const scoreJson = getScoreDataWithFallback(site.id, site) || {};
 807  
 808        return {
 809          site_id: site.id,
 810          domain: site.domain,
 811          keyword: site.keyword,
 812          current_score: scoreJson.conversion_score || null,
 813          current_grade: scoreJson.letter_grade || null,
 814          programmatic_factors: scoreJson.factor_scores
 815            ? {
 816                headline_quality: scoreJson.factor_scores.headline_quality,
 817                value_proposition: scoreJson.factor_scores.value_proposition,
 818                unique_selling_proposition: scoreJson.factor_scores.unique_selling_proposition,
 819              }
 820            : null,
 821          text: extracted,
 822        };
 823      })
 824      .filter(item => item.text !== null);
 825  }
 826  
 827  /**
 828   * Monitor health batch — lightweight system health snapshot for SRE analysis.
 829   * Focuses on agent_tasks state, service status, and recent error rate.
 830   * Lighter than oversee: skips site distribution, competitor data, etc.
 831   */
 832  async function fetchMonitorHealth() {
 833    const now = new Date();
 834    const thirtyMinsAgo = new Date(now - 30 * 60 * 1000).toISOString();
 835    const twoHoursAgo = new Date(now - 2 * 60 * 60 * 1000).toISOString();
 836  
 837    const agentTaskSummary = {
 838      stuck_running: await getAll(
 839        `SELECT id, task_type, started_at,
 840         ROUND(EXTRACT(EPOCH FROM (NOW() - started_at)) / 60) as age_mins
 841         FROM tel.agent_tasks WHERE status = 'running' AND started_at < $1
 842         ORDER BY started_at ASC LIMIT 10`,
 843        [thirtyMinsAgo]
 844      ),
 845      blocked_count: (await getOne(
 846        `SELECT COUNT(*) as n FROM tel.agent_tasks WHERE status = 'blocked'`
 847      ))?.n ?? 0,
 848      failed_recent: await getAll(
 849        `SELECT task_type, COUNT(*) as n FROM tel.agent_tasks
 850         WHERE status = 'failed' AND completed_at > $1
 851         GROUP BY task_type ORDER BY n DESC LIMIT 10`,
 852        [twoHoursAgo]
 853      ),
 854      pending_by_type: await getAll(
 855        `SELECT task_type, COUNT(*) as n FROM tel.agent_tasks
 856         WHERE status = 'pending'
 857         GROUP BY task_type ORDER BY n DESC LIMIT 10`
 858      ),
 859      code_review_fix_pending: (await getOne(
 860        `SELECT COUNT(*) as n FROM tel.agent_tasks WHERE task_type = 'code_review_fix' AND status = 'pending'`
 861      ))?.n ?? 0,
 862    };
 863  
 864    // Recent errors from pipeline log
 865    const today = new Date().toISOString().slice(0, 10);
 866    const logFile = join(process.cwd(), `logs/pipeline-${today}.log`);
 867    let recentErrors = [];
 868    if (existsSync(logFile)) {
 869      const cutoff = new Date(Date.now() - 30 * 60 * 1000);
 870      const lines = readFileSync(logFile, 'utf8').split('\n');
 871      for (const line of lines.slice(-200)) {
 872        if (!line.includes('[ERROR]')) continue;
 873        const tsMatch = line.match(/^(\[[\d\-T:.+Z]+\])/);
 874        if (tsMatch) {
 875          try {
 876            if (new Date(tsMatch[1].slice(1, -1)) < cutoff) continue;
 877          } catch {
 878            /* keep */
 879          }
 880        }
 881        recentErrors.push(line.slice(0, 150));
 882      }
 883      recentErrors = recentErrors.slice(-20);
 884    }
 885  
 886    return [
 887      { agent_tasks: agentTaskSummary, recent_errors: recentErrors, checked_at: now.toISOString() },
 888    ];
 889  }
 890  
 891  /**
 892   * Triage errors batch — classifies recent pipeline failures for routing/prioritization.
 893   * Pulls failed agent_tasks and recent site/message errors not yet categorized.
 894   */
 895  async function fetchTriageErrors() {
 896    const twoHoursAgo = new Date(Date.now() - 2 * 60 * 60 * 1000).toISOString();
 897  
 898    const failedTasks = (await getAll(
 899      `SELECT id, task_type, error_message, context_json, completed_at
 900       FROM tel.agent_tasks
 901       WHERE status = 'failed'
 902         AND completed_at > $1
 903         AND error_message IS NOT NULL
 904       ORDER BY completed_at DESC LIMIT 30`,
 905      [twoHoursAgo]
 906    )).map(t => ({
 907      id: t.id,
 908      task_type: t.task_type,
 909      error_message: t.error_message,
 910      context_summary: t.context_json
 911        ? JSON.parse(t.context_json || '{}')?.file_path || t.task_type
 912        : t.task_type,
 913    }));
 914  
 915    if (failedTasks.length === 0) return [];
 916  
 917    return [{ failed_tasks: failedTasks, window: '2h' }];
 918  }
 919  
 920  /**
 921   * Check docs batch — returns recently changed source files for doc freshness check.
 922   * Reads last 10 commits' changed src/ files and returns their content alongside
 923   * relevant CLAUDE.md sections.
 924   */
 925  function fetchCheckDocs() {
 926    let changedFiles = [];
 927    try {
 928      const output = execSync('git diff --name-only HEAD~5 2>/dev/null', {
 929        encoding: 'utf8',
 930        timeout: 5000,
 931        cwd: process.cwd(),
 932      });
 933      changedFiles = output
 934        .split('\n')
 935        .filter(f => f.startsWith('src/') && f.endsWith('.js'))
 936        .slice(0, 5);
 937    } catch {
 938      /* no git or no changes */
 939    }
 940  
 941    if (changedFiles.length === 0) return [];
 942  
 943    const items = changedFiles
 944      .map(filePath => {
 945        const fullPath = join(process.cwd(), filePath);
 946        if (!existsSync(fullPath)) return null;
 947        const content = readFileSync(fullPath, 'utf8');
 948        // Return first 100 lines (enough for JSDoc + exports, not entire file)
 949        const preview = content.split('\n').slice(0, 100).join('\n');
 950        return { file_path: filePath, preview };
 951      })
 952      .filter(Boolean);
 953  
 954    if (items.length === 0) return [];
 955    return [{ changed_files: items }];
 956  }
 957  
 958  /**
 959   * Code Review batch — returns the next source file to review from the queue.
 960   *
 961   * Queue file: logs/code-review-queue.json
 962   *   { "files": ["src/payment/paypal.js", ...], "next_index": 0, "reviewed": 0 }
 963   *
 964   * Run `node scripts/claude-batch.js code_review` once queue is seeded.
 965   * Queue is seeded by: scripts/init-code-review-queue.sh (security-first order).
 966   * Skips files that already have pending/running fix tasks to avoid duplicate work.
 967   */
 968  async function fetchCodeReview() {
 969    const queuePath = join(process.cwd(), 'logs/code-review-queue.json');
 970    if (!existsSync(queuePath)) {
 971      return []; // Queue not yet seeded — nothing to do
 972    }
 973  
 974    const queue = JSON.parse(readFileSync(queuePath, 'utf8'));
 975    const { files, next_index: nextIndex } = queue;
 976  
 977    if (!Array.isArray(files) || nextIndex >= files.length) {
 978      return []; // Queue exhausted
 979    }
 980  
 981    // Skip files that already have a pending/running code_review_fix task
 982    let idx = nextIndex;
 983    let filePath = null;
 984    while (idx < files.length) {
 985      const candidate = files[idx];
 986      const existing = await getOne(
 987        `SELECT id FROM tel.agent_tasks
 988         WHERE task_type = 'code_review_fix'
 989           AND context_json->>'file_path' = $1
 990           AND status IN ('pending', 'running')
 991         LIMIT 1`,
 992        [candidate]
 993      );
 994      if (!existing) {
 995        filePath = candidate;
 996        break;
 997      }
 998      idx++;
 999    }
1000  
1001    if (!filePath) return []; // All remaining files have active fix tasks
1002  
1003    // Advance queue pointer
1004    queue.next_index = idx + 1;
1005    queue.reviewed = (queue.reviewed || 0) + 1;
1006    writeFileSync(queuePath, JSON.stringify(queue, null, 2));
1007  
1008    // Read file content
1009    const fullPath = join(process.cwd(), filePath);
1010    if (!existsSync(fullPath)) {
1011      return []; // File was deleted — skip
1012    }
1013  
1014    const content = readFileSync(fullPath, 'utf8');
1015    const lineCount = content.split('\n').length;
1016  
1017    return [
1018      {
1019        file_path: filePath,
1020        line_count: lineCount,
1021        content,
1022      },
1023    ];
1024  }
1025  
1026  /**
1027   * followup_generate — Fetch sites needing follow-up message generation.
1028   *
1029   * Finds sites where:
1030   *   - status = 'outreach_sent'
1031   *   - next_followup_at <= now (due for follow-up)
1032   *   - No inbound reply received
1033   *   - Not opted out
1034   *   - Max sequence_step < 5
1035   *   - Respects 3-day cooldown
1036   *
1037   * Returns site data + weaknesses + conversation history for LLM follow-up generation.
1038   */
1039  async function fetchFollowupGenerate() {
1040    const closedStatuses = ['not_interested', 'closed', 'unsubscribed', 'paid', 'report_delivered'];
1041    const closedPlaceholders = closedStatuses.map((_, i) => `$${i + 1}`).join(',');
1042  
1043    const sites = await getAll(
1044      `SELECT
1045        s.id, s.domain, s.landing_page_url, s.keyword, s.score, s.grade,
1046        s.country_code, s.city, s.state, s.is_running_ads,
1047        s.last_outreach_at, s.next_followup_at,
1048        (SELECT MAX(m.sequence_step) FROM messages m
1049         WHERE m.site_id = s.id AND m.direction = 'outbound'
1050           AND m.sequence_step IS NOT NULL) as max_step
1051      FROM sites s
1052      WHERE s.status = 'outreach_sent'
1053        AND s.next_followup_at IS NOT NULL
1054        AND s.next_followup_at <= NOW()
1055        AND NOT EXISTS (
1056          SELECT 1 FROM messages m
1057          WHERE m.site_id = s.id AND m.direction = 'inbound'
1058        )
1059        AND NOT EXISTS (
1060          SELECT 1 FROM opt_outs o
1061          JOIN messages m ON (
1062            (o.phone = m.contact_uri AND o.method = 'sms')
1063            OR (o.email = m.contact_uri AND o.method = 'email')
1064          )
1065          WHERE m.site_id = s.id AND m.direction = 'outbound'
1066        )
1067        AND (s.conversation_status IS NULL
1068             OR s.conversation_status NOT IN (${closedPlaceholders}))
1069        AND (SELECT COALESCE(MAX(m2.sequence_step), 1) FROM messages m2
1070             WHERE m2.site_id = s.id AND m2.direction = 'outbound'
1071               AND m2.sequence_step IS NOT NULL) < 8
1072        AND (s.last_outreach_at IS NULL
1073             OR s.last_outreach_at < NOW() - INTERVAL '3 days')
1074        -- Must have at least one successfully delivered email/sms to follow up on
1075        AND EXISTS (
1076          SELECT 1 FROM messages m3
1077          WHERE m3.site_id = s.id AND m3.direction = 'outbound'
1078            AND m3.delivery_status IN ('sent', 'delivered')
1079            AND m3.contact_method IN ('email', 'sms')
1080        )
1081      ORDER BY s.next_followup_at ASC
1082      LIMIT $${closedStatuses.length + 1}`,
1083      [...closedStatuses, limit]
1084    );
1085  
1086    return Promise.all(sites.map(async site => {
1087      const scoreJson = getScoreDataWithFallback(site.id, site) || {};
1088      const contactsJson = getContactsDataWithFallback(site.id, site) || {};
1089      const nextStep = (site.max_step || 1) + 1;
1090  
1091      // Get sent outreach channels for this site
1092      const channels = await getAll(
1093        `SELECT DISTINCT contact_method, contact_uri
1094         FROM messages
1095         WHERE site_id = $1 AND direction = 'outbound'
1096           AND delivery_status IN ('sent', 'delivered')
1097           AND contact_method IN ('email', 'sms')`,
1098        [site.id]
1099      );
1100  
1101      // Get previous outreach messages for context
1102      const previousMessages = await getAll(
1103        `SELECT message_body, subject_line, contact_method, sequence_step, sent_at
1104         FROM messages
1105         WHERE site_id = $1 AND direction = 'outbound'
1106           AND delivery_status IN ('sent', 'delivered')
1107         ORDER BY sequence_step ASC, sent_at ASC`,
1108        [site.id]
1109      );
1110  
1111      return {
1112        site_id: site.id,
1113        domain: site.domain,
1114        landing_page_url: site.landing_page_url,
1115        keyword: site.keyword,
1116        score: site.score,
1117        grade: site.grade || computeGrade(site.score),
1118        country_code: site.country_code,
1119        city: site.city || contactsJson.city || null,
1120        state: site.state || contactsJson.state || null,
1121        business_name: contactsJson.business_name || site.domain,
1122        industry: scoreJson.industry_classification || scoreJson?.overall_calculation?.industry_classification || null,
1123        next_step: nextStep,
1124        channels: channels.map(c => ({
1125          method: c.contact_method,
1126          uri: c.contact_uri,
1127        })),
1128        weaknesses: extractWeaknesses(scoreJson),
1129        previous_messages: previousMessages.map(m => ({
1130          body: m.message_body?.substring(0, 200),
1131          subject: m.subject_line,
1132          channel: m.contact_method,
1133          step: m.sequence_step || 1,
1134        })),
1135      };
1136    }));
1137  }
1138  
1139  /**
1140   * followup_check — Queue approved followup messages whose send window has arrived.
1141   *
1142   * followup1: approved, queued=NULL, followup1_sent_at IS NULL,
1143   *            last_outreach_at set, 24h elapsed, conversation not closed/replied.
1144   * followup2: approved, queued=NULL, followup2_sent_at IS NULL,
1145   *            followup1_sent_at set, 3 days elapsed, conversation not closed/replied.
1146   *
1147   * Sets delivery_status='queued' on matching messages so the outreach worker picks them up.
1148   * Returns a summary item (count=1) describing how many were queued.
1149   *
1150   * Note: Uses run() which writes through the shared pool.
1151   */
1152  async function fetchFollowupCheck() {
1153    let followup1Queued = 0;
1154    let followup2Queued = 0;
1155  
1156    const closedStatuses = ['replied', 'interested', 'closed', 'not_interested'];
1157    const closedPlaceholders = closedStatuses.map((_, i) => `$${i + 1}`).join(', ');
1158  
1159    // followup1: 24h after last_outreach_at, conversation still open
1160    const followup1Result = await run(
1161      `UPDATE messages
1162       SET delivery_status = 'queued',
1163           updated_at = NOW()
1164       WHERE message_type = 'followup1'
1165         AND approval_status = 'approved'
1166         AND delivery_status IS NULL
1167         AND sent_at IS NULL
1168         AND EXISTS (
1169           SELECT 1 FROM sites s
1170           WHERE s.id = messages.site_id
1171             AND s.followup1_sent_at IS NULL
1172             AND s.last_outreach_at IS NOT NULL
1173             AND s.last_outreach_at + INTERVAL '24 hours' <= NOW()
1174             AND (s.conversation_status IS NULL
1175                  OR s.conversation_status NOT IN (${closedPlaceholders}))
1176         )`,
1177      closedStatuses
1178    );
1179    followup1Queued = followup1Result.changes;
1180  
1181    // followup2: 3 days after followup1_sent_at, conversation still open
1182    const followup2Result = await run(
1183      `UPDATE messages
1184       SET delivery_status = 'queued',
1185           updated_at = NOW()
1186       WHERE message_type = 'followup2'
1187         AND approval_status = 'approved'
1188         AND delivery_status IS NULL
1189         AND sent_at IS NULL
1190         AND EXISTS (
1191           SELECT 1 FROM sites s
1192           WHERE s.id = messages.site_id
1193             AND s.followup2_sent_at IS NULL
1194             AND s.followup1_sent_at IS NOT NULL
1195             AND s.followup1_sent_at + INTERVAL '3 days' <= NOW()
1196             AND (s.conversation_status IS NULL
1197                  OR s.conversation_status NOT IN (${closedPlaceholders}))
1198         )`,
1199      closedStatuses
1200    );
1201    followup2Queued = followup2Result.changes;
1202  
1203    const total = followup1Queued + followup2Queued;
1204    if (total === 0) return [];
1205  
1206    return [
1207      {
1208        followup1_queued: followup1Queued,
1209        followup2_queued: followup2Queued,
1210        total_queued: total,
1211      },
1212    ];
1213  }
1214  
1215  /**
1216   * Fetch sites ready for evidence merging.
1217   *
1218   * Picks sites where both evidence passes have been collected (evidence_pass1_json and
1219   * evidence_pass2_json are non-null) but the merged evidence_json hasn't been written yet.
1220   *
1221   * Returns one site at a time to keep Haiku context small.
1222   */
1223  async function fetchEvidenceMerge() {
1224    const site = await getOne(
1225      `SELECT id, domain, evidence_pass1_json, evidence_pass2_json
1226       FROM sites
1227       WHERE evidence_json IS NULL
1228         AND evidence_pass1_json IS NOT NULL
1229         AND evidence_pass2_json IS NOT NULL
1230         AND status IN ('prog_scored','semantic_scored','vision_scored','enriched','proposals_drafted','outreach_partial','outreach_sent')
1231       ORDER BY score ASC
1232       LIMIT 1`
1233    );
1234  
1235    if (!site) return [];
1236  
1237    return [
1238      {
1239        site_id: site.id,
1240        domain: site.domain,
1241        score_json: getScoreJsonWithFallback(site.id),
1242        evidence_pass1: site.evidence_pass1_json || '',
1243        evidence_pass2: site.evidence_pass2_json || '',
1244      },
1245    ];
1246  }
1247  
1248  const handlers = {
1249    proposals_email: () => fetchProposalsBatch('email'),
1250    proposals_sms: () => fetchProposalsBatch('sms'),
1251    proofread: fetchProofread,
1252    classify_replies: fetchClassifyReplies,
1253    extract_names: fetchExtractNames,
1254    reply_responses: fetchReplyResponses,
1255    oversee: fetchOverseeData,
1256    classify_errors: fetchClassifyErrors,
1257    score_semantic: fetchScoreSemantic,
1258    score_sites: fetchScoreSites,
1259    enrich_sites: fetchEnrichSites,
1260    code_review: fetchCodeReview,
1261    monitor_health: fetchMonitorHealth,
1262    triage_errors: fetchTriageErrors,
1263    check_docs: fetchCheckDocs,
1264    evidence_merge: fetchEvidenceMerge,
1265    followup_check: fetchFollowupCheck,
1266    followup_generate: fetchFollowupGenerate,
1267  };
1268  
1269  const handler = handlers[batchType];
1270  if (!handler) {
1271    console.error(`Unknown batch type: ${batchType}`);
1272    console.error(`Valid types: ${Object.keys(handlers).join(', ')}`);
1273    process.exit(1);
1274  }
1275  
1276  const result = {
1277    batch_type: batchType,
1278    timestamp: new Date().toISOString(),
1279    count: 0,
1280    items: [],
1281  };
1282  
1283  try {
1284    result.items = await handler();
1285    result.count = result.items.length;
1286    console.log(JSON.stringify(result, null, 2));
1287  } finally {
1288    await closePool();
1289  }