/ src / cli / reply-processor.js
reply-processor.js
  1  #!/usr/bin/env node
  2  
  3  /**
  4   * Reply Processor
  5   * Automated cron job to process inbound replies and handle payment flow
  6   *
  7   * Flow:
  8   * 1. Poll inbound channels (SMS, Email)
  9   * 2. Classify new replies (interested, not_interested, question, unsubscribe)
 10   * 3. Take action based on classification:
 11   *    - interested → Send payment link
 12   *    - not_interested → Close conversation
 13   *    - question → Flag for human review
 14   *    - unsubscribe → Add to opt-out list
 15   * 4. Check for paid conversations → Generate reports
 16   * 5. Deliver reports via email
 17   */
 18  
 19  import { run, getAll } from '../utils/db.js';
 20  import { join, dirname } from 'path';
 21  import { fileURLToPath } from 'url';
 22  
 23  import Logger from '../utils/logger.js';
 24  import { classifyReply } from '../utils/reply-classifier.js';
 25  import { createPaymentOrder, generatePaymentMessage } from '../payment/paypal.js';
 26  import { generateReport } from '../reports/cro-report-generator.js';
 27  import { pollAllChannels } from '../inbound/processor.js';
 28  import { pollPayPalEvents } from '../payment/poll-paypal-events.js';
 29  import { processStopKeyword } from '../utils/compliance.js';
 30  import '../utils/load-env.js';
 31  
 32  // Base62 encoder — produces opaque, non-enumerable URL slugs for /a/ and /v/
 33  const BASE62 = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz';
 34  function base62Encode(n) {
 35    if (n === 0) return '0';
 36    let result = '';
 37    while (n > 0) {
 38      result = BASE62[n % 62] + result;
 39      n = Math.floor(n / 62);
 40    }
 41    return result;
 42  }
 43  
 44  const __filename = fileURLToPath(import.meta.url);
 45  const __dirname = dirname(__filename);
 46  
 47  const logger = new Logger('ReplyProcessor');
 48  
 49  /**
 50   * Main reply processing function
 51   */
 52  export async function processReplies() {
 53    logger.info('Starting reply processing...');
 54  
 55    const stats = {
 56      polled: 0,
 57      classified: 0,
 58      paymentsSent: 0,
 59      paymentsProcessed: 0,
 60      reportsGenerated: 0,
 61      reportsDelivered: 0,
 62      errors: 0,
 63    };
 64  
 65    try {
 66      // Step 1: Poll all inbound channels
 67      logger.info('Step 1: Polling inbound channels...');
 68      const pollResults = await pollAllChannels();
 69      stats.polled = pollResults.sms.stored + pollResults.email.stored;
 70      logger.info(`Polled ${stats.polled} new messages`);
 71  
 72      // Step 2: Classify new replies
 73      logger.info('Step 2: Classifying new replies...');
 74      const classifiedCount = await classifyNewReplies();
 75      stats.classified = classifiedCount;
 76  
 77      // Step 3: Send payment links to interested prospects
 78      logger.info('Step 3: Sending payment links...');
 79      const paymentCount = await sendPaymentLinks();
 80      stats.paymentsSent = paymentCount;
 81  
 82      // Step 3.5: Poll and process PayPal payment events
 83      logger.info('Step 3.5: Polling PayPal payment events...');
 84      try {
 85        const paypalResults = await pollPayPalEvents();
 86        stats.paymentsProcessed = paypalResults.successful;
 87        logger.info(`Processed ${paypalResults.successful} payments`);
 88      } catch (error) {
 89        logger.error('PayPal polling failed (non-fatal)', error);
 90        // Don't throw - continue with report generation
 91      }
 92  
 93      // Step 4: Generate reports for paid conversations
 94      logger.info('Step 4: Generating reports for paid conversations...');
 95      const reportCount = await generatePaidReports();
 96      stats.reportsGenerated = reportCount;
 97  
 98      // Step 5: Deliver reports via email
 99      logger.info('Step 5: Delivering reports...');
100      const deliveredCount = await deliverReports();
101      stats.reportsDelivered = deliveredCount;
102  
103      logger.success('Reply processing complete');
104      logger.info(`Stats: ${JSON.stringify(stats)}`);
105  
106      return stats;
107    } catch (error) {
108      logger.error('Reply processing failed', error);
109      stats.errors++;
110      throw error;
111    }
112  }
113  
114  /**
115   * Classify all new inbound replies
116   */
117  async function classifyNewReplies() {
118    // Get unclassified inbound messages
119    const inboundMessages = await getAll(
120      `SELECT m.id, m.message_body, m.contact_method, m.contact_uri,
121              m.site_id, s.domain, s.landing_page_url
122       FROM messages m
123       JOIN sites s ON m.site_id = s.id
124       WHERE m.direction = 'inbound'
125       AND m.intent IS NULL
126       AND m.processed_at IS NULL
127       ORDER BY m.created_at DESC
128       LIMIT 50`
129    );
130  
131    if (inboundMessages.length === 0) {
132      logger.info('No messages to classify');
133      return 0;
134    }
135  
136    logger.info(`Classifying ${inboundMessages.length} messages...`);
137  
138    let classified = 0;
139  
140    for (const msg of inboundMessages) {
141      try {
142        // Build context for better classification
143        const context = `Domain: ${msg.domain}\nURL: ${msg.landing_page_url}`;
144  
145        // Classify the reply
146        const result = await classifyReply(msg.message_body, msg.contact_method, context);
147  
148        // Map classification to site conversation_status
149        const newStatus = mapClassificationToStatus(result.classification);
150  
151        // Update message with classification
152        await run(
153          `UPDATE messages
154           SET sentiment = $1,
155               intent = $2,
156               updated_at = NOW()
157           WHERE id = $3`,
158          [
159            result.classification,
160            result.classification === 'unsubscribe'
161              ? 'opt-out'
162              : result.classification === 'not_interested'
163                ? 'not-interested'
164                : result.classification,
165            msg.id,
166          ]
167        );
168  
169        // Update site conversation_status
170        await run(
171          `UPDATE sites
172           SET conversation_status = $1,
173               updated_at = NOW()
174           WHERE id = $2`,
175          [newStatus, msg.site_id]
176        );
177  
178        // Handle unsubscribe requests
179        if (result.classification === 'unsubscribe') {
180          await processStopKeyword(msg.message_body, msg.contact_uri);
181          logger.info(`Processed opt-out for ${msg.contact_uri}`);
182        }
183  
184        logger.success(
185          `Classified message ${msg.id} as "${result.classification}" (confidence: ${result.confidence})`
186        );
187        classified++;
188      } catch (error) {
189        logger.error(`Failed to classify message ${msg.id}`, error);
190        // Mark site as active for human review on error
191        await run(
192          `UPDATE sites SET conversation_status = 'active', updated_at = NOW() WHERE id = $1 AND conversation_status IS NULL`,
193          [msg.site_id]
194        );
195      }
196    }
197  
198    return classified;
199  }
200  
201  /**
202   * Map classification to conversation status
203   */
204  function mapClassificationToStatus(classification) {
205    const mapping = {
206      interested: 'qualified',
207      not_interested: 'not_interested',
208      question: 'active',
209      unsubscribe: 'unsubscribed',
210    };
211    // eslint-disable-next-line security/detect-object-injection -- Safe: classification is from controlled enum
212    return mapping[classification] || 'active';
213  }
214  
215  /**
216   * Send payment links to qualified prospects
217   */
218  async function sendPaymentLinks() {
219    // Get qualified sites with inbound messages but no payment link sent yet.
220    // Safety guard: only send payment links when the inbound message intent is explicitly
221    // "pricing" or "interested" — never to "question" (confused/Stage 2) messages.
222    // This prevents Stage 4 payment links from being sent to Stage 2 confused prospects.
223    const conversations = await getAll(
224      `SELECT m.id, m.site_id, m.contact_method as channel,
225              m.contact_uri, m.contact_method,
226              s.domain, s.landing_page_url, s.country_code, s.score, s.grade
227       FROM messages m
228       JOIN sites s ON m.site_id = s.id
229       WHERE m.direction = 'inbound'
230       AND s.conversation_status = 'qualified'
231       AND m.payment_link IS NULL
232       AND m.intent IN ('pricing', 'interested')
233       ORDER BY m.created_at DESC
234       LIMIT 20`
235    );
236  
237    if (conversations.length === 0) {
238      logger.info('No payment links to send');
239      return 0;
240    }
241  
242    logger.info(`Sending payment links to ${conversations.length} prospects...`);
243  
244    let sent = 0;
245  
246    for (const conv of conversations) {
247      try {
248        // Extract email from contact_uri (email or phone)
249        const email =
250          conv.contact_method === 'email'
251            ? conv.contact_uri
252            : process.env.DEFAULT_PAYER_EMAIL || 'customer@example.com';
253  
254        const countryCode = conv.country_code || 'US';
255  
256        // Create PayPal payment order with local currency
257        const payment = await createPaymentOrder({
258          domain: conv.domain,
259          email,
260          siteId: conv.site_id,
261          conversationId: conv.id,
262          countryCode,
263        });
264  
265        // Build short URL: auditandfix.com/o/{site_id}
266        // Uses the existing prefill short URL system (o.php reads data/orders/{site_id}.json
267        // and redirects to /?domain=...&country=...&email=...&ref=sms#order).
268        // The /o/ path matches the documented system in MEMORY.md and autoresponder.js.
269        const siteWebUrl = process.env.AUDITANDFIX_URL || 'https://auditandfix.com';
270        const shortUrl = `${siteWebUrl}/o/${conv.site_id}`;
271        try {
272          const prefillRes = await fetch(`${siteWebUrl}/api.php?action=store-prefill`, {
273            method: 'POST',
274            headers: {
275              'Content-Type': 'application/json',
276              'X-Auth-Secret': process.env.AUDITANDFIX_WORKER_SECRET || '',
277            },
278            body: JSON.stringify({
279              site_id: conv.site_id,
280              domain: conv.domain || conv.landing_page_url,
281              country: countryCode,
282              email: conv.contact_method === 'email' ? conv.contact_uri : null,
283              cid: conv.id,
284              score: conv.score !== null && conv.score !== undefined ? conv.score : undefined,
285              grade: conv.grade || undefined,
286            }),
287          });
288          if (!prefillRes.ok) {
289            logger.warn(`store-prefill HTTP ${prefillRes.status} for site ${conv.site_id}`);
290          }
291        } catch (prefillErr) {
292          logger.warn(`Could not store prefill for site ${conv.site_id}: ${prefillErr.message}`);
293        }
294  
295        // Update message with payment link and currency tracking
296        await run(
297          `UPDATE messages
298           SET payment_link = $1,
299               payment_id = $2,
300               payment_currency = $3,
301               payment_amount_local = $4,
302               payment_amount_usd = $5,
303               exchange_rate = $6,
304               updated_at = NOW()
305           WHERE id = $7`,
306          [
307            shortUrl,
308            payment.orderId,
309            payment.currency,
310            Math.round(payment.amount * 100), // Local currency in cents
311            Math.round(payment.amountUsd * 100), // USD equivalent in cents
312            payment.exchangeRate,
313            conv.id,
314          ]
315        );
316  
317        // Update site conversation_status to payment_requested
318        await run(
319          `UPDATE sites SET conversation_status = 'payment_requested', updated_at = NOW() WHERE id = $1`,
320          [conv.site_id]
321        );
322  
323        // Generate and send payment message using short URL
324        const message = await generatePaymentMessage(shortUrl, conv.channel, conv.domain, countryCode);
325  
326        // Send via appropriate channel
327        await sendPaymentMessage(conv, message);
328  
329        logger.success(`Sent payment link to conversation ${conv.id} via ${conv.channel}`);
330        sent++;
331      } catch (error) {
332        logger.error(`Failed to send payment link for conversation ${conv.id}`, error);
333      }
334    }
335  
336    return sent;
337  }
338  
339  /**
340   * Send payment message via appropriate channel
341   */
342  async function sendPaymentMessage(conversation, message) {
343    if (conversation.channel === 'sms') {
344      const { sendSMS } = await import('../outreach/sms.js');
345      await sendSMS(conversation.site_id, message, conversation.contact_uri);
346    } else if (conversation.channel === 'email') {
347      const { sendEmail } = await import('../outreach/email.js');
348      await sendEmail(
349        conversation.site_id,
350        message,
351        'Your CRO Report - Complete Payment',
352        conversation.contact_uri
353      );
354    } else {
355      throw new Error(`Unsupported channel for payment message: ${conversation.channel}`);
356    }
357  
358    // Log outbound message in messages table
359    await run(
360      `INSERT INTO messages (
361        site_id, direction, contact_method, contact_uri, message_body,
362        delivery_status, sent_at
363      ) VALUES ($1, 'outbound', $2, $3, $4, 'sent', NOW())`,
364      [conversation.site_id, conversation.channel, conversation.contact_uri, message]
365    );
366  }
367  
368  /**
369   * Generate reports for paid conversations
370   */
371  async function generatePaidReports() {
372    // Get paid sites without reports (find the latest inbound message for each)
373    const conversations = await getAll(
374      `SELECT m.id, m.site_id, s.domain
375       FROM messages m
376       JOIN sites s ON m.site_id = s.id
377       WHERE m.direction = 'inbound'
378       AND s.conversation_status = 'paid'
379       AND m.report_url IS NULL
380       ORDER BY m.created_at DESC
381       LIMIT 10`
382    );
383  
384    if (conversations.length === 0) {
385      logger.info('No reports to generate');
386      return 0;
387    }
388  
389    logger.info(`Generating reports for ${conversations.length} paid conversations...`);
390  
391    let generated = 0;
392  
393    for (const conv of conversations) {
394      try {
395        await generateReport(conv.site_id, conv.id);
396        logger.success(`Generated report for conversation ${conv.id} (${conv.domain})`);
397        generated++;
398      } catch (error) {
399        logger.error(`Failed to generate report for conversation ${conv.id}`, error);
400      }
401    }
402  
403    return generated;
404  }
405  
406  /**
407   * Deliver reports via email
408   */
409  async function deliverReports() {
410    // Get sites with reports ready to deliver (find the message with report_url)
411    const conversations = await getAll(
412      `SELECT m.id, m.site_id, m.report_url, m.contact_uri,
413              s.domain
414       FROM messages m
415       JOIN sites s ON m.site_id = s.id
416       WHERE s.conversation_status = 'report_delivered'
417       AND m.report_url IS NOT NULL
418       AND m.direction = 'inbound'
419       ORDER BY m.created_at DESC
420       LIMIT 10`
421    );
422  
423    if (conversations.length === 0) {
424      logger.info('No reports to deliver');
425      return 0;
426    }
427  
428    logger.info(`Delivering ${conversations.length} reports...`);
429  
430    let delivered = 0;
431  
432    for (const conv of conversations) {
433      try {
434        const { sendEmail } = await import('../outreach/email.js');
435  
436        const emailBody = `Thank you for your purchase!
437  
438  Attached is your Conversion Rate Optimization (CRO) Analysis Report for ${conv.domain}.
439  
440  This report includes:
441  • Detailed scoring across 5 key categories
442  • Critical conversion bottlenecks
443  • Prioritized recommendations
444  • Quick wins for immediate improvements
445  
446  Questions about the report? Just reply to this email.
447  
448  Best regards,
449  The CRO Team`;
450  
451        // Send email with report attachment
452        await sendEmail(
453          conv.site_id,
454          emailBody,
455          `Your CRO Report for ${conv.domain}`,
456          conv.contact_uri
457        );
458  
459        // Log the outbound delivery message
460        await run(
461          `INSERT INTO messages (
462            site_id, direction, contact_method, contact_uri, message_body,
463            subject_line, delivery_status, sent_at
464          ) VALUES ($1, 'outbound', 'email', $2, $3, $4, 'sent', NOW())`,
465          [conv.site_id, conv.contact_uri, emailBody, `Your CRO Report for ${conv.domain}`]
466        );
467  
468        logger.success(`Delivered report to ${conv.contact_uri}`);
469        delivered++;
470      } catch (error) {
471        logger.error(`Failed to deliver report for conversation ${conv.id}`, error);
472      }
473    }
474  
475    return delivered;
476  }
477  
478  // CLI functionality
479  if (import.meta.url === `file://${process.argv[1]}`) {
480    processReplies()
481      .then(stats => {
482        console.log('\n✅ Reply Processing Complete\n');
483        console.log(`Polled: ${stats.polled} messages`);
484        console.log(`Classified: ${stats.classified} conversations`);
485        console.log(`Payment Links Sent: ${stats.paymentsSent}`);
486        console.log(`Reports Generated: ${stats.reportsGenerated}`);
487        console.log(`Reports Delivered: ${stats.reportsDelivered}`);
488        console.log(`Errors: ${stats.errors}\n`);
489        process.exit(0);
490      })
491      .catch(error => {
492        logger.error('Processing failed', error);
493        process.exit(1);
494      });
495  }
496  
497  export default {
498    processReplies,
499    classifyNewReplies,
500    sendPaymentLinks,
501    generatePaidReports,
502    deliverReports,
503  };