/ src / inbound / processor.js
processor.js
  1  #!/usr/bin/env node
  2  
  3  /**
  4   * Unified Inbound Processor
  5   * Routes inbound messages (SMS, Email) to messages table
  6   * Provides threaded view by site_id for operator review
  7   */
  8  
import { pathToFileURL } from 'node:url';

import { getAll, getOne, run } from '../utils/db.js';
import Logger from '../utils/logger.js';
import '../utils/load-env.js';
 12  
// Logger instance shared by every function in this module, constructed with the
// label 'InboundProcessor'.
const logger = new Logger('InboundProcessor');
 14  
 15  /**
 16   * Poll all inbound channels (SMS + Email)
 17   */
 18  export async function pollAllChannels() {
 19    logger.info('Polling all inbound channels...');
 20  
 21    const results = {
 22      sms: { processed: 0, stored: 0, unmatched: 0 },
 23      email: { processed: 0, stored: 0, unmatched: 0 },
 24    };
 25  
 26    try {
 27      // Import handlers dynamically
 28      const { pollInboundSMS } = await import('./sms.js');
 29      const { pollInboundEmails } = await import('./email.js');
 30  
 31      // Poll SMS
 32      try {
 33        logger.info('Polling SMS messages...');
 34        results.sms = await pollInboundSMS();
 35      } catch (error) {
 36        logger.error('Failed to poll SMS', error);
 37      }
 38  
 39      // Poll Email
 40      try {
 41        logger.info('Polling email messages...');
 42        results.email = await pollInboundEmails();
 43      } catch (error) {
 44        logger.error('Failed to poll emails', error);
 45      }
 46  
 47      const totalStored = results.sms.stored + results.email.stored;
 48      const totalUnmatched = results.sms.unmatched + results.email.unmatched;
 49  
 50      logger.success(`Polling complete: ${totalStored} messages stored, ${totalUnmatched} unmatched`);
 51  
 52      return results;
 53    } catch (error) {
 54      logger.error('Error polling inbound channels', error);
 55      throw error;
 56    }
 57  }
 58  
 59  /**
 60   * Process all pending operator replies (SMS + Email)
 61   */
 62  export async function processAllReplies() {
 63    logger.info('Processing all pending operator replies...');
 64  
 65    const results = {
 66      sms: { sent: 0, failed: 0 },
 67      email: { sent: 0, failed: 0 },
 68    };
 69  
 70    try {
 71      // Import handlers dynamically
 72      const { processPendingReplies: processSMS } = await import('./sms.js');
 73      const { processPendingReplies: processEmail } = await import('./email.js');
 74  
 75      // Process SMS replies
 76      try {
 77        logger.info('Processing SMS replies...');
 78        results.sms = await processSMS();
 79      } catch (error) {
 80        logger.error('Failed to process SMS replies', error);
 81      }
 82  
 83      // Process Email replies
 84      try {
 85        logger.info('Processing email replies...');
 86        results.email = await processEmail();
 87      } catch (error) {
 88        logger.error('Failed to process email replies', error);
 89      }
 90  
 91      const totalSent = results.sms.sent + results.email.sent;
 92      const totalFailed = results.sms.failed + results.email.failed;
 93  
 94      logger.success(`Processed ${totalSent} replies (${totalFailed} failed)`);
 95  
 96      return results;
 97    } catch (error) {
 98      logger.error('Error processing replies', error);
 99      throw error;
100    }
101  }
102  
/**
 * Fetch unread inbound messages for operator review, newest first.
 *
 * Each inbound row (`read_at IS NULL`) is joined with the outbound messages of
 * the same site and with the site record, so the result also carries the
 * site's domain/keyword and the outbound contact details. The message's
 * `site_id` is exposed as `outreach_id`.
 *
 * NOTE(review): the query does not group or de-duplicate. If a site has several
 * outbound messages, the same inbound message appears once per outbound row —
 * confirm this is intended.
 *
 * @param {number} [limit=50] Maximum number of rows, bound to the SQL LIMIT.
 * @returns {Promise<Array<object>>} Rows as returned by `getAll`.
 */
