/ src / stages / scoring.js
scoring.js
  1  /**
  2   * Scoring Stage
  3   * Initial AI-powered conversion scoring for sites
  4   */
  5  
  6  import { run, getOne, getAll, query, withTransaction } from '../utils/db.js';
  7  import { scoreWebsite } from '../score.js';
  8  import { normaliseCountryCode } from '../config/countries.js';
  9  import Logger from '../utils/logger.js';
 10  import { generateStageCompletion, displayProgress } from '../utils/summary-generator.js';
 11  import { processBatch } from '../utils/error-handler.js';
 12  import { loadScreenshot } from '../utils/screenshot-storage.js';
 13  import { checkBlocklist, classifyIndustry } from '../utils/site-filters.js';
 14  import { incrementLowScoring } from '../utils/keyword-counters.js';
 15  import { recordFailure, resetRetries } from '../utils/retry-handler.js';
 16  import { scoreWebsiteProgrammatically } from '../utils/programmatic-scorer.js';
 17  import { readHtmlDom } from '../utils/html-storage.js';
 18  import { setScoreJson } from '../utils/score-storage.js';
 19  import { setContactsJson } from '../utils/contacts-storage.js';
 20  
 21  const logger = new Logger('Scoring');
 22  const success = (...args) => logger.success(...args);
 23  const info = (...args) => logger.info(...args);
 24  const error = (...args) => logger.error(...args);
 25  
 26  const ENGLISH_ONLY_MARKETS = process.env.ENGLISH_ONLY_MARKETS
 27    ? process.env.ENGLISH_ONLY_MARKETS.split(',').map(c => c.trim().toUpperCase()).filter(Boolean)
 28    : null;
 29  
 30  /**
 31   * Run the scoring stage
 32   * @param {Object} options - Stage options
 33   * @param {number} options.limit - Limit number of sites to score
 34   * @param {number} options.concurrency - Number of concurrent scoring operations (default: 10)
 35   * @returns {Promise<Object>} Stage results
 36   */
 37  export async function runScoringStage(options = {}) {
 38    const startTime = Date.now();
 39    const concurrency = options.concurrency || parseInt(process.env.SCORING_CONCURRENCY || '10', 10);
 40  
 41    info('Starting Scoring Stage...');
 42  
 43    // Check ENABLE_VISION flag
 44    const ENABLE_VISION = process.env.ENABLE_VISION !== 'false';
 45  
 46    // Show deprecation warning if old flags are used
 47    const legacyFlags = [
 48      process.env.USE_COMPUTER_VISION_SCORING,
 49      process.env.USE_COMPUTER_VISION_RESCORING,
 50      process.env.USE_COMPUTER_VISION_ENRICHMENT,
 51    ];
 52    if (legacyFlags.some(flag => flag !== undefined)) {
 53      logger.warn(
 54        '[scoring] WARN: Vision flags (USE_COMPUTER_VISION_*) are deprecated. Use ENABLE_VISION instead.'
 55      );
 56    }
 57  
 58    const modeLabel = ENABLE_VISION ? 'vision enabled' : 'HTML-only (vision disabled)';
 59    info(`[scoring] Mode: ${modeLabel}`);
 60  
 61    // Orchestrator scoring mode: when ENABLE_VISION=false and ENABLE_LLM_SCORING=true,
 62    // the orchestrator's score_sites batch handles LLM scoring via Claude Max (claude -p)
 63    // at zero API cost. The pipeline stage is bypassed — orchestrator advances sites to 'scored'.
 64    const ENABLE_LLM_SCORING = process.env.ENABLE_LLM_SCORING !== 'false';
 65    if (ENABLE_LLM_SCORING && !ENABLE_VISION) {
 66      info(
 67        '[scoring] ORCHESTRATOR MODE — score_sites batch will handle LLM scoring via Claude Max. Pipeline stage skipped.'
 68      );
 69      return {
 70        processed: 0,
 71        succeeded: 0,
 72        failed: 0,
 73        skipped: 0,
 74        duration: Date.now() - startTime,
 75        mode: 'orchestrator',
 76      };
 77    }
 78  
 79    try {
 80      // Get sites that need initial scoring.
 81      // score IS NULL is the precise condition — scoring always sets score before
 82      // advancing status to 'prog_scored', so assets_captured + score IS NULL = not yet scored
 83      // (also covers retry cases: scoring errors keep status='assets_captured', score=NULL).
 84      // Avoids retrying asset-capture errors (those leave sites at 'found', not 'assets_captured').
 85      // Only check IS NOT NULL — can satisfy this without loading the html_dom value,
 86      // which avoids blocking on reading data from 3000+ rows.
 87      // The scoring function itself fetches html_dom per-site and skips empty/placeholder values.
 88      // Also respect recapture_at: broken-site retries are delayed 7 days, don't re-queue early.
 89      const englishParams = ENGLISH_ONLY_MARKETS || [];
 90      const englishFilter = ENGLISH_ONLY_MARKETS
 91        ? `AND country_code = ANY($${1}::text[])`
 92        : '';
 93  
 94      const sitesQuery = `
 95        SELECT id, domain, landing_page_url as url, country_code
 96        FROM sites
 97        WHERE status = 'assets_captured'
 98          AND html_dom IS NOT NULL
 99          AND score IS NULL
100          AND (recapture_at IS NULL OR recapture_at <= CURRENT_TIMESTAMP)
101          ${englishFilter}
102        ORDER BY CASE WHEN country_code IN ('AU','CA','GB','IE','IN','NZ','US','ZA') THEN 0 ELSE 1 END ASC
103        ${options.limit ? `LIMIT ${options.limit}` : ''}
104      `;
105  
106      const sitesParams = ENGLISH_ONLY_MARKETS ? [ENGLISH_ONLY_MARKETS] : [];
107      const sites = await getAll(sitesQuery, sitesParams.length ? sitesParams : undefined);
108  
109      // Auto-promote assets_captured sites that already have scores → semantic_scored.
110      // Happens when assets re-captures html_dom for a previously-scored site (e.g. re-enrichment
111      // pass). Scoring skips them (score IS NOT NULL), so we promote here to unblock enrich.
112      const { changes: alreadyScoredPromoted } = await run(
113        `UPDATE sites
114         SET status = 'semantic_scored',
115             rescored_at = NOW(),
116             updated_at = NOW()
117         WHERE status = 'assets_captured'
118           AND score IS NOT NULL
119           AND html_dom IS NOT NULL`
120      );
121      if (alreadyScoredPromoted > 0) {
122        info(
123          `Auto-promoted ${alreadyScoredPromoted} already-scored assets_captured → semantic_scored (re-enrichment pass)`
124        );
125      }
126  
127      if (sites.length === 0) {
128        info('No sites need initial scoring');
129        return {
130          processed: alreadyScoredPromoted,
131          succeeded: alreadyScoredPromoted,
132          failed: 0,
133          skipped: 0,
134          duration: Date.now() - startTime,
135        };
136      }
137  
138      // Filter out blocklisted sites (directories/social media/franchises)
139      let ignoredCount = 0;
140      for (const site of sites) {
141        const blocked = checkBlocklist(site.domain, site.country_code);
142        if (blocked) {
143          await run(
144            `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
145            [blocked.reason, site.id]
146          );
147          ignoredCount++;
148        }
149      }
150  
151      if (ignoredCount > 0) {
152        info(`Marked ${ignoredCount} sites as ignored (directories/social media/franchises)`);
153      }
154  
155      // Filter out legal/regulated industry sites by domain keyword (pre-LLM, zero cost)
156      let industryIgnoredCount = 0;
157      for (const site of sites) {
158        const industry = classifyIndustry(site.domain);
159        if (industry) {
160          const reason =
161            industry.type === 'legal'
162              ? `Ignored: Legal site detected from domain (${industry.reason})`
163              : `Ignored: Regulated industry (${industry.type}) detected from domain (${industry.reason})`;
164          await run(
165            `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
166            [reason, site.id]
167          );
168          industryIgnoredCount++;
169        }
170      }
171  
172      if (industryIgnoredCount > 0) {
173        info(
174          `Marked ${industryIgnoredCount} sites as ignored (legal/regulated industry by domain keyword)`
175        );
176      }
177  
178      info(`Scoring ${sites.length} sites (concurrency: ${concurrency})`);
179  
180      const stats = {
181        processed: 0,
182        succeeded: 0,
183        failed: 0,
184        skipped: 0,
185        gradeDistribution: {},
186      };
187  
188      // Process sites in batches
189      const { results, errors } = await processBatch(
190        sites,
191        (site, index) => {
192          displayProgress(index + 1, sites.length, `Scoring ${site.url}`);
193          return scoreSite(site.id, site.url);
194        },
195        { concurrency }
196      );
197  
198      // Count successes and failures
199      stats.processed = sites.length;
200      stats.succeeded = results.filter(r => r !== null).length;
201      stats.failed = errors.length;
202  
203      // Calculate grade distribution
204      const grades = await getAll(
205        `SELECT grade, COUNT(*) as count
206         FROM sites
207         WHERE grade IS NOT NULL
208         GROUP BY grade`
209      );
210  
211      for (const g of grades) {
212        stats.gradeDistribution[g.grade] = g.count;
213      }
214  
215      // Log errors
216      for (const err of errors) {
217        error(`  Scoring error: ${err.message || err}`);
218      }
219  
220      stats.duration = Date.now() - startTime;
221      generateStageCompletion('Scoring', stats);
222  
223      return stats;
224    } catch (err) {
225      error(`Scoring stage failed: ${err.message}`);
226      throw err;
227    }
228  }
229  
230  /**
231   * Score a single site
232   * @param {number} siteId - Site ID
233   * @param {string} url - Site URL
234   * @returns {Promise<Object>} Scoring result
235   */
236  async function scoreSite(siteId, url) {
237    try {
238      // Get site data
239      const site = await getOne(
240        `SELECT
241          id, landing_page_url as url, screenshot_path, ssl_status, http_headers, locale_data, perf_json
242         FROM sites
243         WHERE id = $1`,
244        [siteId]
245      );
246  
247      // Load HTML from filesystem (moved out of DB to reduce bloat)
248      const htmlDom = readHtmlDom(siteId);
249  
250      // Load screenshots from file system (if available)
251      // When ENABLE_SCREENSHOT_CAPTURE=false, screenshot_path will be null and scoring uses HTML only
252      let desktopAbove = null;
253      let mobileAbove = null;
254  
255      if (site.screenshot_path) {
256        desktopAbove = await loadScreenshot(site.screenshot_path, 'desktop_above');
257        mobileAbove = await loadScreenshot(site.screenshot_path, 'mobile_above');
258      }
259  
260      // Prepare site data for scoring (match scoreWebsite expectations)
261      // Screenshots are optional - scoring works with HTML only when USE_COMPUTER_VISION_SCORING=false
262      const siteData = {
263        url: site.url,
264        domain: new URL(site.url).hostname,
265        screenshots: {
266          desktop_above: desktopAbove,
267          mobile_above: mobileAbove,
268        },
269        html: htmlDom || '',
270        httpHeaders: site.http_headers,
271        localeData: site.locale_data, // Locale indicators for country detection
272      };
273  
274      // Score the site — LLM or programmatic
275      const ENABLE_LLM_SCORING = process.env.ENABLE_LLM_SCORING !== 'false';
276      let result;
277  
278      if (ENABLE_LLM_SCORING) {
279        result = await scoreWebsite(siteData, siteId);
280      } else {
281        // Programmatic scoring — no LLM call
282        // Semantic factors (headline, value prop, USP) will be scored by Haiku via
283        // the orchestrator's score_semantic batch type in a follow-up pass.
284        const keywordRow = await getOne('SELECT keyword FROM sites WHERE id = $1', [siteId]);
285        const keyword = keywordRow?.keyword;
286        let perfData = null;
287        try {
288          perfData = site.perf_json ? JSON.parse(site.perf_json) : null;
289        } catch (e) {
290          logger.warn(`Failed to parse perf_json for site ${siteId}: ${e.message}`);
291        }
292        const progResult = scoreWebsiteProgrammatically(htmlDom, site.url, keyword, perfData);
293  
294        // Wrap in the same structure the rest of scoreSite expects
295        result = {
296          scoring_method: 'programmatic',
297          overall_calculation: {
298            conversion_score: progResult.conversion_score,
299            letter_grade: progResult.letter_grade,
300            is_error_page: progResult.is_error_page,
301            is_broken_site: progResult.is_broken_site,
302            is_business_directory: progResult.is_business_directory,
303            is_local_business: progResult.is_local_business,
304            is_law_firm: progResult.is_law_firm,
305            industry_classification: progResult.industry_classification,
306            country_code: progResult.country_code,
307            city: progResult.city,
308            state: progResult.state,
309          },
310          factor_scores: progResult.factor_scores,
311          contact_details: progResult.contacts
312            ? {
313                contacts: [
314                  ...progResult.contacts.email_addresses.map(e => ({
315                    type: 'email',
316                    uri: e.email,
317                    source: e.source,
318                  })),
319                  ...progResult.contacts.phone_numbers.map(p => ({
320                    type: 'sms',
321                    uri: p.number,
322                    source: p.source,
323                  })),
324                  ...progResult.contacts.social_profiles
325                    .filter(s => s.usable)
326                    .map(s => ({ type: s.label, uri: s.url, source: 'social' })),
327                ],
328                has_contact_form: progResult.contacts.has_contact_form,
329              }
330            : null,
331        };
332      }
333  
334      // Extract grade and score from nested structure
335      const grade = result?.overall_calculation?.letter_grade || null;
336      const score = result?.overall_calculation?.conversion_score || null;
337  
338      // Check if LLM detected a business directory
339      // Skip in test mode (E2E test page is not a real local business)
340      const isDirectory = result?.overall_calculation?.is_business_directory;
341  
342      if (isDirectory && process.env.NODE_ENV !== 'test') {
343        // Mark as ignored if LLM detected directory
344        setScoreJson(siteId, result);
345        await run(
346          `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
347          ['Ignored: Business directory (LLM detected)', siteId]
348        );
349  
350        info(`  ${url} detected as directory by LLM - marked as ignored`);
351  
352        return result;
353      }
354  
355      // Check if LLM detected a non-local business
356      // Skip in test mode (E2E test page is not a real local business)
357      const isLocalBusiness = result?.overall_calculation?.is_local_business;
358  
359      if (isLocalBusiness === false && process.env.NODE_ENV !== 'test') {
360        // Mark as ignored if LLM detected non-local business
361        setScoreJson(siteId, result);
362        await run(
363          `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
364          ['Ignored: Not a local business (LLM detected)', siteId]
365        );
366  
367        info(`  ${url} detected as non-local business by LLM - marked as ignored`);
368  
369        return result;
370      }
371  
372      // Check if LLM detected a law firm
373      const isLawFirm = result?.overall_calculation?.is_law_firm;
374      if (isLawFirm && process.env.NODE_ENV !== 'test') {
375        setScoreJson(siteId, result);
376        await run(
377          `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
378          ['Ignored: Law firm (LLM detected)', siteId]
379        );
380        info(`  ${url} detected as law firm by LLM - marked as ignored`);
381        return result;
382      }
383  
384      // Check if LLM detected a regulated industry (healthcare/financial)
385      const industryClassification =
386        result?.overall_calculation?.industry_classification?.toLowerCase() || '';
387      const regulatedPatterns = [
388        'dental',
389        'medical',
390        'clinic',
391        'hospital',
392        'pharmacy',
393        'veterinary',
394        'financial advis',
395        'mortgage broker',
396        'accounting',
397      ];
398      const isRegulated = regulatedPatterns.some(p => industryClassification.includes(p));
399      if (isRegulated && process.env.NODE_ENV !== 'test') {
400        setScoreJson(siteId, result);
401        await run(
402          `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
403          [`Ignored: Regulated industry (${industryClassification})`, siteId]
404        );
405        info(
406          `  ${url} detected as regulated industry (${industryClassification}) - marked as ignored`
407        );
408        return result;
409      }
410  
411      // Check if LLM detected an error page
412      const isErrorPage = result?.overall_calculation?.is_error_page;
413      const errorType = result?.overall_calculation?.error_type;
414      const errorDescription = result?.overall_calculation?.error_description;
415  
416      if (isErrorPage && errorType) {
417        // Permanent errors (404, 403, 410) - mark as ignored
418        const permanentErrors = ['404', '403', '410'];
419  
420        if (permanentErrors.includes(errorType)) {
421          const errorMsg = errorDescription || `Permanent error: ${errorType}`;
422          setScoreJson(siteId, result);
423          await run(
424            `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
425            [errorMsg, siteId]
426          );
427  
428          info(`  ${url} detected as ${errorType} error - marked as ignored`);
429  
430          return result;
431        }
432  
433        // Temporary errors (5xx, maintenance, redirect) - keep status unchanged for retry
434        const temporaryErrors = ['5xx', 'maintenance', 'redirect'];
435  
436        if (temporaryErrors.includes(errorType)) {
437          const errorMsg = errorDescription || `Temporary error: ${errorType}`;
438          setScoreJson(siteId, result);
439          await run(
440            `UPDATE sites SET status = 'assets_captured', error_message = $1 WHERE id = $2`,
441            [errorMsg, siteId]
442          );
443  
444          info(`  ${url} detected as ${errorType} error - will retry later`);
445  
446          return result;
447        }
448      }
449  
450      // Check if LLM detected a broken site (visual rendering failure)
451      const isBrokenSite = result?.overall_calculation?.is_broken_site || false;
452      const brokenSiteDetails = result?.overall_calculation?.broken_site_details || [];
453  
454      if (isBrokenSite) {
455        // Get current recapture count
456        const siteRow = await getOne('SELECT recapture_count FROM sites WHERE id = $1', [siteId]);
457        const recaptureCount = (siteRow?.recapture_count || 0) + 1;
458  
459        // Check if max retries exceeded (3 attempts)
460        if (recaptureCount > 3) {
461          const errorMsg = `Max recapture attempts reached (3) - Issues: ${brokenSiteDetails.join(', ')}`;
462          setScoreJson(siteId, result);
463          await run(
464            `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
465            [errorMsg, siteId]
466          );
467  
468          error(`  ${url} broken site detected but max retries exceeded - marked as ignored`);
469  
470          return result;
471        }
472  
473        // Requeue for recapture with 1-week delay
474        const errorMsg = `Broken site detected (attempt ${recaptureCount}/3) - Issues: ${brokenSiteDetails.join(', ')}`;
475        setScoreJson(siteId, result);
476        await run(
477          `UPDATE sites SET
478            status = 'assets_captured',
479            error_message = $1,
480            recapture_count = $2,
481            recapture_at = NOW() + INTERVAL '7 days'
482           WHERE id = $3`,
483          [errorMsg, recaptureCount, siteId]
484        );
485  
486        info(
487          `  ${url} broken site detected - recapture scheduled for 7 days (attempt ${recaptureCount}/3)`
488        );
489        info(`      Issues: ${brokenSiteDetails.join(', ')}`);
490  
491        return result;
492      }
493  
494      // Extract location information from overall_calculation
495      const city = result?.overall_calculation?.city || null;
496      const countryCode = normaliseCountryCode(result?.overall_calculation?.country_code) || null;
497      const state = result?.overall_calculation?.state || null;
498  
499      // Determine status based on score threshold
500      // High-scoring sites (> cutoff) are marked as 'high_score' and end their journey
501      // Low-scoring sites (<= cutoff) advance to prog_scored (vision on) or semantic_scored (vision off)
502      const scoreThreshold = parseInt(process.env.LOW_SCORE_CUTOFF || '82', 10);
503      // In HTML-only mode, skip straight to 'semantic_scored' - rescoring stage is a no-op without screenshots
504      const ENABLE_VISION_LOCAL = process.env.ENABLE_VISION !== 'false';
505      const newStatus =
506        score !== null && score > scoreThreshold
507          ? 'high_score'
508          : ENABLE_VISION_LOCAL
509            ? 'prog_scored'
510            : 'semantic_scored';
511  
512      // Get ENABLE_VISION flag to determine if we need to save contacts
513      const ENABLE_VISION = process.env.ENABLE_VISION !== 'false';
514  
515      // When vision disabled, scoring includes contact extraction (save to contacts_json)
516      // When vision enabled, contacts are extracted in rescoring stage
517      let contactsJson = null;
518      if (!ENABLE_VISION && result.contact_details) {
519        contactsJson = JSON.stringify(result.contact_details);
520        info(`  Saved contacts for ${url} (HTML-only mode)`);
521      }
522  
523      // Wrap all DB updates for this site in a transaction so status and keyword counters
524      // are always consistent — no partial writes on crash.
525      // Score/contacts are written to filesystem; DB no longer stores the blobs.
526      // When contacts are written, store sentinel in DB to preserve IS NOT NULL gating.
527      setScoreJson(siteId, result);
528      if (contactsJson) setContactsJson(siteId, contactsJson);
529  
530      await withTransaction(async (client) => {
531        await client.query(
532          `UPDATE sites SET
533            grade = $1,
534            score = $2,
535            city = $3,
536            country_code = $4,
537            state = $5,
538            status = $6,
539            scored_at = NOW(),
540            rescored_at = CASE WHEN status = 'semantic_scored' THEN NOW() ELSE rescored_at END,
541            error_message = NULL
542           WHERE id = $7`,
543          [grade, score, city, countryCode, state, newStatus, siteId]
544        );
545  
546        // Reset retry count on successful scoring
547        await resetRetries(siteId);
548  
549        // Increment keyword counter if low-scoring (<= 82)
550        if (score !== null && score <= scoreThreshold) {
551          const siteRow = await client.query(
552            'SELECT keyword, country_code FROM sites WHERE id = $1',
553            [siteId]
554          );
555          const siteData = siteRow.rows[0];
556          if (siteData?.keyword && siteData?.country_code) {
557            await incrementLowScoring(siteData.keyword, siteData.country_code);
558          }
559        }
560      });
561  
562      const statusLabel = newStatus === 'high_score' ? ' [HIGH SCORE - COMPLETE]' : '';
563      success(`  Scored ${url}: ${grade} (${score})${statusLabel}`);
564  
565      return result;
566    } catch (err) {
567      // Record failure and increment retry count (marks as 'failing' if limit exceeded)
568      await recordFailure(siteId, 'scoring', err, 'assets_captured');
569      throw err;
570    }
571  }
572  
573  /**
574   * Get scoring statistics
575   * @returns {Promise<Object>} Scoring statistics
576   */
577  export async function getScoringStats() {
578    const stats = await getOne(
579      `SELECT
580        COUNT(id) as total_sites,
581        COUNT(CASE WHEN score IS NOT NULL THEN 1 END) as scored_sites,
582        COUNT(CASE WHEN score < 83 THEN 1 END) as low_score_sites,
583        AVG(score) as avg_score,
584        MIN(score) as min_score,
585        MAX(score) as max_score
586       FROM sites`
587    );
588  
589    const gradeDistribution = await getAll(
590      `SELECT grade, COUNT(*) as count
591       FROM sites
592       WHERE grade IS NOT NULL
593       GROUP BY grade
594       ORDER BY
595         CASE grade
596           WHEN 'A+' THEN 1 WHEN 'A' THEN 2 WHEN 'A-' THEN 3
597           WHEN 'B+' THEN 4 WHEN 'B' THEN 5 WHEN 'B-' THEN 6
598           WHEN 'C' THEN 7 WHEN 'D' THEN 8 WHEN 'E' THEN 9 WHEN 'F' THEN 10
599         END`
600    );
601  
602    return {
603      ...stats,
604      gradeDistribution: gradeDistribution.reduce((acc, g) => {
605        acc[g.grade] = g.count;
606        return acc;
607      }, {}),
608    };
609  }