#!/usr/bin/env node

/**
 * processor.js - Unified Inbound Processor
 *
 * Routes inbound messages (SMS, Email) to the messages table and exposes
 * per-site thread views for operator review. Can be imported as a module or
 * run directly as a small CLI (see printUsage below).
 */

import { pathToFileURL } from 'node:url';

import { getAll, getOne, run } from '../utils/db.js';
import Logger from '../utils/logger.js';
import '../utils/load-env.js';

const logger = new Logger('InboundProcessor');

/** Maximum number of characters of a message body shown in the inbox list. */
const MESSAGE_PREVIEW_LENGTH = 100;

/** Default number of rows shown by the `inbox` CLI command. */
const DEFAULT_INBOX_LIMIT = 50;

/**
 * Run one channel task, isolating its failure from the other channels.
 *
 * The task's result is merged over `defaults`, so the counters the callers
 * sum are always present even when the handler returns nothing or only a
 * partial object.
 *
 * @param {object} options
 * @param {string} options.startMessage - Logged (info) before the task runs.
 * @param {string} options.failureMessage - Logged (error) if the task throws.
 * @param {() => Promise<object>} options.task - Channel handler to invoke.
 * @param {object} options.defaults - Zeroed counters used as the fallback.
 * @returns {Promise<object>} Counters for this channel; `defaults` on failure.
 */
async function runChannelTask({ startMessage, failureMessage, task, defaults }) {
  try {
    logger.info(startMessage);
    const outcome = await task();
    return { ...defaults, ...outcome };
  } catch (error) {
    // Best-effort by design: one failing channel must not stop the other.
    logger.error(failureMessage, error);
    return defaults;
  }
}

/**
 * Poll all inbound channels (SMS + Email).
 *
 * Each channel is polled independently; a failure in one is logged and leaves
 * that channel's counters at zero. The handlers are presumably expected to
 * return `{ processed, stored, unmatched }` - confirm against ./sms.js and
 * ./email.js.
 *
 * @returns {Promise<{sms: object, email: object}>} Per-channel counters.
 * @throws Rethrows any error raised outside the per-channel tasks (for
 *   example a failed handler import).
 */
export async function pollAllChannels() {
  logger.info('Polling all inbound channels...');

  const results = {
    sms: { processed: 0, stored: 0, unmatched: 0 },
    email: { processed: 0, stored: 0, unmatched: 0 },
  };

  try {
    // Import handlers dynamically
    const { pollInboundSMS } = await import('./sms.js');
    const { pollInboundEmails } = await import('./email.js');

    results.sms = await runChannelTask({
      startMessage: 'Polling SMS messages...',
      failureMessage: 'Failed to poll SMS',
      task: pollInboundSMS,
      defaults: results.sms,
    });

    results.email = await runChannelTask({
      startMessage: 'Polling email messages...',
      failureMessage: 'Failed to poll emails',
      task: pollInboundEmails,
      defaults: results.email,
    });

    const totalStored = results.sms.stored + results.email.stored;
    const totalUnmatched = results.sms.unmatched + results.email.unmatched;

    logger.success(`Polling complete: ${totalStored} messages stored, ${totalUnmatched} unmatched`);

    return results;
  } catch (error) {
    logger.error('Error polling inbound channels', error);
    throw error;
  }
}

/**
 * Process all pending operator replies (SMS + Email).
 *
 * Each channel is processed independently; a failure in one is logged and
 * leaves that channel's counters at zero. The handlers are presumably expected
 * to return `{ sent, failed }` - confirm against ./sms.js and ./email.js.
 *
 * @returns {Promise<{sms: object, email: object}>} Per-channel counters.
 * @throws Rethrows any error raised outside the per-channel tasks (for
 *   example a failed handler import).
 */
export async function processAllReplies() {
  logger.info('Processing all pending operator replies...');

  const results = {
    sms: { sent: 0, failed: 0 },
    email: { sent: 0, failed: 0 },
  };

  try {
    // Import handlers dynamically
    const { processPendingReplies: processSMS } = await import('./sms.js');
    const { processPendingReplies: processEmail } = await import('./email.js');

    results.sms = await runChannelTask({
      startMessage: 'Processing SMS replies...',
      failureMessage: 'Failed to process SMS replies',
      task: processSMS,
      defaults: results.sms,
    });

    results.email = await runChannelTask({
      startMessage: 'Processing email replies...',
      failureMessage: 'Failed to process email replies',
      task: processEmail,
      defaults: results.email,
    });

    const totalSent = results.sms.sent + results.email.sent;
    const totalFailed = results.sms.failed + results.email.failed;

    logger.success(`Processed ${totalSent} replies (${totalFailed} failed)`);

    return results;
  } catch (error) {
    logger.error('Error processing replies', error);
    throw error;
  }
}