export async function getUnreadConversations(limit = 50) {
  const unreadInboundSql = `SELECT
      c.id,
      c.site_id AS outreach_id,
      c.direction,
      c.contact_method AS channel,
      c.contact_uri AS sender_identifier,
      c.message_body,
      c.subject_line,
      c.sentiment,
      c.created_at AS received_at,
      o.site_id,
      s.domain,
      s.keyword,
      o.contact_uri,
      o.contact_method
     FROM messages c
     JOIN messages o ON o.site_id = c.site_id AND o.direction = 'outbound'
     JOIN sites s ON o.site_id = s.id
     WHERE c.direction = 'inbound'
     AND c.read_at IS NULL
     ORDER BY c.created_at DESC
     LIMIT $1`;
  const queryParams = [limit];

  return await getAll(unreadInboundSql, queryParams);
}
137  
/**
 * Load a conversation thread: every message for a site plus the outbound
 * message / site details that describe the outreach.
 *
 * NOTE(review): despite its name, `outreachId` is compared against
 * `messages.site_id` in both queries.
 *
 * @param {number} outreachId Value matched against `messages.site_id`.
 * @returns {Promise<{outreach: object, messages: Array<object>}>}
 *   `messages` holds all inbound and outbound rows for the site in ascending
 *   `created_at` order; `outreach` is the single row `getOne` returns for the
 *   site's outbound message joined with its site record (the CLI treats a
 *   falsy value as "not found").
 */
export async function getConversationThread(outreachId) {
  const threadMessagesSql = `SELECT
      c.id,
      c.direction,
      c.contact_method AS channel,
      c.contact_uri AS sender_identifier,
      c.message_body,
      c.subject_line,
      c.sentiment,
      c.created_at AS received_at,
      c.sent_at,
      c.read_at
     FROM messages c
     WHERE c.site_id = $1
     ORDER BY c.created_at ASC`;

  const outreachDetailsSql = `SELECT
      o.id,
      o.site_id,
      o.message_body,
      o.subject_line,
      o.contact_method,
      o.contact_uri,
      o.delivery_status AS status,
      o.sent_at,
      o.delivered_at,
      o.opened_at,
      s.domain,
      s.keyword,
      s.score AS conversion_score,
      s.landing_page_url
     FROM messages o
     JOIN sites s ON o.site_id = s.id
     WHERE o.direction = 'outbound' AND o.site_id = $1`;

  // The message list is fetched first, then the outreach details.
  const threadMessages = await getAll(threadMessagesSql, [outreachId]);
  const outreachDetails = await getOne(outreachDetailsSql, [outreachId]);

  return { outreach: outreachDetails, messages: threadMessages };
}
189  
/**
 * Mark a single message as read by stamping `read_at` with the current time.
 *
 * @param {number} conversationId Primary key (`messages.id`) of the message.
 * @returns {Promise<boolean>} `true` when the message was updated, `false` when
 *   the database reports that no row matched the id.
 */
export async function markConversationRead(conversationId) {
  const result = await run(
    `UPDATE messages
     SET read_at = CURRENT_TIMESTAMP
     WHERE id = $1`,
    [conversationId]
  );

  // `changes` is the affected-row count, the same field markThreadRead relies on.
  // If the driver does not report a count, keep the previous behaviour and
  // assume the update succeeded rather than reporting a false negative.
  const affectedRows = result?.changes;
  const wasUpdated = affectedRows == null || Number(affectedRows) > 0;

  if (wasUpdated) {
    logger.success(`Marked conversation #${conversationId} as read`);
  } else {
    // Do not claim success for an id that matched nothing.
    logger.info(`No message #${conversationId} found to mark as read`);
  }

  return wasUpdated;
}
204  
/**
 * Mark every unread inbound message of a site as read.
 *
 * Only rows with `direction = 'inbound'` and `read_at IS NULL` are touched.
 *
 * @param {number} outreachId Value matched against `messages.site_id`.
 * @returns {Promise<number>} The `changes` count reported by `run` for the update.
 */
