/ src / stages / gather-evidence.js
gather-evidence.js
  1  /**
  2   * Evidence Gathering Stage
  3   *
  4   * Performs two-pass evidence collection on enriched sites before proposal generation:
  5   *   Pass 1 (unbriefed): fetches live site, extracts HTML + inline CSS, calls Haiku to
  6   *            identify specific evidence for each CRO factor with CSS selectors.
  7   *   Pass 2 (briefed): same fetch, but provides score_json as context so the LLM
  8   *            confirms/challenges existing scores rather than free-ranging.
  9   *
 10   * Results stored in:
 11   *   sites.evidence_pass1_json — raw JSON from unbriefed pass
 12   *   sites.evidence_pass2_json — raw JSON from briefed pass
 13   *
 14   * After both passes are stored, the evidence_merge orchestrator batch picks up the site
 15   * and merges them into sites.evidence_json (revised scores + evidence_summary for proposals).
 16   *
 17   * Pipeline gate: proposals_email/proposals_sms query requires evidence_json IS NOT NULL.
 18   */
 19  
 20  import { run, getAll } from '../utils/db.js';
 21  import { readFileSync } from 'fs';
 22  import { join, dirname } from 'path';
 23  import { fileURLToPath } from 'url';
 24  import Logger from '../utils/logger.js';
 25  import { processBatch } from '../utils/error-handler.js';
 26  import { callLLM } from '../utils/llm-provider.js';
 27  import { openRouterLimiter } from '../utils/rate-limiter.js';
 28  import { openRouterBreaker } from '../utils/circuit-breaker.js';
 29  import { safeJsonParse } from '../utils/error-handler.js';
 30  import { getAdaptiveConcurrencyFast } from '../utils/adaptive-concurrency.js';
 31  import { sanitizeHtmlForPrompt } from '../utils/llm-sanitizer.js';
 32  import { getScoreJsonWithFallback } from '../utils/score-storage.js';
 33  import '../utils/load-env.js';
 34  
 35  const __filename = fileURLToPath(import.meta.url);
 36  const __dirname = dirname(__filename);
 37  const projectRoot = join(__dirname, '../..');
 38  
 39  const logger = new Logger('GatherEvidence');
 40  const EVIDENCE_PROMPT = readFileSync(join(projectRoot, 'prompts/EVIDENCE-COLLECT.md'), 'utf-8');
 41  
 42  const BATCH_SIZE = parseInt(process.env.EVIDENCE_BATCH_SIZE || '8', 10);
 43  const MODEL = process.env.EVIDENCE_MODEL || process.env.CLAUDE_HAIKU_MODEL || 'anthropic/claude-haiku-4-5';
 44  const HTML_TRUNCATE = 50_000; // chars fed to LLM
 45  const CSS_TRUNCATE = 15_000;
 46  
 47  /**
 48   * Extract inline CSS from raw HTML (style tags + first 200 chars of linked stylesheets src attrs)
 49   */
 50  function extractInlineCss(html) {
 51    const styleMatches = html.match(/<style[^>]*>([\s\S]*?)<\/style>/gi) || [];
 52    return styleMatches
 53      .map(block => block.replace(/<\/?style[^>]*>/gi, ''))
 54      .join('\n')
 55      .slice(0, CSS_TRUNCATE);
 56  }
 57  
 58  /**
 59   * Validate that a URL is a safe external HTTP/HTTPS target.
 60   * Guards against SSRF from poisoned DB entries.
 61   */
 62  function isSafeUrl(urlStr) {
 63    try {
 64      const u = new URL(urlStr);
 65      if (u.protocol !== 'http:' && u.protocol !== 'https:') return false;
 66      const h = u.hostname;
 67      // Block localhost, loopback, link-local, and private RFC1918 ranges
 68      if (h === 'localhost' || h === '127.0.0.1' || h === '::1') return false;
 69      if (/^169\.254\./.test(h)) return false;
 70      if (/^10\./.test(h) || /^192\.168\./.test(h)) return false;
 71      if (/^172\.(1[6-9]|2\d|3[01])\./.test(h)) return false;
 72      return true;
 73    } catch {
 74      return false;
 75    }
 76  }
 77  
 78  /**
 79   * Fetch page HTML using Node's built-in fetch with browser-like headers.
 80   * Returns null on failure rather than throwing.
 81   */
 82  async function fetchPageHtml(url) {
 83    if (!isSafeUrl(url)) {
 84      logger.warn(`Skipping unsafe URL: ${url}`);
 85      return null;
 86    }
 87    try {
 88      const res = await fetch(url, {
 89        headers: {
 90          'User-Agent':
 91            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
 92          Accept: 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
 93          'Accept-Language': 'en-US,en;q=0.9',
 94          'Accept-Encoding': 'gzip, deflate, br',
 95          'Cache-Control': 'no-cache',
 96        },
 97        signal: AbortSignal.timeout(15_000),
 98      });
 99      if (!res.ok) return null;
100      return await res.text();
101    } catch {
102      return null;
103    }
104  }
105  
106  /**
107   * Build the LLM prompt for one evidence pass.
108   */
109  function buildPrompt(domain, url, scoreJson, homepageHtml, cssContent, pass) {
110    const truncatedHtml = homepageHtml ? homepageHtml.slice(0, HTML_TRUNCATE) : '(unavailable)';
111    const truncatedCss = cssContent ? cssContent.slice(0, CSS_TRUNCATE) : '(unavailable)';
112  
113    const scoreContext =
114      pass === 'briefed' && scoreJson
115        ? JSON.stringify(safeJsonParse(scoreJson) || {}, null, 2).slice(0, 8000)
116        : '(not provided for this pass)';
117  
118    return EVIDENCE_PROMPT.replace('{{domain}}', sanitizeHtmlForPrompt(domain))
119      .replace('{{url}}', sanitizeHtmlForPrompt(url))
120      .replace('{{score_json}}', scoreContext)
121      .replace('{{homepage_html}}', sanitizeHtmlForPrompt(truncatedHtml))
122      .replace('{{css_content}}', sanitizeHtmlForPrompt(truncatedCss));
123  }
124  
125  /**
126   * Run one evidence pass (unbriefed or briefed) for a site.
127   * Returns the raw JSON result object, or null on failure.
128   */
129  async function runEvidencePass(site, pass) {
130    const homepageHtml = await fetchPageHtml(site.landing_page_url);
131  
132    if (!homepageHtml) {
133      logger.warn(`[${site.domain}] Could not fetch HTML for evidence pass (${pass})`);
134      return null;
135    }
136  
137    const cssContent = extractInlineCss(homepageHtml);
138    const prompt = buildPrompt(
139      site.domain,
140      site.landing_page_url,
141      pass === 'briefed' ? getScoreJsonWithFallback(site.id, site) : null,
142      homepageHtml,
143      cssContent,
144      pass
145    );
146  
147    const response = await openRouterLimiter.schedule(() =>
148      openRouterBreaker.fire(() =>
149        callLLM({
150          model: MODEL,
151          messages: [{ role: 'user', content: prompt }],
152          max_tokens: 4096,
153          temperature: 0,
154          stage: 'other',
155        })
156      )
157    );
158  
159    const result = safeJsonParse(response?.content);
160    if (!result) {
161      logger.warn(`[${site.domain}] Evidence pass (${pass}) returned non-JSON response`);
162      return null;
163    }
164  
165    return result;
166  }
167  
168  /**
169   * Process a single site through both evidence passes.
170   */
171  async function gatherEvidenceForSite(site) {
172    logger.info(`[${site.domain}] Starting evidence collection`);
173  
174    // Pass 1: unbriefed
175    let pass1 = null;
176    if (!site.evidence_pass1_json) {
177      pass1 = await runEvidencePass(site, 'unbriefed');
178      if (pass1) {
179        const { changes } = await run(
180          'UPDATE sites SET evidence_pass1_json = $1, updated_at = NOW() WHERE id = $2',
181          [JSON.stringify(pass1), site.id]
182        );
183        if (changes === 0) {
184          logger.error(`[${site.domain}] Pass 1 UPDATE affected 0 rows — site id=${site.id} may have been deleted`);
185          return;
186        }
187        logger.info(`[${site.domain}] Pass 1 (unbriefed) stored`);
188      }
189    } else {
190      logger.debug(`[${site.domain}] Pass 1 already exists — skipping`);
191      pass1 = safeJsonParse(site.evidence_pass1_json);
192    }
193  
194    // Pass 2: briefed (provides score_json context)
195    if (!site.evidence_pass2_json) {
196      const pass2 = await runEvidencePass(site, 'briefed');
197      if (pass2) {
198        const { changes } = await run(
199          'UPDATE sites SET evidence_pass2_json = $1, updated_at = NOW() WHERE id = $2',
200          [JSON.stringify(pass2), site.id]
201        );
202        if (changes === 0) {
203          logger.error(`[${site.domain}] Pass 2 UPDATE affected 0 rows — site id=${site.id} may have been deleted`);
204          return;
205        }
206        logger.info(`[${site.domain}] Pass 2 (briefed) stored`);
207      }
208    } else {
209      logger.debug(`[${site.domain}] Pass 2 already exists — skipping`);
210    }
211  
212    logger.info(`[${site.domain}] Evidence collection complete`);
213  }
214  
215  export async function gatherEvidence() {
216    try {
217      // Skip sites in blocked countries — no proposals will ever be sent to them
218      const blockedCountries = (process.env.OUTREACH_BLOCKED_COUNTRIES || '')
219        .split(',').map(s => s.trim().toUpperCase()).filter(Boolean);
220  
221      // Pick enriched sites missing either evidence pass
222      let sitesQuery;
223      let sitesParams;
224  
225      if (blockedCountries.length) {
226        sitesQuery = `
227          SELECT id, domain, landing_page_url,
228                 evidence_pass1_json, evidence_pass2_json
229          FROM sites
230          WHERE status IN ('enriched', 'enriched_llm')
231            AND score IS NOT NULL
232            AND score < 82
233            AND (evidence_pass1_json IS NULL OR evidence_pass2_json IS NULL)
234            AND UPPER(country_code) != ALL($1::text[])
235          ORDER BY score ASC
236          LIMIT $2
237        `;
238        sitesParams = [blockedCountries, BATCH_SIZE];
239      } else {
240        sitesQuery = `
241          SELECT id, domain, landing_page_url,
242                 evidence_pass1_json, evidence_pass2_json
243          FROM sites
244          WHERE status IN ('enriched', 'enriched_llm')
245            AND score IS NOT NULL
246            AND score < 82
247            AND (evidence_pass1_json IS NULL OR evidence_pass2_json IS NULL)
248          ORDER BY score ASC
249          LIMIT $1
250        `;
251        sitesParams = [BATCH_SIZE];
252      }
253  
254      const sites = await getAll(sitesQuery, sitesParams);
255  
256      if (sites.length === 0) {
257        logger.info('No sites pending evidence collection');
258        return;
259      }
260  
261      logger.info(`Gathering evidence for ${sites.length} sites`);
262  
263      const concurrency = getAdaptiveConcurrencyFast(2, parseInt(process.env.EVIDENCE_CONCURRENCY || '8', 10));
264  
265      await processBatch(sites, site => gatherEvidenceForSite(site), {
266        concurrency,
267        label: 'evidence',
268        logger,
269      });
270  
271      logger.success(`Evidence collection complete for ${sites.length} sites`);
272    } catch (err) {
273      logger.error('Evidence gathering failed', err);
274      throw err;
275    }
276  }
277  
278  // CLI entry point
279  if (process.argv[1] === fileURLToPath(import.meta.url)) {
280    gatherEvidence().catch(err => {
281      logger.error('Evidence gathering failed', err);
282      process.exit(1);
283    });
284  }