// proposals.js

/**
 * Proposals Stage
 * Generate personalized proposals for low-scoring sites
 *
 * Supports two modes:
 * 1. LLM-based (USE_LLM_PROPOSALS=true): $0.18/site, requires approval workflow
 * 2. Template-based (USE_LLM_PROPOSALS=false): $0/site, pre-approved templates
 */

import { run, getOne, getAll } from '../utils/db.js';
import { readdirSync, existsSync } from 'fs';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
import {
  generateProposalVariants as generateLLMProposals,
  processReworkQueue as processReworkRequests,
} from '../proposal-generator-v2.js';
import { generateProposalVariants as generateTemplateProposals } from '../proposal-generator-templates.js';
import Logger from '../utils/logger.js';
import { generateStageCompletion, displayProgress } from '../utils/summary-generator.js';
import { processBatch } from '../utils/error-handler.js';
import { checkBlocklist } from '../utils/site-filters.js';
import { recordFailure, resetRetries } from '../utils/retry-handler.js';

const __dirname = dirname(fileURLToPath(import.meta.url));

// Optional allow-list of country codes (comma-separated, case-insensitive).
// Blank entries (e.g. from a trailing comma) are dropped.
const ENGLISH_ONLY_MARKETS = process.env.ENGLISH_ONLY_MARKETS
  ? new Set(
      process.env.ENGLISH_ONLY_MARKETS.split(',')
        .map(c => c.trim().toUpperCase())
        .filter(Boolean)
    )
  : null;

// Upper score bound used when LOW_SCORE_CUTOFF is unset or not a number.
const DEFAULT_LOW_SCORE_CUTOFF = 82;

/**
 * List the country codes that have an email template under data/templates/.
 *
 * An entry qualifies when it is a directory or symlink (e.g. UK → GB) that
 * contains either `{CC}/email.json` (legacy flat layout) or
 * `{CC}/{lang}/email.json` (language-specific layout). When
 * ENGLISH_ONLY_MARKETS is set, the result is restricted to those codes.
 *
 * @returns {string[]} Upper-cased, de-duplicated country codes; empty when the
 *   templates directory does not exist.
 */
function getTemplateCountries() {
  const templatesDir = join(__dirname, '../../data/templates');
  if (!existsSync(templatesDir)) return [];

  const hasTemplate = entry => {
    if (!entry.isDirectory() && !entry.isSymbolicLink()) return false;
    const countryDir = join(templatesDir, entry.name);
    // Legacy flat path: data/templates/{CC}/email.json (symlinks like UK→GB are followed)
    if (existsSync(join(countryDir, 'email.json'))) return true;
    // Language-specific path: data/templates/{CC}/{lang}/email.json
    try {
      return readdirSync(countryDir, { withFileTypes: true }).some(
        sub => sub.isDirectory() && existsSync(join(countryDir, sub.name, 'email.json'))
      );
    } catch {
      // Unreadable entry, or a symlink that does not resolve to a directory.
      return false;
    }
  };

  // Set de-duplicates names that only differ by case on case-sensitive filesystems.
  const countries = [
    ...new Set(
      readdirSync(templatesDir, { withFileTypes: true })
        .filter(hasTemplate)
        .map(entry => entry.name.toUpperCase())
    ),
  ];

  return ENGLISH_ONLY_MARKETS ? countries.filter(c => ENGLISH_ONLY_MARKETS.has(c)) : countries;
}

// Check which proposal generation mode to use
const USE_LLM_PROPOSALS = process.env.USE_LLM_PROPOSALS === 'true';

const logger = new Logger('Proposals');
const success = (...args) => logger.success(...args);
const info = (...args) => logger.info(...args);
const error = (...args) => logger.error(...args);

/**
 * Build the result object returned when the stage has nothing to generate.
 * @param {number} startTime - Stage start timestamp (ms since epoch)
 * @param {number} [skipped=0] - Number of sites skipped (e.g. blocklisted)
 * @returns {Object} Stage results with zero counts and the elapsed duration
 */