export async function markThreadRead(outreachId) {
  const markInboundReadSql = `UPDATE messages
     SET read_at = CURRENT_TIMESTAMP
     WHERE site_id = $1
     AND direction = 'inbound'
     AND read_at IS NULL`;

  const { changes } = await run(markInboundReadSql, [outreachId]);

  logger.success(`Marked ${changes} messages as read for site #${outreachId}`);
  return changes;
}
221  
/**
 * Collect statistics about inbound messages.
 *
 * Runs three queries in sequence: the total number of unread inbound messages,
 * total/unread counts per contact method, and unread counts per sentiment.
 * Every count is converted with `Number()` so callers get numeric values
 * regardless of how the database driver represents aggregate results.
 *
 * @returns {Promise<{
 *   unreadCount: number,
 *   byChannel: Array<{channel: string, total: number, unread: number}>,
 *   bySentiment: Array<{sentiment: (string|null), count: number}>
 * }>} Aggregated inbound statistics.
 */
export async function getInboundStats() {
  // Total unread
  const unreadRow = await getOne(
    `SELECT COUNT(*) AS count
     FROM messages
     WHERE direction = 'inbound'
     AND read_at IS NULL`,
    []
  );
  // COUNT(*) yields a row even on an empty table; the fallback only guards
  // against a helper that returns nothing.
  const unreadCount = Number(unreadRow?.count ?? 0);

  // By channel
  const channelRows = await getAll(
    `SELECT
      contact_method AS channel,
      COUNT(*) AS total,
      SUM(CASE WHEN read_at IS NULL THEN 1 ELSE 0 END) AS unread
     FROM messages
     WHERE direction = 'inbound'
     GROUP BY contact_method`,
    []
  );
  const byChannel = channelRows.map((row) => ({
    ...row,
    total: Number(row.total),
    unread: Number(row.unread),
  }));

  // By sentiment (unread messages only)
  const sentimentRows = await getAll(
    `SELECT
      sentiment,
      COUNT(*) AS count
     FROM messages
     WHERE direction = 'inbound'
     AND read_at IS NULL
     GROUP BY sentiment`,
    []
  );
  const bySentiment = sentimentRows.map((row) => ({
    ...row,
    count: Number(row.count),
  }));

  return {
    unreadCount,
    byChannel,
    bySentiment,
  };
}
266  
// CLI functionality
//
// Runs only when this file is the script Node was launched with. The script
// path is converted with pathToFileURL() instead of being glued onto 'file://'
// so the comparison with import.meta.url also holds for paths that need
// percent-encoding (spaces, non-ASCII characters) and for Windows paths.
if (process.argv[1] && import.meta.url === pathToFileURL(process.argv[1]).href) {
  const command = process.argv[2];
  const commandArg = process.argv[3];

  // Parse a CLI argument as a positive base-10 integer; null when it is not one.
  const parsePositiveInt = (value) => {
    const parsed = Number.parseInt(value, 10);
    return Number.isInteger(parsed) && parsed > 0 ? parsed : null;
  };

  // Print the help text shown for a missing or unknown command.
  const printUsage = () => {
    console.log(
      [
        'Unified Inbound Processor',
        '',
        'Usage:',
        '  poll               - Poll all inbound channels (SMS + Email)',
        '  process-replies    - Process pending operator replies (SMS + Email)',
        '  inbox [limit]      - Show unread conversations (default: 50)',
        '  thread <id>        - Show conversation thread for outreach',
        '  stats              - Show inbound statistics',
        '',
        'Examples:',
        '  node src/inbound/processor.js poll',
        '  node src/inbound/processor.js inbox 20',
        '  node src/inbound/processor.js thread 123',
        '  node src/inbound/processor.js stats',
        '',
      ].join('\n')
    );
  };

  // Each command resolves to the process exit code. `failureMessage` is what
  // gets logged when the command rejects.
  const commands = new Map([
    [
      'poll',
      {
        failureMessage: 'Failed to poll inbound channels',
        execute: async () => {
          const result = await pollAllChannels();
          console.log('\n✅ Inbound Polling Complete\n');
          console.log(`SMS: ${result.sms.stored} stored, ${result.sms.unmatched} unmatched`);
          console.log(`Email: ${result.email.stored} stored, ${result.email.unmatched} unmatched\n`);
          return 0;
        },
      },
    ],
    [
      'process-replies',
      {
        failureMessage: 'Failed to process replies',
        execute: async () => {
          const result = await processAllReplies();
          console.log('\n✅ Processed Operator Replies\n');
          console.log(`SMS: ${result.sms.sent} sent, ${result.sms.failed} failed`);
          console.log(`Email: ${result.email.sent} sent, ${result.email.failed} failed\n`);
          return 0;
        },
      },
    ],
    [
      'inbox',
      {
        failureMessage: 'Failed to get conversations',
        execute: async () => {
          // Anything that is not a positive integer falls back to the default,
          // so a negative or garbage value never reaches the SQL LIMIT.
          const limit = parsePositiveInt(commandArg) ?? 50;
          const conversations = await getUnreadConversations(limit);

          console.log(`\n📬 Unread Conversations (${conversations.length})\n`);

          if (conversations.length === 0) {
            console.log('No unread conversations\n');
            return 0;
          }

          for (const conv of conversations) {
            // Channel and body may be NULL in the database; don't crash the listing.
            const channel = String(conv.channel ?? 'unknown').toUpperCase();
            const body = String(conv.message_body ?? '');
            // Only show an ellipsis when the preview is actually truncated.
            const preview = body.length > 100 ? `${body.substring(0, 100)}...` : body;

            console.log(`#${conv.id} | ${channel} | ${conv.domain}`);
            console.log(`  From: ${conv.sender_identifier}`);
            console.log(`  Sentiment: ${conv.sentiment || 'unknown'}`);
            console.log(`  Message: ${preview}`);
            console.log(`  Received: ${conv.received_at}\n`);
          }

          return 0;
        },
      },
    ],
    [
      'thread',
      {
        failureMessage: 'Failed to get thread',
        execute: async () => {
          const outreachId = parsePositiveInt(commandArg);

          if (outreachId === null) {
            console.error('Usage: node src/inbound/processor.js thread <outreach_id>');
            return 1;
          }

          const thread = await getConversationThread(outreachId);

          if (!thread.outreach) {
            console.error(`Outreach #${outreachId} not found`);
            return 1;
          }

          console.log(`\n💬 Conversation Thread for Outreach #${outreachId}\n`);
          console.log(`Domain: ${thread.outreach.domain}`);
          console.log(`Keyword: ${thread.outreach.keyword}`);
          console.log(`Contact: ${thread.outreach.contact_uri} (${thread.outreach.contact_method})`);
          console.log(`Score: ${thread.outreach.conversion_score}\n`);
          console.log('Messages:\n');

          for (const msg of thread.messages) {
            const arrow = msg.direction === 'inbound' ? '←' : '→';
            console.log(
              `${arrow} ${msg.direction.toUpperCase()} | ${msg.received_at} | ${msg.sentiment || ''}`
            );
            if (msg.subject_line) {
              console.log(`  Subject: ${msg.subject_line}`);
            }
            console.log(`  ${msg.message_body}\n`);
          }

          return 0;
        },
      },
    ],
    [
      'stats',
      {
        failureMessage: 'Failed to get stats',
        execute: async () => {
          const stats = await getInboundStats();

          console.log('\n📊 Inbound Statistics\n');
          console.log(`Total Unread: ${stats.unreadCount}\n`);

          console.log('By Channel:');
          for (const channel of stats.byChannel) {
            console.log(`  ${channel.channel}: ${channel.unread}/${channel.total} unread`);
          }

          console.log('\nBy Sentiment:');
          for (const sentiment of stats.bySentiment) {
            console.log(`  ${sentiment.sentiment || 'unknown'}: ${sentiment.count}`);
          }

          console.log('');
          return 0;
        },
      },
    ],
  ]);

  const selected = commands.get(command);

  if (!selected) {
    printUsage();
    process.exit(1);
  } else {
    selected
      .execute()
      .then((exitCode) => {
        process.exit(exitCode);
      })
      .catch((error) => {
        logger.error(selected.failureMessage, error);
        process.exit(1);
      });
  }
}
400  
// Default export: the module's public functions bundled in one object, for
// callers that prefer `import processor from './processor.js'` over named imports.
const inboundProcessor = {
  pollAllChannels,
  processAllReplies,
  getUnreadConversations,
  getConversationThread,
  markConversationRead,
  markThreadRead,
  getInboundStats,
};

export default inboundProcessor;