serps.js
/**
 * SERPs Stage
 * Scrapes search engine results for selected keywords
 */

import { run, getOne, getAll, query } from '../utils/db.js';
import { scrapeSERP } from '../scrape.js';
import Logger from '../utils/logger.js';
import { generateStageCompletion, displayProgress } from '../utils/summary-generator.js';
import { processBatch } from '../utils/error-handler.js';
import { checkBlocklist } from '../utils/site-filters.js';
import { detectCountryFromTLD } from '../utils/tld-detector.js';

const logger = new Logger('Serps');

/** Number of SERP results requested per keyword when the caller does not specify one. */
const DEFAULT_RESULTS_PER_KEYWORD = 10;

/** Concurrency used when ZENROWS_CONCURRENCY is unset or not a positive integer. */
const DEFAULT_ZENROWS_CONCURRENCY = 20;

/**
 * Validate that a caller-supplied value is a positive integer.
 *
 * The validated number is interpolated into SQL text below, so anything that
 * is not a plain positive integer must be rejected before it reaches the query.
 *
 * @param {*} value - Raw value (number or numeric string)
 * @param {string} name - Option name used in the error message
 * @returns {number} The value as a positive safe integer
 * @throws {TypeError} If the value is not a positive integer
 */
function requirePositiveInt(value, name) {
  const parsed = typeof value === 'number' ? value : Number(String(value).trim());
  if (!Number.isSafeInteger(parsed) || parsed <= 0) {
    throw new TypeError(`${name} must be a positive integer, received: ${String(value)}`);
  }
  return parsed;
}

/**
 * Read the concurrency setting from the ZENROWS_CONCURRENCY env var.
 *
 * @returns {number} A positive integer; DEFAULT_ZENROWS_CONCURRENCY when the
 *   variable is unset, non-numeric, zero or negative
 */
function resolveZenrowsConcurrency() {
  const raw = process.env.ZENROWS_CONCURRENCY;
  const parsed = Number.parseInt(raw || String(DEFAULT_ZENROWS_CONCURRENCY), 10);
  if (Number.isInteger(parsed) && parsed > 0) {
    return parsed;
  }
  logger.warn(
    `Invalid ZENROWS_CONCURRENCY "${raw}" - falling back to ${DEFAULT_ZENROWS_CONCURRENCY}`
  );
  return DEFAULT_ZENROWS_CONCURRENCY;
}

/**
 * Run the SERPs stage
 *
 * Selects active keywords that have fewer than `resultsPerKeyword` rows in
 * `sites`, scrapes each one, inserts the returned URLs as sites, marks
 * blocklisted domains as ignored and updates the keyword's search metrics.
 * Keywords whose scrape returns zero results are set to status 'inactive'.
 *
 * @param {Object} options - Stage options
 * @param {number} options.limit - Limit number of keywords to process
 * @param {number} options.resultsPerKeyword - Number of results per keyword (default: 10)
 * @returns {Promise<Object>} Stage results
 * @throws {TypeError} If `options.limit` or `options.resultsPerKeyword` is
 *   supplied but is not a positive integer
 */
