/ src / process.js
process.js
/**
 * Main Processing Pipeline
 * Populates the keywords table, scrapes search results for each keyword via ZenRows,
 * then captures, scores, and stores every site found.
 */
  5  
  6  import { join, dirname } from 'path';
  7  import { fileURLToPath } from 'url';
  8  import Logger from './utils/logger.js';
  9  import {
 10    generateKeywordCombinations,
 11    upsertKeyword,
 12    incrementProcessedCount,
 13    incrementLowScoringCount,
 14    incrementReworkLowScoringCount,
 15  } from './utils/keyword-manager.js';
 16  import { scrapeSERP } from './scrape.js';
 17  import { captureWebsite } from './capture.js';
 18  import { scoreWebsite, extractGrade } from './score.js';
 19  import { saveScreenshots } from './utils/screenshot-storage.js';
 20  import { writeHtmlDom } from './utils/html-storage.js';
 21  import { setScoreJson } from './utils/score-storage.js';
 22  import './utils/load-env.js';
 23  import { run, getOne, getAll, query, withTransaction, closePool, getPool } from './utils/db.js';
 24  
 25  const __filename = fileURLToPath(import.meta.url);
 26  const __dirname = dirname(__filename);
 27  const projectRoot = join(__dirname, '..');
 28  
 29  const logger = new Logger('Process');
 30  
 31  /**
 32   * Main processing function
 33   * @param {number} limit - Maximum number of sites to process
 34   * @param {string} url - Optional URL to process directly
 35   */
 36  async function main(limit = 1000, url = null) {
 37    try {
 38      // If URL is provided, process it directly
 39      if (url) {
 40        logger.info(`Processing single URL: ${url}`);
 41        await processSingleUrl(url);
 42        logger.success('URL processing complete!');
 43        return;
 44      }
 45  
 46      logger.info(`Starting processing pipeline (limit: ${limit} sites)...`);
 47  
 48      // Step 1: Populate/freshen keywords table
 49      await populateKeywords();
 50  
 51      // Step 2: Get priority keywords to scrape
 52      const keywordsToScrape = await getKeywordsToScrape(Math.ceil(limit / 10));
 53  
 54      let sitesProcessed = 0;
 55  
 56      // Step 3: Process each keyword
 57      for (const keywordRow of keywordsToScrape) {
 58        if (sitesProcessed >= limit) {
 59          logger.info(`Reached site limit of ${limit}, stopping`);
 60          break;
 61        }
 62  
 63        const processedCount = await processKeyword(keywordRow, limit - sitesProcessed);
 64        sitesProcessed += processedCount;
 65      }
 66  
 67      logger.success(`Processing complete! Processed ${sitesProcessed} sites`);
 68    } catch (error) {
 69      logger.error('Processing pipeline failed', error);
 70      process.exit(1);
 71    }
 72  }
 73  
 74  /**
 75   * Process a single keyword
 76   * @param {Object} keywordRow - Keyword row from database
 77   * @param {number} remainingLimit - Sites remaining to process
 78   * @returns {number} Number of sites processed
 79   */
 80  async function processKeyword(keywordRow, remainingLimit) {
 81    const { keyword } = keywordRow;
 82    logger.info(`Processing keyword: "${keyword}"`);
 83  
 84    try {
 85      // Scrape sites from ZenRows
 86      const sites = await scrapeSERP(keyword, 10);
 87  
 88      // Update keyword zenrows_count
 89      await upsertKeyword(keyword, {
 90        zenrows_count: sites.length,
 91        last_scraped_at: new Date().toISOString(),
 92      });
 93  
 94      logger.info(`Found ${sites.length} sites for "${keyword}"`);
 95  
 96      // Process each site
 97      let processed = 0;
 98      for (const site of sites) {
 99        if (processed >= remainingLimit) break;
100  
101        const remainingSites = remainingLimit - processed;
102        logger.info(`Processing ${site.domain} (${remainingSites} remaining)`);
103  
104        try {
105          await processSite(site, keyword);
106          processed++;
107          await incrementProcessedCount(keyword);
108        } catch (error) {
109          logger.error(`Failed to process ${site.domain}`, error);
110          // Continue to next site on error
111        }
112      }
113  
114      return processed;
115    } catch (error) {
116      logger.error(`Failed to scrape keyword "${keyword}"`, error);
117      return 0;
118    }
119  }
120  
/**
 * Ensure every generated keyword combination has a row in the keywords table.
 *
 * Each combination is looked up by (keyword, country_code). Missing ones are
 * upserted with their country, Google domain, search volume and priority;
 * existing ones are left untouched. A summary is logged at the end.
 */