function emptyStageResult(startTime, skipped = 0) {
  return {
    processed: 0,
    succeeded: 0,
    failed: 0,
    skipped,
    duration: Date.now() - startTime,
  };
}

/**
 * Normalise `options.limit` for use in the SQL LIMIT clause.
 * Falsy values mean "no limit". Anything else must be a finite number >= 1,
 * because the value is written into the SQL text.
 * @param {*} limit - Caller-supplied limit (number or numeric string)
 * @returns {number|null} Positive integer limit, or null for no limit
 * @throws {Error} If the limit is not a positive number
 */
function parseLimit(limit) {
  if (!limit) return null;
  const parsed = Math.trunc(Number(limit));
  if (!Number.isFinite(parsed) || parsed < 1) {
    throw new Error(`Invalid proposals limit: ${limit}`);
  }
  return parsed;
}

/**
 * Re-queue enriched sites that have no country_code back to the enrich stage.
 * These get stranded because the main query filters by country_code = ANY($1::text[]),
 * so NULL country_code sites are silently skipped every run. Resetting them lets enrich
 * try again to detect the country from phone numbers / addresses.
 * @returns {Promise<void>}
 */
async function requeueSitesWithoutCountry() {
  const requeued = await run(
    `UPDATE sites
     SET status = CASE WHEN rescored_at IS NOT NULL THEN 'semantic_scored' ELSE 'prog_scored' END,
         error_message = NULL,
         enriched_at = NULL
     WHERE status IN ('enriched', 'enriched_llm', 'enriched_regex')
       AND (country_code IS NULL OR country_code = '')`
  );
  if (requeued.changes > 0) {
    info(
      `Re-queued ${requeued.changes} enriched sites with unknown country back to enrich stage`
    );
  }
}

/**
 * Mark blocklisted sites (directories/social media/franchises) as 'ignored'
 * and return only the sites that should still receive proposals.
 * @param {Object[]} sites - Candidate rows with `id`, `domain`, `country_code`
 * @returns {Promise<{eligibleSites: Object[], ignoredCount: number}>}
 */
async function excludeBlocklistedSites(sites) {
  const eligibleSites = [];
  for (const site of sites) {
    const blocked = checkBlocklist(site.domain, site.country_code);
    if (!blocked) {
      eligibleSites.push(site);
      continue;
    }
    await run(`UPDATE sites SET status = 'ignored', error_message = $1 WHERE id = $2`, [
      blocked.reason,
      site.id,
    ]);
  }
  return { eligibleSites, ignoredCount: sites.length - eligibleSites.length };
}

/**
 * Run the proposals stage
 *
 * Order of work:
 * 1. Process rework-flagged messages.
 * 2. Re-queue enriched sites that have no country.
 * 3. Select enriched sites in the score range whose country has an email template
 *    and that have no outbound message yet.
 * 4. Drop blocklisted sites.
 * 5. Generate proposals in batches.
 *
 * @param {Object} options - Stage options
 * @param {number} options.limit - Limit number of proposals to generate (falsy = no limit)
 * @param {number} options.concurrency - Number of concurrent generations (default: 2)
 * @param {number} options.minScore - Minimum score to include (default: 0)
 * @param {number} options.maxScore - Maximum score to include (default: LOW_SCORE_CUTOFF env, else 82)
 * @returns {Promise<Object>} Stage results: processed, succeeded, failed,
 *   skipped (blocklisted) and duration (ms)
 * @throws {Error} If `options.limit` is not a positive number
 */
