process.js
/**
 * Main Processing Pipeline
 * Populates keywords, scrapes SERPs via ZenRows, then captures, scores, and stores each site
 */

import { pathToFileURL } from 'url';
import Logger from './utils/logger.js';
import {
  generateKeywordCombinations,
  upsertKeyword,
  incrementProcessedCount,
  incrementLowScoringCount,
  incrementReworkLowScoringCount,
} from './utils/keyword-manager.js';
import { scrapeSERP } from './scrape.js';
import { captureWebsite } from './capture.js';
import { scoreWebsite, extractGrade } from './score.js';
import { saveScreenshots } from './utils/screenshot-storage.js';
import { writeHtmlDom } from './utils/html-storage.js';
import { setScoreJson } from './utils/score-storage.js';
import './utils/load-env.js';
import { run, getOne, getAll } from './utils/db.js';

const logger = new Logger('Process');

/**
 * Main processing function
 * @param {number} limit - Maximum number of sites to process
 * @param {string|null} url - Optional URL to process directly
 */
async function main(limit = 1000, url = null) {
  try {
    // If a URL is provided, process it directly and skip the keyword pipeline
    if (url) {
      logger.info(`Processing single URL: ${url}`);
      await processSingleUrl(url);
      logger.success('URL processing complete!');
      return;
    }

    logger.info(`Starting processing pipeline (limit: ${limit} sites)...`);

    // Step 1: Populate/freshen keywords table
    await populateKeywords();

    // Step 2: Get priority keywords to scrape (each keyword yields up to 10 sites)
    const keywordsToScrape = await getKeywordsToScrape(Math.ceil(limit / 10));

    let sitesProcessed = 0;

    // Step 3: Process each keyword
    for (const keywordRow of keywordsToScrape) {
      if (sitesProcessed >= limit) {
        logger.info(`Reached site limit of ${limit}, stopping`);
        break;
      }

      const processedCount = await processKeyword(keywordRow, limit - sitesProcessed);
      sitesProcessed += processedCount;
    }

    logger.success(`Processing complete! Processed ${sitesProcessed} sites`);
  } catch (error) {
    logger.error('Processing pipeline failed', error);
    process.exit(1);
  }
}
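// Example (illustrative only): the pipeline can also be driven from another module
// via the exports at the bottom of this file. `limit` caps total sites across all
// keywords; passing `url` short-circuits into single-URL mode. The URL below is
// hypothetical.
//
//   import { main } from './process.js';
//   await main(50);                               // scrape and score up to 50 sites
//   await main(1, 'https://example.com/landing'); // score one page directly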
/**
 * Process a single keyword
 * @param {Object} keywordRow - Keyword row from database
 * @param {number} remainingLimit - Sites remaining to process
 * @returns {Promise<number>} Number of sites processed
 */
async function processKeyword(keywordRow, remainingLimit) {
  const { keyword } = keywordRow;
  logger.info(`Processing keyword: "${keyword}"`);

  try {
    // Scrape sites from ZenRows
    const sites = await scrapeSERP(keyword, 10);

    // Update the keyword's zenrows_count and scrape timestamp
    await upsertKeyword(keyword, {
      zenrows_count: sites.length,
      last_scraped_at: new Date().toISOString(),
    });

    logger.info(`Found ${sites.length} sites for "${keyword}"`);

    // Process each site
    let processed = 0;
    for (const site of sites) {
      if (processed >= remainingLimit) break;

      const remainingSites = remainingLimit - processed;
      logger.info(`Processing ${site.domain} (${remainingSites} remaining)`);

      try {
        await processSite(site, keyword);
        processed++;
        await incrementProcessedCount(keyword);
      } catch (error) {
        logger.error(`Failed to process ${site.domain}`, error);
        // Continue to the next site on error
      }
    }

    return processed;
  } catch (error) {
    logger.error(`Failed to scrape keyword "${keyword}"`, error);
    return 0;
  }
}

/**
 * Populate the keywords table with all combinations
 */
async function populateKeywords() {
  logger.info('Populating keywords table...');

  const combinations = generateKeywordCombinations();

  let newKeywords = 0;
  let existingKeywords = 0;

  for (const combo of combinations) {
    const existing = await getOne(
      'SELECT id FROM keywords WHERE keyword = $1 AND country_code = $2',
      [combo.keyword, combo.countryCode]
    );

    if (!existing) {
      await upsertKeyword(combo.keyword, {
        country_code: combo.countryCode,
        google_domain: combo.googleDomain,
        search_volume: combo.searchVolume,
        priority: combo.priority,
      });
      newKeywords++;
    } else {
      existingKeywords++;
    }
  }

  logger.success(
    `Keywords populated: ${newKeywords} new, ${existingKeywords} existing (${combinations.length} total)`
  );
}

/**
 * Get keywords to scrape, ordered by priority
 * @param {number} limit - Number of keywords to return
 * @returns {Promise<Array>} Keywords to scrape
 */
async function getKeywordsToScrape(limit) {
  // Priority: never-scraped first, then oldest-scraped, then highest zenrows_count
  return await getAll(
    `SELECT * FROM keywords
     ORDER BY
       CASE WHEN last_scraped_at IS NULL THEN 0 ELSE 1 END,
       last_scraped_at ASC,
       zenrows_count DESC
     LIMIT $1`,
    [limit]
  );
}
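// Note on the ORDER BY above: the CASE expression forces never-scraped rows
// (NULL last_scraped_at) to the front, since PostgreSQL's default ASC ordering
// places NULLs last. On PostgreSQL the same priority can be written directly as:
//
//   ORDER BY last_scraped_at ASC NULLS FIRST, zenrows_count DESC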
/**
 * Process a single site: capture, store, and score
 * @param {Object} site - Site object from scraper
 * @param {string} keyword - Search keyword
 */
async function processSite(site, keyword) {
  const { url, domain } = site;

  // Check if the site already exists
  const existing = await getOne('SELECT id FROM sites WHERE domain = $1', [domain]);

  if (existing) {
    logger.info(`Site ${domain} already exists, skipping`);
    return;
  }

  // Step 1: Capture website (screenshots + HTML)
  logger.info(`Capturing ${domain}...`);
  const captureData = await captureWebsite(url);

  // Check for HTTP errors
  if (captureData.httpStatusCode && captureData.httpStatusCode >= 400) {
    const errorMsg = `HTTP ${captureData.httpStatusCode} - Site returned error status`;
    logger.warn(errorMsg);

    // Store the site with a failed status
    await run(
      `INSERT INTO sites (
        domain, landing_page_url, keyword, http_status_code,
        processing_status, error_log, created_at
      ) VALUES ($1, $2, $3, $4, 'failed', $5, CURRENT_TIMESTAMP)`,
      [domain, url, keyword, captureData.httpStatusCode, errorMsg]
    );

    return;
  }

  // Step 2: Score website
  logger.info(`Scoring ${domain}...`);
  const scoreData = await scoreWebsite({
    url,
    domain,
    screenshots: captureData.screenshots,
    screenshotsUncropped: captureData.screenshotsUncropped,
    html: captureData.html,
  });

  const grade = extractGrade(scoreData);
  // Use ?? rather than || so a legitimate conversion_score of 0 is preserved
  const conversionScore = scoreData?.overall_calculation?.conversion_score ?? null;

  logger.success(`${domain} scored: ${grade || 'N/A'} (${conversionScore ?? 'N/A'})`);

  // Step 3: Insert the site record first to get site_id
  const result = await run(
    `INSERT INTO sites (
      domain, landing_page_url, keyword,
      html_dom, http_status_code,
      score, grade,
      status, scored_at, created_at
    ) VALUES ($1, $2, $3, $4, $5, $6, $7, 'prog_scored', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
    RETURNING id`,
    [
      domain,
      url,
      keyword,
      captureData.html ? 'fs' : null,
      captureData.httpStatusCode,
      conversionScore,
      grade,
    ]
  );

  // RETURNING id surfaces the new row's id in the query result (assumes `run`
  // resolves with the underlying pg result object); lastInsertRowid is a
  // SQLite-only property and would be undefined on a Postgres pool
  const siteId = result.rows[0].id;

  // Write score_json to the filesystem (sentinel already set in DB above)
  setScoreJson(siteId, JSON.stringify(scoreData));

  // Write HTML to the filesystem
  if (captureData.html) {
    writeHtmlDom(siteId, captureData.html);
  }

  // Step 4: Save screenshots to disk and update screenshot_path
  const screenshotsForStorage = {
    desktop_above: captureData.screenshots.desktop_above,
    desktop_below: captureData.screenshots.desktop_below,
    mobile_above: captureData.screenshots.mobile_above,
    desktop_above_uncropped: captureData.screenshotsUncropped.desktop_above,
    desktop_below_uncropped: captureData.screenshotsUncropped.desktop_below,
    mobile_above_uncropped: captureData.screenshotsUncropped.mobile_above,
  };

  const screenshotPath = await saveScreenshots(siteId, screenshotsForStorage);

  // Update screenshot_path
  await run('UPDATE sites SET screenshot_path = $1 WHERE id = $2', [screenshotPath, siteId]);

  // Step 5: Track low-scoring sites
  const lowGrades = ['B-', 'C+', 'C', 'C-', 'D+', 'D', 'D-', 'F'];
  if (grade && lowGrades.includes(grade)) {
    await incrementLowScoringCount(keyword);

    // If scoreWebsite already resubmitted and the grade is still low, count the rework too
    if (scoreData.resubmitted) {
      await incrementReworkLowScoringCount(keyword);
    }
  }

  logger.success(`${domain} stored successfully`);
}
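// Shapes consumed above, inferred from this file's usage alone (capture.js and
// score.js hold the authoritative contracts):
//
//   captureData: {
//     httpStatusCode,                 // number | undefined
//     html,                           // string | null
//     screenshots:          { desktop_above, desktop_below, mobile_above },
//     screenshotsUncropped: { desktop_above, desktop_below, mobile_above },
//   }
//   scoreData: {
//     overall_calculation: { conversion_score },
//     resubmitted,                    // set when scoreWebsite retried a low score
//     ...                             // full payload persisted via setScoreJson
//   }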
/**
 * Process a single URL directly (for testing or manual processing)
 * @param {string} url - URL to process
 */
async function processSingleUrl(url) {
  // Extract the domain from the URL
  const domain = new URL(url).hostname.replace(/^www\./, '');

  // Create a site object
  const site = { url, domain };

  // Use a generic keyword for single-URL processing
  const keyword = 'manual';

  await processSite(site, keyword);
}

// Parse command-line arguments and run only if executed directly
// Usage: node src/process.js [limit] [--url=<url>]
// pathToFileURL handles Windows paths and percent-encoding, which a bare
// `file://${process.argv[1]}` comparison does not
if (process.argv[1] && import.meta.url === pathToFileURL(process.argv[1]).href) {
  const args = process.argv.slice(2);
  let limit = 1000;
  let url = null;

  for (const arg of args) {
    if (arg.startsWith('--url=')) {
      url = arg.substring(6);
    } else {
      const parsed = Number.parseInt(arg, 10);
      if (!Number.isNaN(parsed)) {
        limit = parsed;
      }
    }
  }

  main(limit, url).catch(error => {
    logger.error('Fatal error', error);
    process.exit(1);
  });
}

// Export for testing
export { main, processSingleUrl, processSite };
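// Example invocations (the URL is illustrative):
//   node src/process.js                              # default limit of 1000 sites
//   node src/process.js 25                           # stop after 25 sites
//   node src/process.js --url=https://example.com    # score a single page directly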