async function populateKeywords() {
  logger.info('Populating keywords table...');

  const combinations = generateKeywordCombinations();
  const tally = { created: 0, alreadyPresent: 0 };

  for (const { keyword, countryCode, googleDomain, searchVolume, priority } of combinations) {
    const row = await getOne(
      'SELECT id FROM keywords WHERE keyword = $1 AND country_code = $2',
      [keyword, countryCode]
    );

    if (row) {
      tally.alreadyPresent += 1;
      continue;
    }

    await upsertKeyword(keyword, {
      country_code: countryCode,
      google_domain: googleDomain,
      search_volume: searchVolume,
      priority,
    });
    tally.created += 1;
  }

  logger.success(
    `Keywords populated: ${tally.created} new, ${tally.alreadyPresent} existing (${combinations.length} total)`
  );
}
155  
/**
 * Fetch the next batch of keywords to scrape.
 *
 * Ordering is done in SQL: keywords that were never scraped come first, then
 * the least recently scraped, with ties broken by the highest zenrows_count.
 *
 * @param {number} limit - Number of keywords to return
 * @returns {Array} Keywords to scrape
 */
async function getKeywordsToScrape(limit) {
  const prioritisedQuery = `SELECT * FROM keywords
     ORDER BY
       CASE WHEN last_scraped_at IS NULL THEN 0 ELSE 1 END,
       last_scraped_at ASC,
       zenrows_count DESC
     LIMIT $1`;

  const rows = await getAll(prioritisedQuery, [limit]);
  return rows;
}
173  
// Grades that count as "low-scoring" for the per-keyword counters.
const LOW_GRADES = new Set(['B-', 'C+', 'C', 'C-', 'D+', 'D', 'D-', 'F']);

/**
 * Process a single site: capture, score, and store it.
 *
 * Flow:
 *   1. Skip the site if its domain already has a row in `sites`.
 *   2. Capture the page. If the HTTP status is >= 400, store a 'failed' row
 *      and stop.
 *   3. Score the capture and insert the site row to obtain its id.
 *   4. Persist score JSON, HTML and screenshots under that id, then record the
 *      screenshot path on the row.
 *   5. Update the keyword's low-scoring counters.
 *
 * @param {Object} site - Site object from scraper (`url` and `domain` are read)
 * @param {string} keyword - Search keyword
 * @returns {Promise<boolean>} `false` when the domain already existed and
 *   nothing was done; `true` when the site was captured and a row was stored
 *   (including the 'failed' row for HTTP error statuses).
 * @throws {Error} If the insert does not yield a site id. Errors from the
 *   capture, scoring, storage and database helpers propagate unchanged.
 */