export async function runProposalsStage(options = {}) {
  const startTime = Date.now();
  const concurrency = options.concurrency || 2; // Lower concurrency for API-heavy operation

  // A missing or non-numeric LOW_SCORE_CUTOFF falls back to the default instead of NaN.
  const parsedCutoff = Number.parseInt(process.env.LOW_SCORE_CUTOFF ?? '', 10);
  const defaultMaxScore = Number.isNaN(parsedCutoff) ? DEFAULT_LOW_SCORE_CUTOFF : parsedCutoff;

  const minScore = options.minScore ?? 0;
  const maxScore = options.maxScore ?? defaultMaxScore;
  const limit = parseLimit(options.limit);

  info('Starting Proposals Stage...');
  info(`Mode: ${USE_LLM_PROPOSALS ? 'LLM-based ($0.18/site)' : 'Template-based ($0/site)'}`);
  info(`Score range: ${minScore}-${maxScore}`);

  // Process any rework-flagged messages first (operator feedback from QA review)
  const reworkRow = await getOne(
    `SELECT COUNT(DISTINCT site_id) AS cnt FROM messages
     WHERE direction = 'outbound' AND approval_status = 'rework'`
  );
  // Number() guards against drivers that return COUNT as a string.
  const reworkCount = Number(reworkRow?.cnt ?? 0);
  if (reworkCount > 0) {
    info(`Processing ${reworkCount} rework site(s) before generating new proposals...`);
    await processReworkRequests();
  }

  await requeueSitesWithoutCountry();

  // Get sites that need proposals (only enriched low-scoring sites)
  // Sites must complete: prog_scored → semantic_scored/vision_scored → enrichment before proposals
  // Filter by countries that have email templates in data/templates/
  const templateCountries = getTemplateCountries();
  if (templateCountries.length === 0) {
    info('No email templates found in data/templates/ — skipping proposals');
    return emptyStageResult(startTime);
  }

  // `limit` is a validated integer, so embedding it in the SQL text is safe.
  const sql = `
    SELECT id, domain, landing_page_url as url, score, grade, keyword, country_code
    FROM sites
    WHERE status IN ('enriched', 'enriched_llm', 'enriched_regex')
      AND score >= $1
      AND score <= $2
      AND country_code = ANY($3::text[])
      AND NOT EXISTS (
        SELECT 1 FROM messages m
        WHERE m.site_id = sites.id AND m.direction = 'outbound'
      )
    ORDER BY CASE WHEN language_code = 'en' OR country_code IN ('AU','CA','GB','IE','IN','NZ','US','ZA') THEN 0 ELSE 1 END ASC, score ASC
    ${limit !== null ? `LIMIT ${limit}` : ''}
  `;

  const sites = await getAll(sql, [minScore, maxScore, templateCountries]);

  if (sites.length === 0) {
    info('No sites need proposals');
    return emptyStageResult(startTime);
  }

  // Blocklisted sites are marked ignored AND removed from this run's work list.
  const { eligibleSites, ignoredCount } = await excludeBlocklistedSites(sites);

  if (ignoredCount > 0) {
    info(`Marked ${ignoredCount} sites as ignored (directories/social media/franchises)`);
  }

  if (eligibleSites.length === 0) {
    info('No sites need proposals');
    return emptyStageResult(startTime, ignoredCount);
  }

  info(`Generating proposals for ${eligibleSites.length} sites`);

  // Process sites in batches
  const { results, errors } = await processBatch(
    eligibleSites,
    async (site, index) => {
      displayProgress(index + 1, eligibleSites.length, `Generating proposal for ${site.url}`);
      return generateProposalForSite(site.id, site.url, site.keyword);
    },
    { concurrency }
  );

  const stats = {
    processed: eligibleSites.length,
    succeeded: results.filter(r => r !== null).length,
    failed: errors.length,
    skipped: ignoredCount,
  };

  for (const err of errors) {
    const url = err.item?.url || 'unknown site';
    const message = err.error?.message || err.toString();
    error(`  Failed to generate proposal for ${url}: ${message}`);
  }

  stats.duration = Date.now() - startTime;
  generateStageCompletion('Proposals', stats);

  return stats;
}

