/ src / stages / proposals.js
proposals.js
  1  /**
  2   * Proposals Stage
  3   * Generate personalized proposals for low-scoring sites
  4   *
  5   * Supports two modes:
  6   * 1. LLM-based (USE_LLM_PROPOSALS=true): $0.18/site, requires approval workflow
  7   * 2. Template-based (USE_LLM_PROPOSALS=false): $0/site, pre-approved templates
  8   */
  9  
 10  import { run, getOne, getAll } from '../utils/db.js';
 11  import { readdirSync, existsSync } from 'fs';
 12  import { join, dirname } from 'path';
 13  import { fileURLToPath } from 'url';
 14  import {
 15    generateProposalVariants as generateLLMProposals,
 16    processReworkQueue as processReworkRequests,
 17  } from '../proposal-generator-v2.js';
 18  import { generateProposalVariants as generateTemplateProposals } from '../proposal-generator-templates.js';
 19  import Logger from '../utils/logger.js';
 20  import { generateStageCompletion, displayProgress } from '../utils/summary-generator.js';
 21  import { processBatch } from '../utils/error-handler.js';
 22  import { checkBlocklist } from '../utils/site-filters.js';
 23  import { recordFailure, resetRetries } from '../utils/retry-handler.js';
 24  
 25  const __dirname = dirname(fileURLToPath(import.meta.url));
 26  
 27  const ENGLISH_ONLY_MARKETS = process.env.ENGLISH_ONLY_MARKETS
 28    ? new Set(process.env.ENGLISH_ONLY_MARKETS.split(',').map(c => c.trim().toUpperCase()))
 29    : null;
 30  
 31  function getTemplateCountries() {
 32    const templatesDir = join(__dirname, '../../data/templates');
 33    if (!existsSync(templatesDir)) return [];
 34    let countries = readdirSync(templatesDir, { withFileTypes: true })
 35      .filter(d => {
 36        if (!d.isDirectory() && !d.isSymbolicLink()) return false;
 37        // Legacy flat path: data/templates/{CC}/email.json (symlinks like UK→GB are followed)
 38        if (existsSync(join(templatesDir, d.name, 'email.json'))) return true;
 39        // Language-specific path: data/templates/{CC}/{lang}/email.json
 40        const countryDir = join(templatesDir, d.name);
 41        try {
 42          return readdirSync(countryDir, { withFileTypes: true }).some(
 43            sub => sub.isDirectory() && existsSync(join(countryDir, sub.name, 'email.json'))
 44          );
 45        } catch {
 46          return false;
 47        }
 48      })
 49      .map(d => d.name.toUpperCase());
 50    if (ENGLISH_ONLY_MARKETS) {
 51      countries = countries.filter(c => ENGLISH_ONLY_MARKETS.has(c));
 52    }
 53    return countries;
 54  }
 55  
 56  // Check which proposal generation mode to use
 57  const USE_LLM_PROPOSALS = process.env.USE_LLM_PROPOSALS === 'true';
 58  
 59  const logger = new Logger('Proposals');
 60  const success = (...args) => logger.success(...args);
 61  const info = (...args) => logger.info(...args);
 62  const error = (...args) => logger.error(...args);
 63  
 64  /**
 65   * Run the proposals stage
 66   * @param {Object} options - Stage options
 67   * @param {number} options.limit - Limit number of proposals to generate
 68   * @param {number} options.concurrency - Number of concurrent generations (default: 2)
 69   * @param {number} options.minScore - Minimum score to include (default: 0)
 70   * @param {number} options.maxScore - Maximum score to include (default: 82)
 71   * @returns {Promise<Object>} Stage results
 72   */
 73  export async function runProposalsStage(options = {}) {
 74    const startTime = Date.now();
 75    const concurrency = options.concurrency || 2; // Lower concurrency for API-heavy operation
 76  
 77    // Get cutoff from environment variable or use default
 78    const defaultMaxScore = parseInt(process.env.LOW_SCORE_CUTOFF || '82', 10);
 79  
 80    const minScore = options.minScore ?? 0;
 81    const maxScore = options.maxScore ?? defaultMaxScore;
 82  
 83    info('Starting Proposals Stage...');
 84    info(`Mode: ${USE_LLM_PROPOSALS ? 'LLM-based ($0.18/site)' : 'Template-based ($0/site)'}`);
 85    info(`Score range: ${minScore}-${maxScore}`);
 86  
 87    // Process any rework-flagged messages first (operator feedback from QA review)
 88    const reworkRow = await getOne(
 89      `SELECT COUNT(DISTINCT site_id) AS cnt FROM messages
 90       WHERE direction = 'outbound' AND approval_status = 'rework'`
 91    );
 92    const reworkCount = reworkRow?.cnt ?? 0;
 93    if (reworkCount > 0) {
 94      info(`Processing ${reworkCount} rework site(s) before generating new proposals...`);
 95      await processReworkRequests();
 96    }
 97  
 98    // Re-queue enriched sites that have no country_code back to the enrich stage.
 99    // These get stranded because the main query filters by country_code = ANY($1::text[]),
100    // so NULL country_code sites are silently skipped every run. Resetting them lets enrich
101    // try again to detect the country from phone numbers / addresses.
102    const requeued = await run(
103      `UPDATE sites
104       SET status = CASE WHEN rescored_at IS NOT NULL THEN 'semantic_scored' ELSE 'prog_scored' END,
105           error_message = NULL,
106           enriched_at = NULL
107       WHERE status IN ('enriched', 'enriched_llm', 'enriched_regex')
108         AND (country_code IS NULL OR country_code = '')`
109    );
110    if (requeued.changes > 0) {
111      info(
112        `Re-queued ${requeued.changes} enriched sites with unknown country back to enrich stage`
113      );
114    }
115  
116    // Get sites that need proposals (only enriched low-scoring sites)
117    // Sites must complete: prog_scored → semantic_scored/vision_scored → enrichment before proposals
118    // Filter by countries that have email templates in data/templates/
119    const templateCountries = getTemplateCountries();
120    if (templateCountries.length === 0) {
121      info('No email templates found in data/templates/ — skipping proposals');
122      return {
123        processed: 0,
124        succeeded: 0,
125        failed: 0,
126        skipped: 0,
127        duration: Date.now() - startTime,
128      };
129    }
130  
131    const sql = `
132      SELECT id, domain, landing_page_url as url, score, grade, keyword, country_code
133      FROM sites
134      WHERE status IN ('enriched', 'enriched_llm', 'enriched_regex')
135        AND score >= $1
136        AND score <= $2
137        AND country_code = ANY($3::text[])
138        AND NOT EXISTS (
139          SELECT 1 FROM messages m
140          WHERE m.site_id = sites.id AND m.direction = 'outbound'
141        )
142      ORDER BY CASE WHEN language_code = 'en' OR country_code IN ('AU','CA','GB','IE','IN','NZ','US','ZA') THEN 0 ELSE 1 END ASC, score ASC
143      ${options.limit ? `LIMIT ${options.limit}` : ''}
144    `;
145  
146    const sites = await getAll(sql, [minScore, maxScore, templateCountries]);
147  
148    if (sites.length === 0) {
149      info('No sites need proposals');
150      return {
151        processed: 0,
152        succeeded: 0,
153        failed: 0,
154        skipped: 0,
155        duration: Date.now() - startTime,
156      };
157    }
158  
159    // Filter out blocklisted sites (directories/social media/franchises)
160    let ignoredCount = 0;
161    for (const site of sites) {
162      const blocked = checkBlocklist(site.domain, site.country_code);
163      if (blocked) {
164        await run(
165          `UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`,
166          [blocked.reason, site.id]
167        );
168        ignoredCount++;
169      }
170    }
171  
172    if (ignoredCount > 0) {
173      info(`Marked ${ignoredCount} sites as ignored (directories/social media/franchises)`);
174    }
175  
176    info(`Generating proposals for ${sites.length} sites`);
177  
178    const stats = {
179      processed: 0,
180      succeeded: 0,
181      failed: 0,
182      skipped: 0,
183    };
184  
185    // Process sites in batches
186    const { results, errors } = await processBatch(
187      sites,
188      async (site, index) => {
189        displayProgress(index + 1, sites.length, `Generating proposal for ${site.url}`);
190        return generateProposalForSite(site.id, site.url, site.keyword);
191      },
192      { concurrency }
193    );
194  
195    // Count successes and failures
196    stats.processed = sites.length;
197    stats.succeeded = results.filter(r => r !== null).length;
198    stats.failed = errors.length;
199  
200    // Log errors
201    for (const err of errors) {
202      const url = err.item?.url || 'unknown site';
203      const message = err.error?.message || err.toString();
204      error(`  Failed to generate proposal for ${url}: ${message}`);
205    }
206  
207    stats.duration = Date.now() - startTime;
208    generateStageCompletion('Proposals', stats);
209  
210    return stats;
211  }
212  
/**
 * Generate proposal for a single site.
 * Routes to the LLM-based or template-based generator based on USE_LLM_PROPOSALS.
 *
 * On success the site's retry counter is reset. On failure the failure is
 * recorded and the ORIGINAL error is rethrown, even if recording it also fails.
 *
 * @param {number} siteId - Site ID
 * @param {string} url - Site URL (used for log messages only)
 * @param {string} _keyword - Associated keyword (unused, kept for compatibility)
 * @returns {Promise<Object>} Result returned by the selected generator
 * @throws Whatever the generator or resetRetries throws
 */
