/ src / cron / process-purchases.js
process-purchases.js
  1  #!/usr/bin/env node
  2  
  3  /**
  4   * Process Pending Purchases
  5   *
  6   * Runs twice daily to generate and deliver audit reports for paid purchases.
  7   * Picks up every purchase with status='paid', oldest first, with no minimum
  8   * age (the job acts as a backup for the webhook handler).
  9   *
 10   * On failure: increments retry_count, sets error_message.
 11   * After 3 failures: status → 'failed', adds to human_review_queue.
 12   */
 13  
 14  import { getAll, run } from './../utils/db.js';
 15  import { join, dirname } from 'path';
 16  import { fileURLToPath } from 'url';
 17  import { generateAuditReportForPurchase } from '../reports/report-orchestrator.js';
 18  import { deliverReport } from '../reports/report-delivery.js';
 19  import Logger from '../utils/logger.js';
 20  import { extractDomain } from '../utils/error-handler.js';
 21  import '../utils/load-env.js';
 22  
     // ES modules do not provide __filename/__dirname, so both are derived from
     // import.meta.url. NOTE(review): neither is referenced in the visible part
     // of this file — confirm before removing.
 23  const __filename = fileURLToPath(import.meta.url);
 24  const __dirname = dirname(__filename);
 25  
     // Module-wide logger, constructed with the name 'ProcessPurchases'.
 26  const logger = new Logger('ProcessPurchases');
 27  
     // Attempt count at which handlePurchaseFailure stops re-queuing a purchase
     // and marks it 'failed' instead.
 28  const MAX_RETRIES = 3;
 29  
 30  async function handlePurchaseFailure(purchase, error) {
 31    const retryCount = (purchase.retry_count || 0) + 1;
 32  
 33    if (retryCount >= MAX_RETRIES) {
 34      await run(
 35        `UPDATE purchases
 36         SET status = 'failed', error_message = $1, retry_count = $2, updated_at = CURRENT_TIMESTAMP
 37         WHERE id = $3`,
 38        [error.message, retryCount, purchase.id]
 39      );
 40  
 41      try {
 42        await run(
 43          `INSERT INTO human_review_queue (file, type, priority, reason, created_at)
 44           VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP)`,
 45          [
 46            `purchase_${purchase.id}`,
 47            'purchase_failure',
 48            'high',
 49            `Report generation failed for ${purchase.email}: ${error.message}`,
 50          ]
 51        );
 52      } catch {
 53        logger.warn(`Could not add purchase #${purchase.id} to human review queue`);
 54      }
 55    } else {
 56      await run(
 57        `UPDATE purchases
 58         SET status = 'paid', error_message = $1, retry_count = $2, updated_at = CURRENT_TIMESTAMP
 59         WHERE id = $3`,
 60        [error.message, retryCount, purchase.id]
 61      );
 62    }
 63  }
 64  
 65  /**
 66   * Process all pending purchases
 67   * @returns {Object} { processed, delivered, failed }
 68   */
 69  export async function processPendingPurchases() {
 70    let processed = 0;
 71    let delivered = 0;
 72    let failed = 0;
 73  
 74    // Get paid purchases (no delay — process immediately as backup for webhook handler)
 75    const purchases = await getAll(
 76      `SELECT * FROM purchases
 77       WHERE status = 'paid'
 78       ORDER BY created_at ASC`
 79    );
 80  
 81    if (purchases.length === 0) {
 82      logger.info('No pending purchases to process');
 83      return { processed: 0, delivered: 0, failed: 0 };
 84    }
 85  
 86    logger.info(`Processing ${purchases.length} pending purchase(s)`);
 87  
 88    for (const purchase of purchases) {
 89      processed++;
 90      logger.info(`Processing purchase #${purchase.id} (${purchase.email})`);
 91  
 92      try {
 93        // Set status to processing
 94        await run(
 95          'UPDATE purchases SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE id = $2',
 96          ['processing', purchase.id]
 97        );
 98  
 99        // Generate report (variant based on product type)
100        const variant = purchase.product === 'quick_fixes' ? 'quick-fixes' : 'full';
101        await generateAuditReportForPurchase(purchase.id, { variant });
102  
103        // Deliver report via email
104        await deliverReport(purchase.id);
105  
106        // Mark as delivered
107        await run(
108          `UPDATE purchases
109           SET status = 'delivered', delivered_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
110           WHERE id = $1`,
111          [purchase.id]
112        );
113  
114        // Audit+Fix: queue for manual implementation after report delivery
115        if (purchase.product === 'audit_fix') {
116          try {
117            await run(
118              `INSERT INTO human_review_queue (file, type, priority, reason, created_at)
119               VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP)`,
120              [
121                `purchase_${purchase.id}`,
122                'audit_fix_implementation',
123                'high',
124                `Audit+Fix purchase from ${purchase.email} — implement top 3 fixes for ${extractDomain(purchase.landing_page_url)}`,
125              ]
126            );
127            logger.info(`Purchase #${purchase.id} queued for manual implementation`);
128          } catch {
129            logger.warn(`Could not queue purchase #${purchase.id} for implementation`);
130          }
131        }
132  
133        delivered++;
134        logger.success(`Purchase #${purchase.id} delivered to ${purchase.email}`);
135      } catch (error) {
136        const retryCount = (purchase.retry_count || 0) + 1;
137        logger.error(
138          `Purchase #${purchase.id} failed (attempt ${retryCount}/${MAX_RETRIES})`,
139          error
140        );
141        await handlePurchaseFailure(purchase, error);
142        failed++;
143      }
144    }
145  
146    logger.success(
147      `Processing complete: ${delivered} delivered, ${failed} failed out of ${processed}`
148    );
149    return { processed, delivered, failed };
150  }
151  
// CLI entry point: run the batch only when this file is executed directly,
// not when it is imported. Filesystem paths are compared instead of building a
// `file://` URL by hand: import.meta.url is percent-encoded (spaces become
// %20) and on Windows has the form file:///C:/..., so a string-built URL from
// process.argv[1] would never match in those cases.
if (process.argv[1] === fileURLToPath(import.meta.url)) {
  processPendingPurchases()
    .then((result) => {
      console.log('Result:', result);
      // Non-zero exit code when any purchase failed.
      process.exit(result.failed > 0 ? 1 : 0);
    })
    .catch((error) => {
      logger.error('Processing failed:', error);
      process.exit(1);
    });
}