/**
 * Get unread inbound messages for operator review, newest first.
 *
 * Each row is an inbound message with `read_at IS NULL`, joined to an outbound
 * message of the same site and to the site record. The `outreach_id` column in
 * the result is the message's `site_id`.
 *
 * NOTE(review): the query does not group rows. If a site can have more than
 * one outbound message, the self-join yields one row per (inbound, outbound)
 * pair, so an inbound message may appear several times - confirm against the
 * schema.
 *
 * @param {number} [limit=50] - Maximum number of rows to return.
 * @returns {Promise<object[]>} Rows as returned by `getAll`.
 */
export async function getUnreadConversations(limit = 50) {
  return getAll(
    `SELECT
       c.id,
       c.site_id AS outreach_id,
       c.direction,
       c.contact_method AS channel,
       c.contact_uri AS sender_identifier,
       c.message_body,
       c.subject_line,
       c.sentiment,
       c.created_at AS received_at,
       o.site_id,
       s.domain,
       s.keyword,
       o.contact_uri,
       o.contact_method
     FROM messages c
     JOIN messages o ON o.site_id = c.site_id AND o.direction = 'outbound'
     JOIN sites s ON o.site_id = s.id
     WHERE c.direction = 'inbound'
       AND c.read_at IS NULL
     ORDER BY c.created_at DESC
     LIMIT $1`,
    [limit]
  );
}

/**
 * Get the conversation thread for a specific site.
 *
 * @param {number} outreachId - Matched against `messages.site_id`.
 * @returns {Promise<{outreach: object, messages: object[]}>} `messages` holds
 *   every message of the site (inbound and outbound) in chronological order;
 *   `outreach` is an outbound message of the site joined with its site record,
 *   as returned by `getOne` (presumably null/undefined when there is none -
 *   the CLI treats a falsy value as "not found").
 *
 * NOTE(review): the outreach query has no ORDER BY, so which outbound row is
 * returned when a site has several is not determined here - confirm.
 */
export async function getConversationThread(outreachId) {
  // All messages in this thread (both inbound and outbound)
  const messages = await getAll(
    `SELECT
       c.id,
       c.direction,
       c.contact_method AS channel,
       c.contact_uri AS sender_identifier,
       c.message_body,
       c.subject_line,
       c.sentiment,
       c.created_at AS received_at,
       c.sent_at,
       c.read_at
     FROM messages c
     WHERE c.site_id = $1
     ORDER BY c.created_at ASC`,
    [outreachId]
  );

  // Outreach details
  const outreach = await getOne(
    `SELECT
       o.id,
       o.site_id,
       o.message_body,
       o.subject_line,
       o.contact_method,
       o.contact_uri,
       o.delivery_status AS status,
       o.sent_at,
       o.delivered_at,
       o.opened_at,
       s.domain,
       s.keyword,
       s.score AS conversion_score,
       s.landing_page_url
     FROM messages o
     JOIN sites s ON o.site_id = s.id
     WHERE o.direction = 'outbound' AND o.site_id = $1`,
    [outreachId]
  );

  return {
    outreach,
    messages,
  };
}

/**
 * Mark a single message as read.
 *
 * @param {number} conversationId - Matched against `messages.id`.
 * @returns {Promise<boolean>} Always `true` once the UPDATE has completed,
 *   whether or not a row matched.
 */
export async function markConversationRead(conversationId) {
  await run(
    `UPDATE messages
     SET read_at = CURRENT_TIMESTAMP
     WHERE id = $1`,
    [conversationId]
  );

  logger.success(`Marked conversation #${conversationId} as read`);
  return true;
}

/**
 * Mark all unread inbound messages of a site as read.
 *
 * @param {number} outreachId - Matched against `messages.site_id`.
 * @returns {Promise<number>} The `changes` value reported by `run`
 *   (presumably the number of updated rows - confirm against ../utils/db.js).
 */