export async function runSerpsStage(options = {}) {
  const startTime = Date.now();

  logger.info('Starting SERPs Stage...');

  // Both values end up inside the SQL text, so they are validated as integers
  // first. Falsy values (undefined, 0, '') keep their original meaning:
  // default result count / no LIMIT clause.
  const resultsPerKeyword = options.resultsPerKeyword
    ? requirePositiveInt(options.resultsPerKeyword, 'options.resultsPerKeyword')
    : DEFAULT_RESULTS_PER_KEYWORD;
  const limitClause = options.limit
    ? `LIMIT ${requirePositiveInt(options.limit, 'options.limit')}`
    : '';

  // Get keywords that need processing (skip those that already have
  // resultsPerKeyword or more sites)
  const sql = `
    SELECT k.id, k.keyword, k.country_code, k.google_domain, COUNT(s.id) as site_count
    FROM keywords k
    LEFT JOIN sites s ON k.keyword = s.keyword
    WHERE k.status = 'active'
    GROUP BY k.id, k.keyword, k.country_code, k.google_domain
    HAVING COUNT(s.id) < ${resultsPerKeyword}
    ORDER BY k.priority DESC, k.last_searched_at ASC NULLS FIRST, k.search_count ASC
    ${limitClause}
  `;

  const keywords = await getAll(sql);

  if (keywords.length === 0) {
    logger.info('No keywords to process');
    return {
      processed: 0,
      succeeded: 0,
      failed: 0,
      skipped: 0,
      duration: Date.now() - startTime,
    };
  }

  const zenrowsConcurrency = resolveZenrowsConcurrency();
  logger.info(`Processing ${keywords.length} keywords (${resultsPerKeyword} results each)`);
  logger.info(
    `Using parallel processing (up to ${zenrowsConcurrency} concurrent ZenRows requests)`
  );

  const stats = {
    processed: 0,
    succeeded: 0,
    failed: 0,
    skipped: 0,
    newSites: 0,
    markedInactive: 0,
  };

  // Process keywords in parallel (ZenRows rate limiter handles concurrency)
  const { results, errors } = await processBatch(
    keywords,
    async kw => {
      try {
        // Scrape SERP with country targeting
        const { results: serpResults, metadata } = await scrapeSERP(
          kw.keyword,
          resultsPerKeyword,
          kw.country_code
        );

        // Check if zero results - mark keyword as inactive
        if (serpResults.length === 0) {
          await run(
            `UPDATE keywords
             SET status = 'inactive',
                 search_count = search_count + 1,
                 last_searched_at = CURRENT_TIMESTAMP
             WHERE id = $1`,
            [kw.id]
          );

          logger.warn(
            ` Scraped "${kw.keyword}" (${metadata.countryCode}): 0 URLs - marked as inactive`
          );

          return { keyword: kw.keyword, resultsCount: 0, newSites: 0, markedInactive: true };
        }

        // Insert sites with country metadata (ignore duplicates)
        let newSites = 0;
        const insertedDomains = [];
        for (const result of serpResults) {
          // Extract domain from URL. `new URL` throws on malformed input; one
          // bad result must not abort the keyword after earlier rows were
          // already inserted, so it is skipped instead.
          let domain;
          try {
            domain = new URL(result.url).hostname.replace(/^www\./, '');
          } catch (err) {
            logger.warn(
              ` Skipping unparseable SERP result for "${kw.keyword}": ${err.message}`
            );
            continue;
          }

          // Detect country from TLD if possible, otherwise use google_domain's country
          const tldDetection = detectCountryFromTLD(domain);
          const countryCode =
            tldDetection.confidence === 'high'
              ? tldDetection.countryCode // Use TLD detection for ccTLD domains
              : metadata.countryCode; // Fall back to google_domain's country

          const insertResult = await run(
            `INSERT INTO sites (
               domain, landing_page_url, keyword, status,
               country_code, google_domain
             )
             VALUES ($1, $2, $3, 'found', $4, $5)
             ON CONFLICT DO NOTHING`,
            [domain, result.url, kw.keyword, countryCode, metadata.googleDomain]
          );
          // Only rows that were actually inserted count as new; conflicts report no changes
          if (insertResult.changes > 0) {
            newSites++;
            insertedDomains.push(domain);
          }
        }

        // Check for blocklisted domains and mark as ignored (directories/social media/franchises)
        let ignoredCount = 0;
        for (const domain of insertedDomains) {
          const blocked = checkBlocklist(domain, metadata.countryCode);
          if (blocked) {
            await run(
              `UPDATE sites SET status = 'ignored', error_message = $1 WHERE domain = $2 AND keyword = $3`,
              [blocked.reason, domain, kw.keyword]
            );
            ignoredCount++;
          }
        }

        if (ignoredCount > 0) {
          logger.info(
            ` Marked ${ignoredCount} sites as ignored (directories/social media/franchises)`
          );
        }

        // Update keyword metrics: search_count, zenrows_count (cumulative sites), last_searched_at
        await run(
          `UPDATE keywords
           SET search_count = search_count + 1,
               zenrows_count = zenrows_count + $1,
               last_searched_at = CURRENT_TIMESTAMP
           WHERE id = $2`,
          [newSites, kw.id]
        );

        logger.success(
          ` Scraped "${kw.keyword}" (${metadata.countryCode}): ${serpResults.length} URLs (${newSites} new)`
        );

        return { keyword: kw.keyword, resultsCount: serpResults.length, newSites };
      } catch (err) {
        // Log here for per-keyword context, then rethrow so processBatch records the failure
        logger.error(` Failed to scrape "${kw.keyword}": ${err.message}`);
        throw err;
      }
    },
    {
      concurrency: zenrowsConcurrency, // Read from ZENROWS_CONCURRENCY env var
      onProgress: (completed, total) => {
        displayProgress(completed, total, 'Scraping keywords');
      },
    }
  );

  // Update stats
  stats.processed = keywords.length;
  stats.succeeded = results.length;
  stats.failed = errors.length;
  stats.newSites = results.reduce((sum, r) => sum + r.newSites, 0);
  stats.markedInactive = results.filter(r => r.markedInactive).length;

  stats.duration = Date.now() - startTime;
  generateStageCompletion('SERPs', stats);

  logger.info(`Total new sites added: ${stats.newSites}`);
  if (stats.markedInactive > 0) {
    logger.info(`Keywords marked inactive (zero results): ${stats.markedInactive}`);
  }

  return stats;
}

/**
 * Get SERP statistics
 *
 * Aggregates the `sites` table: distinct keywords, total rows, rows with
 * status 'found', and rows in any of the scored statuses.
 *
 * @returns {Promise<Object>} SERP statistics (total_keywords, total_sites,
 *   found_sites, scored_sites)
 */
export async function getSerpsStats() {
  const stats = await getOne(
    `SELECT
       COUNT(DISTINCT keyword) as total_keywords,
       COUNT(id) as total_sites,
       COUNT(CASE WHEN status = 'found' THEN 1 END) as found_sites,
       COUNT(CASE WHEN status IN ('prog_scored','semantic_scored','vision_scored') THEN 1 END) as scored_sites
     FROM sites`
  );

  return stats;
}