poll-purchases.js
#!/usr/bin/env node

/**
 * Poll Purchases from Cloudflare Worker
 *
 * Fetches new purchases from the auditandfix-api CF Worker KV queue,
 * inserts them into the local purchases table, sends confirmation emails,
 * and removes processed purchases from the queue.
 *
 * Follows the same pattern as sync-email-events.js and poll-paypal-events.js.
 */

import { pathToFileURL } from 'node:url';

import axios from 'axios';
import { getOne, run } from './../utils/db.js';
import Logger from '../utils/logger.js';
import { sendConfirmationEmail } from '../reports/purchase-confirmation.js';
import '../utils/load-env.js';

const logger = new Logger('PollPurchases');

/** Timeout, in milliseconds, applied to every request sent to the CF Worker. */
const REQUEST_TIMEOUT_MS = 10000;

/** Product recorded (and emailed) when a queued purchase does not name one. */
const DEFAULT_PRODUCT = 'full_audit';

/**
 * Send the purchase confirmation email on a best-effort basis.
 *
 * A failure is logged as a warning and deliberately not rethrown, so the
 * caller continues (and still removes the purchase from the queue).
 *
 * @param {Object} purchase - Purchase record as received from the worker queue.
 * @returns {Promise<void>}
 */
async function sendConfirmation(purchase) {
  try {
    await sendConfirmationEmail({
      email: purchase.email,
      orderId: purchase.paypal_order_id,
      amount: purchase.amount,
      currency: purchase.currency,
      url: purchase.landing_page_url,
      product: purchase.product || DEFAULT_PRODUCT,
    });
  } catch (emailError) {
    logger.warn(`Confirmation email failed for ${purchase.email}: ${emailError.message}`);
  }
}

/**
 * Ingest one queued purchase: insert it unless a row with the same PayPal
 * order id already exists, send the confirmation email for new rows, then
 * delete the entry from the worker queue.
 *
 * Database and HTTP errors propagate to the caller; in that case the queue
 * entry is left in place.
 *
 * @param {Object} purchase - Purchase record from the worker queue.
 * @param {string} workerUrl - Base URL of the CF Worker.
 * @param {string} workerSecret - Value sent in the X-Auth-Secret header.
 * @returns {Promise<void>}
 */
async function ingestPurchase(purchase, workerUrl, workerSecret) {
  const existing = await getOne(
    'SELECT id FROM purchases WHERE paypal_order_id = $1',
    [purchase.paypal_order_id]
  );

  if (existing) {
    logger.info(`Purchase ${purchase.paypal_order_id} already exists, skipping insert`);
  } else {
    // NOTE(review): with ON CONFLICT DO NOTHING, a row inserted concurrently
    // between the SELECT above and this INSERT would be skipped silently, yet
    // the success log and confirmation email below would still run. Confirm
    // whether `run` exposes an affected-row count that could gate them.
    await run(
      `INSERT INTO purchases
        (email, landing_page_url, phone, paypal_order_id, paypal_payer_id,
         paypal_capture_id, amount, currency, amount_usd, country_code,
         ip_address, user_agent, conversation_id, lang, product, status)
       VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, 'paid')
       ON CONFLICT DO NOTHING`,
      [
        purchase.email,
        purchase.landing_page_url,
        purchase.phone || null,
        purchase.paypal_order_id,
        purchase.paypal_payer_id || null,
        purchase.paypal_capture_id || null,
        purchase.amount,
        purchase.currency,
        purchase.amount_usd,
        purchase.country_code || null,
        purchase.ip_address || null,
        purchase.user_agent || null,
        purchase.conversation_id || null,
        purchase.lang || 'en',
        purchase.product || DEFAULT_PRODUCT,
      ]
    );
    logger.success(`Inserted purchase ${purchase.paypal_order_id} for ${purchase.email}`);
    await sendConfirmation(purchase);
  }

  // Remove from CF Worker queue
  await axios.delete(`${workerUrl}/purchases/${purchase.id}`, {
    headers: { 'X-Auth-Secret': workerSecret },
    timeout: REQUEST_TIMEOUT_MS,
  });
}

/**
 * Poll the CF Worker for new purchases and ingest them.
 *
 * Missing configuration, a failed fetch, and a malformed response are all
 * logged and reported as an all-zero summary. A failure on one purchase is
 * logged and counted without stopping the remaining purchases.
 *
 * @returns {Promise<{processed: number, successful: number, failed: number}>}
 *   Counts for this polling run.
 */
export async function pollPurchases() {
  const configuredUrl = process.env.AUDITANDFIX_WORKER_URL;
  const workerSecret = process.env.AUDITANDFIX_WORKER_SECRET;

  if (!configuredUrl || !workerSecret) {
    logger.warn('AUDITANDFIX_WORKER_URL or AUDITANDFIX_WORKER_SECRET not configured, skipping');
    return { processed: 0, successful: 0, failed: 0 };
  }

  // Drop trailing slashes so the paths built below never contain "//".
  const workerUrl = configuredUrl.replace(/\/+$/, '');

  logger.info('Polling for new purchases...');

  let purchases;
  try {
    const response = await axios.get(`${workerUrl}/purchases`, {
      headers: { 'X-Auth-Secret': workerSecret },
      timeout: REQUEST_TIMEOUT_MS,
    });
    const queued = response.data.purchases || [];
    // Reject a non-array payload here so it is reported as a fetch failure
    // instead of throwing out of the processing loop below.
    if (!Array.isArray(queued)) {
      throw new TypeError('Expected "purchases" in the worker response to be an array');
    }
    purchases = queued;
  } catch (error) {
    logger.error('Failed to fetch purchases from CF Worker', error);
    return { processed: 0, successful: 0, failed: 0 };
  }

  if (purchases.length === 0) {
    logger.info('No new purchases');
    return { processed: 0, successful: 0, failed: 0 };
  }

  logger.info(`Found ${purchases.length} new purchase(s)`);

  let successful = 0;
  let failed = 0;

  // Purchases are handled one at a time, in queue order.
  for (const purchase of purchases) {
    try {
      await ingestPurchase(purchase, workerUrl, workerSecret);
      successful++;
    } catch (error) {
      logger.error(`Failed to process purchase ${purchase.paypal_order_id}`, error);
      failed++;
    }
  }

  logger.success(`Poll complete: ${successful} successful, ${failed} failed`);
  return { processed: purchases.length, successful, failed };
}

// CLI
// Compare against a proper file URL: a hand-built `file://${path}` string does
// not match import.meta.url on Windows or for paths containing characters
// that are percent-encoded in URLs (spaces, '#', non-ASCII, ...).
const entryPoint = process.argv[1];
if (entryPoint && import.meta.url === pathToFileURL(entryPoint).href) {
  pollPurchases()
    .then(result => {
      console.log('Poll result:', result);
      process.exit(0);
    })
    .catch(error => {
      logger.error('Poll failed:', error);
      process.exit(1);
    });
}