async function generateProposalForSite(siteId, url, _keyword) {
  // Route to appropriate generator based on mode
  const generateProposalVariants = USE_LLM_PROPOSALS
    ? generateLLMProposals
    : generateTemplateProposals;

  let result;
  try {
    // Generate proposal variants (handles database insertion and status update internally)
    result = await generateProposalVariants(siteId);

    // Reset retry count on successful proposal generation
    await resetRetries(siteId);
  } catch (err) {
    // Record failure and increment retry count (marks as 'failing' if limit exceeded).
    // A bookkeeping error must not replace the root cause seen by the caller.
    try {
      await recordFailure(siteId, 'proposals', err, 'enriched');
    } catch (recordErr) {
      error(`  Could not record proposal failure for ${url}: ${recordErr?.message ?? recordErr}`);
    }
    throw err;
  }

  // Logging happens outside the try block so an unexpected result shape
  // cannot turn a completed generation into a recorded failure.
  const mode = USE_LLM_PROPOSALS ? 'LLM' : 'template';
  const variantCount = result?.variants?.length ?? 0;
  success(
    `  Generated ${variantCount} ${mode} proposals for ${result?.contactCount || 0} contacts for ${url}`
  );

  return result;
}
246  
/**
 * Summarise outbound messages.
 * @returns {Promise<Object>} Row with sites_with_proposals, total_messages,
 *   pending_messages and sent_messages, as returned by getOne
 */