/**
 * Generate proposal for a single site
 * Routes to LLM-based or template-based generator based on USE_LLM_PROPOSALS env var
 * @param {number} siteId - Site ID
 * @param {string} url - Site URL (used for logging)
 * @param {string} _keyword - Associated keyword (unused, kept for compatibility)
 * @returns {Promise<Object>} Proposal result from the selected generator
 * @throws Rethrows any generator error after recording the failure
 */
async function generateProposalForSite(siteId, url, _keyword) {
  try {
    // Route to appropriate generator based on mode
    const generateProposalVariants = USE_LLM_PROPOSALS
      ? generateLLMProposals
      : generateTemplateProposals;

    // Generate proposal variants (handles database insertion and status update internally)
    const result = await generateProposalVariants(siteId);

    // Reset retry count on successful proposal generation
    await resetRetries(siteId);

    // Guard the log line: an unexpected result shape must not turn a
    // successful generation into a recorded failure.
    const variantCount = result?.variants?.length ?? 0;
    const mode = USE_LLM_PROPOSALS ? 'LLM' : 'template';
    success(
      `  Generated ${variantCount} ${mode} proposals for ${result?.contactCount || 0} contacts for ${url}`
    );

    return result;
  } catch (err) {
    // Record failure and increment retry count (marks as 'failing' if limit exceeded)
    await recordFailure(siteId, 'proposals', err, 'enriched');
    throw err;
  }
}

/**
 * Get proposals statistics
 * Aggregates over all outbound messages.
 * @returns {Promise<Object>} Row with sites_with_proposals, total_messages,
 *   pending_messages and sent_messages
 */
export async function getProposalsStats() {
  const stats = await getOne(
    `SELECT
      COUNT(DISTINCT site_id) as sites_with_proposals,
      COUNT(id) as total_messages,
      COUNT(CASE WHEN approval_status = 'pending' THEN 1 END) as pending_messages,
      COUNT(CASE WHEN delivery_status = 'sent' THEN 1 END) as sent_messages
     FROM messages
     WHERE direction = 'outbound'`
  );

  return stats;
}

/**
 * Regenerate proposals for specific sites
 *
 * Deletes every outbound message for the given sites before regenerating.
 * The DELETE does not filter on delivery_status, so already-sent messages are
 * removed too. A site whose regeneration fails is left with no outbound messages.
 * NOTE(review): confirm that deleting sent history is intended.
 *
 * @param {number[]} siteIds - Array of site IDs
 * @returns {Promise<Object>} { processed, succeeded, failed }; IDs with no
 *   matching site are counted as failed
 */
export async function regenerateProposals(siteIds) {
  if (!Array.isArray(siteIds) || siteIds.length === 0) {
    info('No site IDs supplied — nothing to regenerate');
    return { processed: 0, succeeded: 0, failed: 0 };
  }

  info(`Regenerating proposals for ${siteIds.length} sites...`);

  // Delete existing outbound messages
  await run(
    `DELETE FROM messages WHERE direction = 'outbound' AND site_id = ANY($1::int[])`,
    [siteIds]
  );

  // Get site data
  const sites = await getAll(
    `SELECT id, landing_page_url as url, keyword
     FROM sites
     WHERE id = ANY($1::int[])`,
    [siteIds]
  );

  // Compare as strings so numeric-string IDs from a CLI still match DB ints.
  const foundIds = new Set(sites.map(site => String(site.id)));
  const missingIds = siteIds.filter(id => !foundIds.has(String(id)));
  if (missingIds.length > 0) {
    error(`No site found for ID(s): ${missingIds.join(', ')}`);
  }

  let succeeded = 0;
  let failed = missingIds.length;

  for (const site of sites) {
    try {
      await generateProposalForSite(site.id, site.url, site.keyword);
      succeeded++;
    } catch (err) {
      error(`Failed to regenerate proposal for ${site.url}: ${err.message}`);
      failed++;
    }
  }

  return { processed: siteIds.length, succeeded, failed };
}