reply-processor.js
1 #!/usr/bin/env node 2 3 /** 4 * Reply Processor 5 * Automated cron job to process inbound replies and handle payment flow 6 * 7 * Flow: 8 * 1. Poll inbound channels (SMS, Email) 9 * 2. Classify new replies (interested, not_interested, question, unsubscribe) 10 * 3. Take action based on classification: 11 * - interested → Send payment link 12 * - not_interested → Close conversation 13 * - question → Flag for human review 14 * - unsubscribe → Add to opt-out list 15 * 4. Check for paid conversations → Generate reports 16 * 5. Deliver reports via email 17 */ 18 19 import { run, getAll } from '../utils/db.js'; 20 import { join, dirname } from 'path'; 21 import { fileURLToPath } from 'url'; 22 23 import Logger from '../utils/logger.js'; 24 import { classifyReply } from '../utils/reply-classifier.js'; 25 import { createPaymentOrder, generatePaymentMessage } from '../payment/paypal.js'; 26 import { generateReport } from '../reports/cro-report-generator.js'; 27 import { pollAllChannels } from '../inbound/processor.js'; 28 import { pollPayPalEvents } from '../payment/poll-paypal-events.js'; 29 import { processStopKeyword } from '../utils/compliance.js'; 30 import '../utils/load-env.js'; 31 32 // Base62 encoder — produces opaque, non-enumerable URL slugs for /a/ and /v/ 33 const BASE62 = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'; 34 function base62Encode(n) { 35 if (n === 0) return '0'; 36 let result = ''; 37 while (n > 0) { 38 result = BASE62[n % 62] + result; 39 n = Math.floor(n / 62); 40 } 41 return result; 42 } 43 44 const __filename = fileURLToPath(import.meta.url); 45 const __dirname = dirname(__filename); 46 47 const logger = new Logger('ReplyProcessor'); 48 49 /** 50 * Main reply processing function 51 */ 52 export async function processReplies() { 53 logger.info('Starting reply processing...'); 54 55 const stats = { 56 polled: 0, 57 classified: 0, 58 paymentsSent: 0, 59 paymentsProcessed: 0, 60 reportsGenerated: 0, 61 reportsDelivered: 0, 62 errors: 0, 63 }; 64 65 try { 66 // Step 1: Poll all inbound channels 67 logger.info('Step 1: Polling inbound channels...'); 68 const pollResults = await pollAllChannels(); 69 stats.polled = pollResults.sms.stored + pollResults.email.stored; 70 logger.info(`Polled ${stats.polled} new messages`); 71 72 // Step 2: Classify new replies 73 logger.info('Step 2: Classifying new replies...'); 74 const classifiedCount = await classifyNewReplies(); 75 stats.classified = classifiedCount; 76 77 // Step 3: Send payment links to interested prospects 78 logger.info('Step 3: Sending payment links...'); 79 const paymentCount = await sendPaymentLinks(); 80 stats.paymentsSent = paymentCount; 81 82 // Step 3.5: Poll and process PayPal payment events 83 logger.info('Step 3.5: Polling PayPal payment events...'); 84 try { 85 const paypalResults = await pollPayPalEvents(); 86 stats.paymentsProcessed = paypalResults.successful; 87 logger.info(`Processed ${paypalResults.successful} payments`); 88 } catch (error) { 89 logger.error('PayPal polling failed (non-fatal)', error); 90 // Don't throw - continue with report generation 91 } 92 93 // Step 4: Generate reports for paid conversations 94 logger.info('Step 4: Generating reports for paid conversations...'); 95 const reportCount = await generatePaidReports(); 96 stats.reportsGenerated = reportCount; 97 98 // Step 5: Deliver reports via email 99 logger.info('Step 5: Delivering reports...'); 100 const deliveredCount = await deliverReports(); 101 stats.reportsDelivered = deliveredCount; 102 103 logger.success('Reply processing complete'); 104 logger.info(`Stats: ${JSON.stringify(stats)}`); 105 106 return stats; 107 } catch (error) { 108 logger.error('Reply processing failed', error); 109 stats.errors++; 110 throw error; 111 } 112 } 113 114 /** 115 * Classify all new inbound replies 116 */ 117 async function classifyNewReplies() { 118 // Get unclassified inbound messages 119 const inboundMessages = await getAll( 120 `SELECT m.id, m.message_body, m.contact_method, m.contact_uri, 121 m.site_id, s.domain, s.landing_page_url 122 FROM messages m 123 JOIN sites s ON m.site_id = s.id 124 WHERE m.direction = 'inbound' 125 AND m.intent IS NULL 126 AND m.processed_at IS NULL 127 ORDER BY m.created_at DESC 128 LIMIT 50` 129 ); 130 131 if (inboundMessages.length === 0) { 132 logger.info('No messages to classify'); 133 return 0; 134 } 135 136 logger.info(`Classifying ${inboundMessages.length} messages...`); 137 138 let classified = 0; 139 140 for (const msg of inboundMessages) { 141 try { 142 // Build context for better classification 143 const context = `Domain: ${msg.domain}\nURL: ${msg.landing_page_url}`; 144 145 // Classify the reply 146 const result = await classifyReply(msg.message_body, msg.contact_method, context); 147 148 // Map classification to site conversation_status 149 const newStatus = mapClassificationToStatus(result.classification); 150 151 // Update message with classification 152 await run( 153 `UPDATE messages 154 SET sentiment = $1, 155 intent = $2, 156 updated_at = NOW() 157 WHERE id = $3`, 158 [ 159 result.classification, 160 result.classification === 'unsubscribe' 161 ? 'opt-out' 162 : result.classification === 'not_interested' 163 ? 'not-interested' 164 : result.classification, 165 msg.id, 166 ] 167 ); 168 169 // Update site conversation_status 170 await run( 171 `UPDATE sites 172 SET conversation_status = $1, 173 updated_at = NOW() 174 WHERE id = $2`, 175 [newStatus, msg.site_id] 176 ); 177 178 // Handle unsubscribe requests 179 if (result.classification === 'unsubscribe') { 180 await processStopKeyword(msg.message_body, msg.contact_uri); 181 logger.info(`Processed opt-out for ${msg.contact_uri}`); 182 } 183 184 logger.success( 185 `Classified message ${msg.id} as "${result.classification}" (confidence: ${result.confidence})` 186 ); 187 classified++; 188 } catch (error) { 189 logger.error(`Failed to classify message ${msg.id}`, error); 190 // Mark site as active for human review on error 191 await run( 192 `UPDATE sites SET conversation_status = 'active', updated_at = NOW() WHERE id = $1 AND conversation_status IS NULL`, 193 [msg.site_id] 194 ); 195 } 196 } 197 198 return classified; 199 } 200 201 /** 202 * Map classification to conversation status 203 */ 204 function mapClassificationToStatus(classification) { 205 const mapping = { 206 interested: 'qualified', 207 not_interested: 'not_interested', 208 question: 'active', 209 unsubscribe: 'unsubscribed', 210 }; 211 // eslint-disable-next-line security/detect-object-injection -- Safe: classification is from controlled enum 212 return mapping[classification] || 'active'; 213 } 214 215 /** 216 * Send payment links to qualified prospects 217 */ 218 async function sendPaymentLinks() { 219 // Get qualified sites with inbound messages but no payment link sent yet. 220 // Safety guard: only send payment links when the inbound message intent is explicitly 221 // "pricing" or "interested" — never to "question" (confused/Stage 2) messages. 222 // This prevents Stage 4 payment links from being sent to Stage 2 confused prospects. 223 const conversations = await getAll( 224 `SELECT m.id, m.site_id, m.contact_method as channel, 225 m.contact_uri, m.contact_method, 226 s.domain, s.landing_page_url, s.country_code, s.score, s.grade 227 FROM messages m 228 JOIN sites s ON m.site_id = s.id 229 WHERE m.direction = 'inbound' 230 AND s.conversation_status = 'qualified' 231 AND m.payment_link IS NULL 232 AND m.intent IN ('pricing', 'interested') 233 ORDER BY m.created_at DESC 234 LIMIT 20` 235 ); 236 237 if (conversations.length === 0) { 238 logger.info('No payment links to send'); 239 return 0; 240 } 241 242 logger.info(`Sending payment links to ${conversations.length} prospects...`); 243 244 let sent = 0; 245 246 for (const conv of conversations) { 247 try { 248 // Extract email from contact_uri (email or phone) 249 const email = 250 conv.contact_method === 'email' 251 ? conv.contact_uri 252 : process.env.DEFAULT_PAYER_EMAIL || 'customer@example.com'; 253 254 const countryCode = conv.country_code || 'US'; 255 256 // Create PayPal payment order with local currency 257 const payment = await createPaymentOrder({ 258 domain: conv.domain, 259 email, 260 siteId: conv.site_id, 261 conversationId: conv.id, 262 countryCode, 263 }); 264 265 // Build short URL: {BRAND_DOMAIN}/o/{site_id} 266 // Uses the existing prefill short URL system (o.php reads data/orders/{site_id}.json 267 // and redirects to /?domain=...&country=...&email=...&ref=sms#order). 268 // The /o/ path matches the documented system in MEMORY.md and autoresponder.js. 269 const siteWebUrl = process.env.BRAND_URL; 270 const shortUrl = `${siteWebUrl}/o/${conv.site_id}`; 271 try { 272 const prefillRes = await fetch(`${siteWebUrl}/api.php?action=store-prefill`, { 273 method: 'POST', 274 headers: { 275 'Content-Type': 'application/json', 276 'X-Auth-Secret': process.env.API_WORKER_SECRET || '', 277 }, 278 body: JSON.stringify({ 279 site_id: conv.site_id, 280 domain: conv.domain || conv.landing_page_url, 281 country: countryCode, 282 email: conv.contact_method === 'email' ? conv.contact_uri : null, 283 cid: conv.id, 284 score: conv.score !== null && conv.score !== undefined ? conv.score : undefined, 285 grade: conv.grade || undefined, 286 }), 287 }); 288 if (!prefillRes.ok) { 289 logger.warn(`store-prefill HTTP ${prefillRes.status} for site ${conv.site_id}`); 290 } 291 } catch (prefillErr) { 292 logger.warn(`Could not store prefill for site ${conv.site_id}: ${prefillErr.message}`); 293 } 294 295 // Update message with payment link and currency tracking 296 await run( 297 `UPDATE messages 298 SET payment_link = $1, 299 payment_id = $2, 300 payment_currency = $3, 301 payment_amount_local = $4, 302 payment_amount_usd = $5, 303 exchange_rate = $6, 304 updated_at = NOW() 305 WHERE id = $7`, 306 [ 307 shortUrl, 308 payment.orderId, 309 payment.currency, 310 Math.round(payment.amount * 100), // Local currency in cents 311 Math.round(payment.amountUsd * 100), // USD equivalent in cents 312 payment.exchangeRate, 313 conv.id, 314 ] 315 ); 316 317 // Update site conversation_status to payment_requested 318 await run( 319 `UPDATE sites SET conversation_status = 'payment_requested', updated_at = NOW() WHERE id = $1`, 320 [conv.site_id] 321 ); 322 323 // Generate and send payment message using short URL 324 const message = await generatePaymentMessage(shortUrl, conv.channel, conv.domain, countryCode); 325 326 // Send via appropriate channel 327 await sendPaymentMessage(conv, message); 328 329 logger.success(`Sent payment link to conversation ${conv.id} via ${conv.channel}`); 330 sent++; 331 } catch (error) { 332 logger.error(`Failed to send payment link for conversation ${conv.id}`, error); 333 } 334 } 335 336 return sent; 337 } 338 339 /** 340 * Send payment message via appropriate channel 341 */ 342 async function sendPaymentMessage(conversation, message) { 343 if (conversation.channel === 'sms') { 344 const { sendSMS } = await import('../outreach/sms.js'); 345 await sendSMS(conversation.site_id, message, conversation.contact_uri); 346 } else if (conversation.channel === 'email') { 347 const { sendEmail } = await import('../outreach/email.js'); 348 await sendEmail( 349 conversation.site_id, 350 message, 351 'Your CRO Report - Complete Payment', 352 conversation.contact_uri 353 ); 354 } else { 355 throw new Error(`Unsupported channel for payment message: ${conversation.channel}`); 356 } 357 358 // Log outbound message in messages table 359 await run( 360 `INSERT INTO messages ( 361 site_id, direction, contact_method, contact_uri, message_body, 362 delivery_status, sent_at 363 ) VALUES ($1, 'outbound', $2, $3, $4, 'sent', NOW())`, 364 [conversation.site_id, conversation.channel, conversation.contact_uri, message] 365 ); 366 } 367 368 /** 369 * Generate reports for paid conversations 370 */ 371 async function generatePaidReports() { 372 // Get paid sites without reports (find the latest inbound message for each) 373 const conversations = await getAll( 374 `SELECT m.id, m.site_id, s.domain 375 FROM messages m 376 JOIN sites s ON m.site_id = s.id 377 WHERE m.direction = 'inbound' 378 AND s.conversation_status = 'paid' 379 AND m.report_url IS NULL 380 ORDER BY m.created_at DESC 381 LIMIT 10` 382 ); 383 384 if (conversations.length === 0) { 385 logger.info('No reports to generate'); 386 return 0; 387 } 388 389 logger.info(`Generating reports for ${conversations.length} paid conversations...`); 390 391 let generated = 0; 392 393 for (const conv of conversations) { 394 try { 395 await generateReport(conv.site_id, conv.id); 396 logger.success(`Generated report for conversation ${conv.id} (${conv.domain})`); 397 generated++; 398 } catch (error) { 399 logger.error(`Failed to generate report for conversation ${conv.id}`, error); 400 } 401 } 402 403 return generated; 404 } 405 406 /** 407 * Deliver reports via email 408 */ 409 async function deliverReports() { 410 // Get sites with reports ready to deliver (find the message with report_url) 411 const conversations = await getAll( 412 `SELECT m.id, m.site_id, m.report_url, m.contact_uri, 413 s.domain 414 FROM messages m 415 JOIN sites s ON m.site_id = s.id 416 WHERE s.conversation_status = 'report_delivered' 417 AND m.report_url IS NOT NULL 418 AND m.direction = 'inbound' 419 ORDER BY m.created_at DESC 420 LIMIT 10` 421 ); 422 423 if (conversations.length === 0) { 424 logger.info('No reports to deliver'); 425 return 0; 426 } 427 428 logger.info(`Delivering ${conversations.length} reports...`); 429 430 let delivered = 0; 431 432 for (const conv of conversations) { 433 try { 434 const { sendEmail } = await import('../outreach/email.js'); 435 436 const emailBody = `Thank you for your purchase! 437 438 Attached is your Conversion Rate Optimization (CRO) Analysis Report for ${conv.domain}. 439 440 This report includes: 441 • Detailed scoring across 5 key categories 442 • Critical conversion bottlenecks 443 • Prioritized recommendations 444 • Quick wins for immediate improvements 445 446 Questions about the report? Just reply to this email. 447 448 Best regards, 449 The CRO Team`; 450 451 // Send email with report attachment 452 await sendEmail( 453 conv.site_id, 454 emailBody, 455 `Your CRO Report for ${conv.domain}`, 456 conv.contact_uri 457 ); 458 459 // Log the outbound delivery message 460 await run( 461 `INSERT INTO messages ( 462 site_id, direction, contact_method, contact_uri, message_body, 463 subject_line, delivery_status, sent_at 464 ) VALUES ($1, 'outbound', 'email', $2, $3, $4, 'sent', NOW())`, 465 [conv.site_id, conv.contact_uri, emailBody, `Your CRO Report for ${conv.domain}`] 466 ); 467 468 logger.success(`Delivered report to ${conv.contact_uri}`); 469 delivered++; 470 } catch (error) { 471 logger.error(`Failed to deliver report for conversation ${conv.id}`, error); 472 } 473 } 474 475 return delivered; 476 } 477 478 // CLI functionality 479 if (import.meta.url === `file://${process.argv[1]}`) { 480 processReplies() 481 .then(stats => { 482 console.log('\n✅ Reply Processing Complete\n'); 483 console.log(`Polled: ${stats.polled} messages`); 484 console.log(`Classified: ${stats.classified} conversations`); 485 console.log(`Payment Links Sent: ${stats.paymentsSent}`); 486 console.log(`Reports Generated: ${stats.reportsGenerated}`); 487 console.log(`Reports Delivered: ${stats.reportsDelivered}`); 488 console.log(`Errors: ${stats.errors}\n`); 489 process.exit(0); 490 }) 491 .catch(error => { 492 logger.error('Processing failed', error); 493 process.exit(1); 494 }); 495 } 496 497 export default { 498 processReplies, 499 classifyNewReplies, 500 sendPaymentLinks, 501 generatePaidReports, 502 deliverReports, 503 };