/ src / cron / poll-purchases.js
poll-purchases.js
  1  #!/usr/bin/env node
  2  
  3  /**
  4   * Poll Purchases from Cloudflare Worker
  5   *
  6   * Fetches new purchases from the auditandfix-api CF Worker KV queue,
  7   * inserts them into the local purchases table, sends confirmation emails,
  8   * and removes processed purchases from the queue.
  9   *
 10   * Follows the same pattern as sync-email-events.js and poll-paypal-events.js.
 11   */
 12  
 13  import axios from 'axios';
 14  import { getOne, run } from './../utils/db.js';
 15  import Logger from '../utils/logger.js';
 16  import { sendConfirmationEmail } from '../reports/purchase-confirmation.js';
 17  import '../utils/load-env.js';
 18  
 19  const logger = new Logger('PollPurchases');
 20  
 21  async function sendConfirmation(purchase) {
 22    try {
 23      await sendConfirmationEmail({
 24        email: purchase.email,
 25        orderId: purchase.paypal_order_id,
 26        amount: purchase.amount,
 27        currency: purchase.currency,
 28        url: purchase.landing_page_url,
 29        product: purchase.product || 'full_audit',
 30      });
 31    } catch (emailError) {
 32      logger.warn(`Confirmation email failed for ${purchase.email}: ${emailError.message}`);
 33    }
 34  }
 35  
 36  async function ingestPurchase(purchase, workerUrl, workerSecret) {
 37    const existing = await getOne(
 38      'SELECT id FROM purchases WHERE paypal_order_id = $1',
 39      [purchase.paypal_order_id]
 40    );
 41  
 42    if (existing) {
 43      logger.info(`Purchase ${purchase.paypal_order_id} already exists, skipping insert`);
 44    } else {
 45      await run(
 46        `INSERT INTO purchases
 47           (email, landing_page_url, phone, paypal_order_id, paypal_payer_id,
 48            paypal_capture_id, amount, currency, amount_usd, country_code,
 49            ip_address, user_agent, conversation_id, lang, product, status)
 50         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, 'paid')
 51         ON CONFLICT DO NOTHING`,
 52        [
 53          purchase.email,
 54          purchase.landing_page_url,
 55          purchase.phone || null,
 56          purchase.paypal_order_id,
 57          purchase.paypal_payer_id || null,
 58          purchase.paypal_capture_id || null,
 59          purchase.amount,
 60          purchase.currency,
 61          purchase.amount_usd,
 62          purchase.country_code || null,
 63          purchase.ip_address || null,
 64          purchase.user_agent || null,
 65          purchase.conversation_id || null,
 66          purchase.lang || 'en',
 67          purchase.product || 'full_audit',
 68        ]
 69      );
 70      logger.success(`Inserted purchase ${purchase.paypal_order_id} for ${purchase.email}`);
 71      await sendConfirmation(purchase);
 72    }
 73  
 74    // Remove from CF Worker queue
 75    await axios.delete(`${workerUrl}/purchases/${purchase.id}`, {
 76      headers: { 'X-Auth-Secret': workerSecret },
 77      timeout: 10000,
 78    });
 79  }
 80  
 81  /**
 82   * Poll the CF Worker for new purchases and ingest them
 83   * @returns {Object} { processed, successful, failed }
 84   */
 85  export async function pollPurchases() {
 86    const workerUrl = process.env.API_WORKER_URL;
 87    const workerSecret = process.env.API_WORKER_SECRET;
 88  
 89    if (!workerUrl || !workerSecret) {
 90      logger.warn('API_WORKER_URL or API_WORKER_SECRET not configured, skipping');
 91      return { processed: 0, successful: 0, failed: 0 };
 92    }
 93  
 94    logger.info('Polling for new purchases...');
 95  
 96    let purchases;
 97    try {
 98      const response = await axios.get(`${workerUrl}/purchases`, {
 99        headers: { 'X-Auth-Secret': workerSecret },
100        timeout: 10000,
101      });
102      purchases = response.data.purchases || [];
103    } catch (error) {
104      logger.error('Failed to fetch purchases from CF Worker', error);
105      return { processed: 0, successful: 0, failed: 0 };
106    }
107  
108    if (purchases.length === 0) {
109      logger.info('No new purchases');
110      return { processed: 0, successful: 0, failed: 0 };
111    }
112  
113    logger.info(`Found ${purchases.length} new purchase(s)`);
114  
115    let successful = 0;
116    let failed = 0;
117  
118    for (const purchase of purchases) {
119      try {
120        await ingestPurchase(purchase, workerUrl, workerSecret);
121        successful++;
122      } catch (error) {
123        logger.error(`Failed to process purchase ${purchase.paypal_order_id}`, error);
124        failed++;
125      }
126    }
127  
128    logger.success(`Poll complete: ${successful} successful, ${failed} failed`);
129    return { processed: purchases.length, successful, failed };
130  }
131  
132  // CLI
133  if (import.meta.url === `file://${process.argv[1]}`) {
134    pollPurchases()
135      .then(result => {
136        console.log('Poll result:', result);
137        process.exit(0);
138      })
139      .catch(error => {
140        logger.error('Poll failed:', error);
141        process.exit(1);
142      });
143  }