export async function getProposalsStats() {
  // Single aggregate query over every outbound message
  const outboundSummarySql = `SELECT
       COUNT(DISTINCT site_id) as sites_with_proposals,
       COUNT(id) as total_messages,
       COUNT(CASE WHEN approval_status = 'pending' THEN 1 END) as pending_messages,
       COUNT(CASE WHEN delivery_status = 'sent' THEN 1 END) as sent_messages
     FROM messages
     WHERE direction = 'outbound'`;

  return getOne(outboundSummarySql);
}
264  
/**
 * Regenerate proposals for specific sites.
 *
 * Existing outbound messages for the given sites are deleted first, then a new
 * proposal is generated for each site, one at a time. Requested IDs that have
 * no row in `sites` are logged and counted as failed.
 *
 * @param {number[]} siteIds - Array of site IDs
 * @returns {Promise<Object>} Regeneration results: processed, succeeded, failed
 * @throws {TypeError} If siteIds is not an array
 */
export async function regenerateProposals(siteIds) {
  if (!Array.isArray(siteIds)) {
    throw new TypeError('regenerateProposals expects an array of site IDs');
  }

  // Nothing requested: skip the DELETE/SELECT round trips entirely.
  if (siteIds.length === 0) {
    info('No site IDs supplied — nothing to regenerate');
    return { processed: 0, succeeded: 0, failed: 0 };
  }

  info(`Regenerating proposals for ${siteIds.length} sites...`);

  // Delete existing outbound messages
  await run(
    `DELETE FROM messages WHERE direction = 'outbound' AND site_id = ANY($1::int[])`,
    [siteIds]
  );

  // Get site data
  const sites = await getAll(
    `SELECT id, landing_page_url as url, keyword
     FROM sites
     WHERE id = ANY($1::int[])`,
    [siteIds]
  );

  let succeeded = 0;
  let failed = 0;

  // Surface requested IDs that matched no site instead of dropping them silently.
  // IDs are compared as strings so numeric and string forms of the same ID match.
  const foundIds = new Set(sites.map(site => String(site.id)));
  const missingIds = [...new Set(siteIds.map(String))].filter(id => !foundIds.has(id));
  for (const id of missingIds) {
    error(`Failed to regenerate proposal for site ${id}: site not found`);
    failed++;
  }

  // Generate new proposals
  for (const site of sites) {
    try {
      await generateProposalForSite(site.id, site.url, site.keyword);
      succeeded++;
    } catch (err) {
      error(`Failed to regenerate proposal for ${site.url}: ${err.message}`);
      failed++;
    }
  }

  return { processed: siteIds.length, succeeded, failed };
}