async function processSite(site, keyword) {
  const { url, domain } = site;

  // Check if site already exists
  const existing = await getOne('SELECT id FROM sites WHERE domain = $1', [domain]);

  if (existing) {
    logger.info(`Site ${domain} already exists, skipping`);
    return false;
  }

  // Step 1: Capture website (screenshots + HTML)
  logger.info(`Capturing ${domain}...`);
  const captureData = await captureWebsite(url);

  // Check for HTTP errors
  if (captureData.httpStatusCode && captureData.httpStatusCode >= 400) {
    const errorMsg = `HTTP ${captureData.httpStatusCode} - Site returned error status`;
    logger.warn(errorMsg);

    // Store site with error status so it is not captured again.
    // NOTE(review): this path writes `processing_status`, while the success
    // path below writes `status` - confirm both columns are intended.
    await run(
      `INSERT INTO sites (
        domain, landing_page_url, keyword, http_status_code,
        processing_status, error_log, created_at
      ) VALUES ($1, $2, $3, $4, 'failed', $5, CURRENT_TIMESTAMP)`,
      [domain, url, keyword, captureData.httpStatusCode, errorMsg]
    );

    return true;
  }

  // Step 2: Score website
  logger.info(`Scoring ${domain}...`);
  const scoreData = await scoreWebsite({
    url,
    domain,
    screenshots: captureData.screenshots,
    screenshotsUncropped: captureData.screenshotsUncropped,
    html: captureData.html,
  });

  const grade = extractGrade(scoreData);
  // `??` rather than `||`: a genuine score of 0 must be stored and logged as 0,
  // not replaced by null / 'N/A'.
  const conversionScore = scoreData?.overall_calculation?.conversion_score ?? null;

  logger.success(`${domain} scored: ${grade || 'N/A'} (${conversionScore ?? 'N/A'})`);

  // Step 3: Insert site record first to get site_id.
  // html_dom holds the marker 'fs' when HTML was captured; the HTML itself is
  // written to the filesystem below.
  const insertResult = await run(
    `INSERT INTO sites (
      domain, landing_page_url, keyword,
      html_dom, http_status_code,
      score, grade,
      status, scored_at, created_at
    ) VALUES ($1, $2, $3, $4, $5, $6, $7, 'prog_scored', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
    RETURNING id`,
    [
      domain,
      url,
      keyword,
      captureData.html ? 'fs' : null,
      captureData.httpStatusCode,
      conversionScore,
      grade,
    ]
  );

  // NOTE(review): `run` presumably exposes the new id as `lastInsertRowid`;
  // the `RETURNING id` row is accepted as a fallback in case it returns a
  // driver-style result with `rows` - confirm against utils/db.js.
  const siteId = insertResult?.lastInsertRowid ?? insertResult?.rows?.[0]?.id;
  if (siteId == null) {
    // Without an id the storage helpers below would write under "undefined".
    throw new Error(`INSERT for ${domain} returned no site id`);
  }

  // Step 4: Persist artefacts under the new id. `await` is harmless if these
  // helpers are synchronous and surfaces failures if they return promises.
  await setScoreJson(siteId, JSON.stringify(scoreData));

  if (captureData.html) {
    await writeHtmlDom(siteId, captureData.html);
  }

  const screenshotsForStorage = {
    desktop_above: captureData.screenshots.desktop_above,
    desktop_below: captureData.screenshots.desktop_below,
    mobile_above: captureData.screenshots.mobile_above,
    desktop_above_uncropped: captureData.screenshotsUncropped.desktop_above,
    desktop_below_uncropped: captureData.screenshotsUncropped.desktop_below,
    mobile_above_uncropped: captureData.screenshotsUncropped.mobile_above,
  };

  const screenshotPath = await saveScreenshots(siteId, screenshotsForStorage);

  // Update screenshot_path
  await run('UPDATE sites SET screenshot_path = $1 WHERE id = $2', [screenshotPath, siteId]);

  // Step 5: Track low-scoring sites
  if (LOW_GRADES.has(grade)) {
    await incrementLowScoringCount(keyword);

    // A low grade on score data flagged `resubmitted` also counts as a rework miss.
    if (scoreData?.resubmitted) {
      await incrementReworkLowScoringCount(keyword);
    }
  }

  logger.success(`${domain} stored successfully`);
  return true;
}
285  
/**
 * Run the site pipeline for one explicitly supplied URL (testing / manual runs).
 *
 * The domain is the URL's hostname with a leading "www." removed, and the site
 * is recorded under the placeholder keyword 'manual'.
 *
 * @param {string} url - URL to process
 */
async function processSingleUrl(url) {
  const { hostname } = new URL(url);
  const domain = hostname.replace(/^www\./, '');

  await processSite({ url, domain }, 'manual');
}
302  
/**
 * Parse command line arguments for the pipeline.
 *
 * Usage: node src/process.js [limit] [--url=<url>]
 *
 * Only plain non-negative integers are accepted as the limit; values such as
 * "10abc" or "-5" are ignored with a warning instead of being partially parsed.
 * When an argument is repeated, the last occurrence wins.
 *
 * @param {string[]} args - Arguments after the script path
 * @returns {{ limit: number, url: (string|null) }} Parsed options
 */
function parseCliArgs(args) {
  const urlFlag = '--url=';
  let limit = 1000;
  let url = null;

  for (const arg of args) {
    if (arg.startsWith(urlFlag)) {
      url = arg.slice(urlFlag.length);
    } else if (/^\d+$/.test(arg)) {
      limit = Number.parseInt(arg, 10);
    } else {
      logger.warn(`Ignoring unrecognized argument: ${arg}`);
    }
  }

  return { limit, url };
}

// Run only when this file is the script being executed, not when it is imported.
// Filesystem paths are compared rather than a hand-built `file://` string, so the
// check also works on Windows and for paths that need URL encoding (e.g. spaces).
if (process.argv[1] && fileURLToPath(import.meta.url) === process.argv[1]) {
  const { limit, url } = parseCliArgs(process.argv.slice(2));

  main(limit, url).catch((error) => {
    logger.error('Fatal error', error);
    process.exit(1);
  });
}
323  
324  // Export for testing
325  export { main, processSingleUrl, processSite };