// src/stages/serps.js
  1  /**
  2   * SERPs Stage
  3   * Scrapes search engine results for selected keywords
  4   */
  5  
  6  import { run, getOne, getAll, query } from '../utils/db.js';
  7  import { scrapeSERP } from '../scrape.js';
  8  import Logger from '../utils/logger.js';
  9  import { generateStageCompletion, displayProgress } from '../utils/summary-generator.js';
 10  import { processBatch } from '../utils/error-handler.js';
 11  import { checkBlocklist } from '../utils/site-filters.js';
 12  import { detectCountryFromTLD } from '../utils/tld-detector.js';
 13  
// Module-scoped logger for the SERPs stage (tag "Serps" identifies this stage's output).
const logger = new Logger('Serps');
 16  /**
 17   * Run the SERPs stage
 18   * @param {Object} options - Stage options
 19   * @param {number} options.limit - Limit number of keywords to process
 20   * @param {number} options.resultsPerKeyword - Number of results per keyword (default: 10)
 21   * @returns {Promise<Object>} Stage results
 22   */
 23  export async function runSerpsStage(options = {}) {
 24    const startTime = Date.now();
 25    const resultsPerKeyword = options.resultsPerKeyword || 10;
 26  
 27    logger.info('Starting SERPs Stage...');
 28  
 29    // Get keywords that need processing (skip those with 10+ sites already)
 30    const sql = `
 31      SELECT k.id, k.keyword, k.country_code, k.google_domain, COUNT(s.id) as site_count
 32      FROM keywords k
 33      LEFT JOIN sites s ON k.keyword = s.keyword
 34      WHERE k.status = 'active'
 35      GROUP BY k.id, k.keyword, k.country_code, k.google_domain
 36      HAVING COUNT(s.id) < ${resultsPerKeyword}
 37      ORDER BY k.priority DESC, k.last_searched_at ASC NULLS FIRST, k.search_count ASC
 38      ${options.limit ? `LIMIT ${options.limit}` : ''}
 39    `;
 40  
 41    const keywords = await getAll(sql);
 42  
 43    if (keywords.length === 0) {
 44      logger.info('No keywords to process');
 45      return {
 46        processed: 0,
 47        succeeded: 0,
 48        failed: 0,
 49        skipped: 0,
 50        duration: Date.now() - startTime,
 51      };
 52    }
 53  
 54    const zenrowsConcurrency = parseInt(process.env.ZENROWS_CONCURRENCY || '20', 10);
 55    logger.info(`Processing ${keywords.length} keywords (${resultsPerKeyword} results each)`);
 56    logger.info(
 57      `Using parallel processing (up to ${zenrowsConcurrency} concurrent ZenRows requests)`
 58    );
 59  
 60    const stats = {
 61      processed: 0,
 62      succeeded: 0,
 63      failed: 0,
 64      skipped: 0,
 65      newSites: 0,
 66      markedInactive: 0,
 67    };
 68  
 69    // Process keywords in parallel (ZenRows rate limiter handles concurrency)
 70    const { results, errors } = await processBatch(
 71      keywords,
 72      async kw => {
 73        try {
 74          // Scrape SERP with country targeting
 75          const { results, metadata } = await scrapeSERP(
 76            kw.keyword,
 77            resultsPerKeyword,
 78            kw.country_code
 79          );
 80  
 81          // Check if zero results - mark keyword as inactive
 82          if (results.length === 0) {
 83            await run(
 84              `UPDATE keywords
 85               SET status = 'inactive',
 86                   search_count = search_count + 1,
 87                   last_searched_at = CURRENT_TIMESTAMP
 88               WHERE id = $1`,
 89              [kw.id]
 90            );
 91  
 92            logger.warn(
 93              `  Scraped "${kw.keyword}" (${metadata.countryCode}): 0 URLs - marked as inactive`
 94            );
 95  
 96            return { keyword: kw.keyword, resultsCount: 0, newSites: 0, markedInactive: true };
 97          }
 98  
 99          // Insert sites with country metadata (ignore duplicates)
100          let newSites = 0;
101          const insertedDomains = [];
102          for (const result of results) {
103            // Extract domain from URL
104            const domain = new URL(result.url).hostname.replace(/^www\./, '');
105  
106            // Detect country from TLD if possible, otherwise use google_domain's country
107            const tldDetection = detectCountryFromTLD(domain);
108            const countryCode =
109              tldDetection.confidence === 'high'
110                ? tldDetection.countryCode // Use TLD detection for ccTLD domains
111                : metadata.countryCode; // Fall back to google_domain's country
112  
113            const insertResult = await run(
114              `INSERT INTO sites (
115                 domain, landing_page_url, keyword, status,
116                 country_code, google_domain
117               )
118               VALUES ($1, $2, $3, 'found', $4, $5)
119               ON CONFLICT DO NOTHING`,
120              [domain, result.url, kw.keyword, countryCode, metadata.googleDomain]
121            );
122            if (insertResult.changes > 0) {
123              newSites++;
124              insertedDomains.push(domain);
125            }
126          }
127  
128          // Check for blocklisted domains and mark as ignored (directories/social media/franchises)
129          let ignoredCount = 0;
130          for (const domain of insertedDomains) {
131            const blocked = checkBlocklist(domain, metadata.countryCode);
132            if (blocked) {
133              await run(
134                `UPDATE sites SET status = 'ignored', error_message = $1 WHERE domain = $2 AND keyword = $3`,
135                [blocked.reason, domain, kw.keyword]
136              );
137              ignoredCount++;
138            }
139          }
140  
141          if (ignoredCount > 0) {
142            logger.info(
143              `    Marked ${ignoredCount} sites as ignored (directories/social media/franchises)`
144            );
145          }
146  
147          // Update keyword metrics: search_count, zenrows_count (cumulative sites), last_searched_at
148          await run(
149            `UPDATE keywords
150             SET search_count = search_count + 1,
151                 zenrows_count = zenrows_count + $1,
152                 last_searched_at = CURRENT_TIMESTAMP
153             WHERE id = $2`,
154            [newSites, kw.id]
155          );
156  
157          logger.success(
158            `  Scraped "${kw.keyword}" (${metadata.countryCode}): ${results.length} URLs (${newSites} new)`
159          );
160  
161          return { keyword: kw.keyword, resultsCount: results.length, newSites };
162        } catch (err) {
163          logger.error(`  Failed to scrape "${kw.keyword}": ${err.message}`);
164          throw err;
165        }
166      },
167      {
168        concurrency: zenrowsConcurrency, // Read from ZENROWS_CONCURRENCY env var
169        onProgress: (completed, total) => {
170          displayProgress(completed, total, 'Scraping keywords');
171        },
172      }
173    );
174  
175    // Update stats
176    stats.processed = keywords.length;
177    stats.succeeded = results.length;
178    stats.failed = errors.length;
179    stats.newSites = results.reduce((sum, r) => sum + r.newSites, 0);
180    stats.markedInactive = results.filter(r => r.markedInactive).length;
181  
182    stats.duration = Date.now() - startTime;
183    generateStageCompletion('SERPs', stats);
184  
185    logger.info(`Total new sites added: ${stats.newSites}`);
186    if (stats.markedInactive > 0) {
187      logger.info(`Keywords marked inactive (zero results): ${stats.markedInactive}`);
188    }
189  
190    return stats;
191  }
192  
/**
 * Get SERP statistics
 *
 * Runs a single aggregate query over the `sites` table: distinct keywords,
 * total sites, sites still in 'found' status, and sites that have reached
 * any scored status.
 *
 * @returns {Promise<Object>} SERP statistics row
 */
export async function getSerpsStats() {
  const sql = `SELECT
       COUNT(DISTINCT keyword) as total_keywords,
       COUNT(id) as total_sites,
       COUNT(CASE WHEN status = 'found' THEN 1 END) as found_sites,
       COUNT(CASE WHEN status IN ('prog_scored','semantic_scored','vision_scored') THEN 1 END) as scored_sites
     FROM sites`;

  return getOne(sql);
}