export async function markThreadRead(outreachId) {
  const result = await run(
    `UPDATE messages
     SET read_at = CURRENT_TIMESTAMP
     WHERE site_id = $1
       AND direction = 'inbound'
       AND read_at IS NULL`,
    [outreachId]
  );

  logger.success(`Marked ${result.changes} messages as read for site #${outreachId}`);
  return result.changes;
}

/**
 * Get statistics on inbound messages.
 *
 * @returns {Promise<{unreadCount: number, byChannel: object[], bySentiment: object[]}>}
 *   `unreadCount` is the number of unread inbound messages; `byChannel` has one
 *   row per contact method with `total` and `unread` counts; `bySentiment` has
 *   one row per sentiment counting unread inbound messages only.
 */
export async function getInboundStats() {
  // Total unread
  const unreadRow = await getOne(
    `SELECT COUNT(*) AS count
     FROM messages
     WHERE direction = 'inbound'
       AND read_at IS NULL`,
    []
  );
  // Coerced because the driver may hand the count back as a non-number.
  const unreadCount = Number(unreadRow.count);

  // By channel
  const byChannel = await getAll(
    `SELECT
       contact_method AS channel,
       COUNT(*) AS total,
       SUM(CASE WHEN read_at IS NULL THEN 1 ELSE 0 END) AS unread
     FROM messages
     WHERE direction = 'inbound'
     GROUP BY contact_method`,
    []
  );

  // By sentiment
  const bySentiment = await getAll(
    `SELECT
       sentiment,
       COUNT(*) AS count
     FROM messages
     WHERE direction = 'inbound'
       AND read_at IS NULL
     GROUP BY sentiment`,
    []
  );

  return {
    unreadCount,
    byChannel,
    bySentiment,
  };
}

// ---------------------------------------------------------------------------
// CLI functionality
// ---------------------------------------------------------------------------

/**
 * Shorten `text` for single-line display, appending "..." only when something
 * was actually cut off. A null/undefined body is shown as an empty string.
 */
function previewText(text, maxLength = MESSAGE_PREVIEW_LENGTH) {
  const body = text ?? '';
  return body.length > maxLength ? `${body.substring(0, maxLength)}...` : body;
}

/** CLI `poll`: poll every channel and print the per-channel counters. */
async function cliPoll() {
  const result = await pollAllChannels();

  console.log('\n✅ Inbound Polling Complete\n');
  console.log(`SMS: ${result.sms.stored} stored, ${result.sms.unmatched} unmatched`);
  console.log(`Email: ${result.email.stored} stored, ${result.email.unmatched} unmatched\n`);
  return 0;
}

/** CLI `process-replies`: send pending operator replies and print the counters. */
async function cliProcessReplies() {
  const result = await processAllReplies();

  console.log('\n✅ Processed Operator Replies\n');
  console.log(`SMS: ${result.sms.sent} sent, ${result.sms.failed} failed`);
  console.log(`Email: ${result.email.sent} sent, ${result.email.failed} failed\n`);
  return 0;
}

/** CLI `inbox [limit]`: list unread inbound messages. */
async function cliInbox([rawLimit]) {
  // Anything that is not a positive integer falls back to the default.
  const parsedLimit = Number.parseInt(rawLimit, 10);
  const limit = parsedLimit > 0 ? parsedLimit : DEFAULT_INBOX_LIMIT;

  const conversations = await getUnreadConversations(limit);

  console.log(`\n📬 Unread Conversations (${conversations.length})\n`);

  if (conversations.length === 0) {
    console.log('No unread conversations\n');
    return 0;
  }

  for (const conv of conversations) {
    const channel = (conv.channel ?? 'unknown').toUpperCase();
    console.log(`#${conv.id} | ${channel} | ${conv.domain}`);
    console.log(`  From: ${conv.sender_identifier}`);
    console.log(`  Sentiment: ${conv.sentiment || 'unknown'}`);
    console.log(`  Message: ${previewText(conv.message_body)}`);
    console.log(`  Received: ${conv.received_at}\n`);
  }
  return 0;
}

/** CLI `thread <id>`: print the full conversation thread of one site. */
async function cliThread([rawId]) {
  const outreachId = Number.parseInt(rawId, 10);

  // Rejects a missing, non-numeric, zero or negative id.
  if (!(outreachId > 0)) {
    console.error('Usage: node src/inbound/processor.js thread <outreach_id>');
    return 1;
  }

  const thread = await getConversationThread(outreachId);

  if (!thread.outreach) {
    console.error(`Outreach #${outreachId} not found`);
    return 1;
  }

  console.log(`\n💬 Conversation Thread for Outreach #${outreachId}\n`);
  console.log(`Domain: ${thread.outreach.domain}`);
  console.log(`Keyword: ${thread.outreach.keyword}`);
  console.log(`Contact: ${thread.outreach.contact_uri} (${thread.outreach.contact_method})`);
  console.log(`Score: ${thread.outreach.conversion_score}\n`);
  console.log('Messages:\n');

  for (const msg of thread.messages) {
    const arrow = msg.direction === 'inbound' ? '←' : '→';
    console.log(
      `${arrow} ${msg.direction.toUpperCase()} | ${msg.received_at} | ${msg.sentiment || ''}`
    );
    if (msg.subject_line) {
      console.log(`  Subject: ${msg.subject_line}`);
    }
    console.log(`  ${msg.message_body}\n`);
  }
  return 0;
}

/** CLI `stats`: print unread totals by channel and by sentiment. */
async function cliStats() {
  const stats = await getInboundStats();

  console.log('\n📊 Inbound Statistics\n');
  console.log(`Total Unread: ${stats.unreadCount}\n`);

  console.log('By Channel:');
  for (const channel of stats.byChannel) {
    console.log(`  ${channel.channel}: ${channel.unread}/${channel.total} unread`);
  }

  console.log('\nBy Sentiment:');
  for (const sentiment of stats.bySentiment) {
    console.log(`  ${sentiment.sentiment || 'unknown'}: ${sentiment.count}`);
  }

  console.log('');
  return 0;
}

/** Print CLI help. */
function printUsage() {
  console.log('Unified Inbound Processor');
  console.log('');
  console.log('Usage:');
  console.log('  poll              - Poll all inbound channels (SMS + Email)');
  console.log('  process-replies   - Process pending operator replies (SMS + Email)');
  console.log('  inbox [limit]     - Show unread conversations (default: 50)');
  console.log('  thread <id>       - Show conversation thread for outreach');
  console.log('  stats             - Show inbound statistics');
  console.log('');
  console.log('Examples:');
  console.log('  node src/inbound/processor.js poll');
  console.log('  node src/inbound/processor.js inbox 20');
  console.log('  node src/inbound/processor.js thread 123');
  console.log('  node src/inbound/processor.js stats');
  console.log('');
}

/**
 * Command table: CLI verb -> handler plus the message logged when it throws.
 * A Map is used so that inherited object keys (e.g. "constructor") are never
 * mistaken for commands.
 */
const CLI_COMMANDS = new Map([
  ['poll', { handler: cliPoll, failureMessage: 'Failed to poll inbound channels' }],
  ['process-replies', { handler: cliProcessReplies, failureMessage: 'Failed to process replies' }],
  ['inbox', { handler: cliInbox, failureMessage: 'Failed to get conversations' }],
  ['thread', { handler: cliThread, failureMessage: 'Failed to get thread' }],
  ['stats', { handler: cliStats, failureMessage: 'Failed to get stats' }],
]);

/**
 * Dispatch one CLI invocation.
 *
 * @param {string[]} argv - Arguments after the script path: `[command, ...args]`.
 * @returns {Promise<number>} Process exit code (0 on success, 1 otherwise).
 */
async function runCli([command, ...args]) {
  const entry = CLI_COMMANDS.get(command);

  if (!entry) {
    printUsage();
    return 1;
  }

  try {
    return await entry.handler(args);
  } catch (error) {
    logger.error(entry.failureMessage, error);
    return 1;
  }
}

/**
 * True when this file is the script Node was started with.
 *
 * `import.meta.url` is a percent-encoded file URL, so the script path must be
 * converted with `pathToFileURL` rather than prefixed with "file://";
 * otherwise paths containing spaces (or Windows paths) never match.
 */
function isMainModule() {
  const entryPath = process.argv[1];
  return Boolean(entryPath) && import.meta.url === pathToFileURL(entryPath).href;
}

if (isMainModule()) {
  // Exit explicitly once the command finishes instead of waiting for the
  // event loop to drain.
  runCli(process.argv.slice(2)).then(
    (exitCode) => process.exit(exitCode),
    (error) => {
      console.error(error);
      process.exit(1);
    }
  );
}

export default {
  pollAllChannels,
  processAllReplies,
  getUnreadConversations,
  getConversationThread,
  markConversationRead,
  markThreadRead,
  getInboundStats,
};