#!/usr/bin/env node

/**
 * Inbound SMS handling for Twilio (src/inbound/sms.js)
 *
 * - Matches an inbound phone number to the outbound SMS that prompted it.
 * - Stores inbound SMS rows in the `messages` table.
 * - Polls the Twilio API for inbound SMS (an alternative to webhooks).
 * - Sends pending operator replies.
 *
 * CLI: `node src/inbound/sms.js poll` | `node src/inbound/sms.js process-replies`
 */

import { pathToFileURL } from 'node:url';

import twilio from 'twilio';
import { getAll, getOne, run } from '../utils/db.js';
import Logger from '../utils/logger.js';
import { processStopKeyword, processStartKeyword } from '../utils/compliance.js';
import '../utils/load-env.js';

const logger = new Logger('InboundSMS');

// Maximum number of messages requested from Twilio per phone number and poll.
const POLL_FETCH_LIMIT = 100;

/**
 * Find the most recent outbound SMS whose `contact_uri` contains the given
 * phone number (in any of several normalized forms).
 *
 * @param {string} phoneNumber - Sender number as received (e.g. "+61412345678").
 * @returns {Promise<object|null|undefined>} Row with `id`, `site_id`,
 *   `contact_uri`, `contact_method`; `null` for unusable input; otherwise
 *   whatever `getOne` yields when no row matches.
 */
export async function findOutreachByPhone(phoneNumber) {
  if (typeof phoneNumber !== 'string') {
    return null;
  }

  // Normalize phone number (remove spaces, dashes, parentheses)
  const normalized = phoneNumber.replace(/[\s\-()]/g, '');

  // An empty needle would turn every LIKE pattern below into '%%', which
  // matches EVERY outbound SMS and would attribute the message to an
  // unrelated site. Treat it as "no match" instead.
  if (normalized === '') {
    return null;
  }

  // For Australian numbers (and others), handle leading 0 -> country code conversion
  // E.g., 0412345678 should match +61412345678.
  // Only strip the zero when something remains, for the same '%%' reason as above.
  const withoutLeadingZero =
    normalized.length > 1 && normalized.startsWith('0') ? normalized.slice(1) : normalized;

  // Find outbound message with matching phone in contact_uri
  const outreach = await getOne(
    `SELECT id, site_id, contact_uri, contact_method
     FROM messages
     WHERE direction = 'outbound'
       AND contact_method = 'sms'
       AND (
         contact_uri LIKE $1
         OR contact_uri LIKE $2
         OR contact_uri LIKE $3
         OR contact_uri LIKE $4
       )
     ORDER BY sent_at DESC
     LIMIT 1`,
    [
      `%${normalized}%`,
      `%${normalized.slice(-10)}%`,
      `%${phoneNumber}%`,
      `%${withoutLeadingZero}%`,
    ]
  );

  return outreach;
}

/**
 * Store an inbound SMS in the messages table.
 *
 * Skips the insert when the same sender already has an identical inbound
 * message created within the last 5 minutes, returning that row's id instead.
 *
 * @param {number|string} siteId - Site the message belongs to.
 * @param {string} fromNumber - Sender phone number (stored as `contact_uri`).
 * @param {string} messageBody - SMS text.
 * @returns {Promise<*>} Id of the duplicate row, or `lastInsertRowid` as
 *   reported by `run` for the newly inserted row.
 */
export async function storeInboundSMS(siteId, fromNumber, messageBody) {
  // Content-level dedup: skip if same sender sent identical text within 5 minutes
  const contentDupe = await getOne(
    `SELECT id FROM messages
     WHERE contact_uri = $1
       AND message_body = $2
       AND direction = 'inbound'
       AND created_at > NOW() - INTERVAL '5 minutes'`,
    [fromNumber, messageBody]
  );

  if (contentDupe) {
    logger.info(
      `Content-duplicate from ${fromNumber} (matches message #${contentDupe.id}), skipping`
    );
    return contentDupe.id;
  }

  const result = await run(
    `INSERT INTO messages (
       site_id, direction, contact_method, contact_uri,
       message_body, sentiment
     ) VALUES ($1, 'inbound', 'sms', $2, $3, NULL)
     RETURNING id`,
    [siteId, fromNumber, messageBody]
  );

  logger.success(
    `Stored inbound SMS from ${fromNumber} for site #${siteId} (message #${result.lastInsertRowid})`
  );

  return result.lastInsertRowid;
}

/**
 * Poll Twilio API for inbound SMS messages.
 * This is an alternative to webhooks that doesn't require ngrok.
 *
 * For every SMS-enabled number in the `countries` table, fetches messages
 * sent to it in the last 24 hours, then for each message not yet stored:
 * processes STOP/START keywords, inserts it into `messages`, rejects the
 * site's pending/approved outbound messages and marks the site's conversation
 * as active.
 *
 * @returns {Promise<{processed: number, stored: number, unmatched: number}>}
 * @throws {Error} When Twilio credentials are missing, or when any Twilio or
 *   database call fails (logged, then rethrown).
 */
export async function pollInboundSMS() {
  const accountSid = process.env.TWILIO_ACCOUNT_SID;
  const authToken = process.env.TWILIO_AUTH_TOKEN;

  if (!accountSid || !authToken) {
    throw new Error('Missing Twilio credentials. Set TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN');
  }

  const client = twilio(accountSid, authToken);

  try {
    // Collect all distinct phone numbers from the countries table
    const phoneRows = await getAll(
      `SELECT DISTINCT twilio_phone_number FROM countries
       WHERE twilio_phone_number IS NOT NULL AND sms_enabled = true`,
      []
    );
    const phoneNumbers = phoneRows.map(r => r.twilio_phone_number);

    if (phoneNumbers.length === 0) {
      logger.info('No Twilio phone numbers configured in countries table — skipping poll');
      return { processed: 0, stored: 0, unmatched: 0 };
    }

    // Always poll last 24 hours (simplified approach - no timestamp tracking)
    const lastPoll = new Date();
    lastPoll.setHours(lastPoll.getHours() - 24);
    logger.info(
      `Polling ${phoneNumbers.length} numbers for SMS from last 24h (since ${lastPoll.toISOString()})...`
    );

    // Fetch messages for all our numbers and deduplicate by SID
    const seen = new Set();
    const messages = [];
    for (const phone of phoneNumbers) {
      const batch = await client.messages.list({
        to: phone,
        dateSentAfter: lastPoll,
        limit: POLL_FETCH_LIMIT,
      });

      // A full batch means Twilio may hold more messages than we asked for;
      // surface that instead of silently dropping the remainder.
      if (batch.length >= POLL_FETCH_LIMIT) {
        logger.warn(
          `Reached the ${POLL_FETCH_LIMIT}-message fetch limit for ${phone}; some messages in the 24h window may not have been polled`
        );
      }

      for (const msg of batch) {
        if (!seen.has(msg.sid)) {
          seen.add(msg.sid);
          messages.push(msg);
        }
      }
    }

    if (messages.length === 0) {
      logger.info('No new inbound SMS messages');
      return { processed: 0, stored: 0, unmatched: 0 };
    }

    // Twilio returns messages newest-first and the batches above are simply
    // concatenated. Handle them oldest-first so that keyword side effects
    // (STOP followed by START, or the reverse) are applied in the order the
    // sender actually wrote them, and so the content-dedup below always keeps
    // the earliest copy. Number() maps a missing date to 0 instead of throwing.
    messages.sort((a, b) => Number(a.dateSent) - Number(b.dateSent));

    logger.info(`Found ${messages.length} new inbound SMS messages`);

    let stored = 0;
    let unmatched = 0;

    for (const msg of messages) {
      logger.info(`Processing SMS from ${msg.from}: "${msg.body}"`);

      // Find matching outreach
      const outreach = await findOutreachByPhone(msg.from);

      if (!outreach) {
        logger.warn(`No outreach found for phone number: ${msg.from}`);
        unmatched++;
        continue;
      }

      // Check if this message was already stored (by MessageSid) BEFORE any side effects.
      // The poller queries the last 24h on every run, so the same message SID will appear
      // repeatedly. Without this early-exit, processStopKeyword fires on every poll cycle
      // causing duplicate opt-out writes for a single STOP message.
      const existing = await getOne(
        `SELECT id FROM messages
         WHERE raw_payload::text LIKE $1
           AND site_id = $2`,
        [`%${msg.sid}%`, outreach.site_id]
      );

      if (existing) {
        logger.info(`Message ${msg.sid} already stored, skipping`);
        continue;
      }

      // Content-level dedup: skip if same sender sent identical text within 5 minutes.
      // Catches rage-sends and double-taps (different Twilio SIDs, same content).
      // NOTE(review): the window has no upper bound, so an identical message
      // stored with a LATER created_at also counts as a duplicate — confirm
      // this is intended.
      const contentDupe = await getOne(
        `SELECT id FROM messages
         WHERE contact_uri = $1
           AND message_body = $2
           AND direction = 'inbound'
           AND created_at > $3::timestamptz - INTERVAL '5 minutes'`,
        [msg.from, msg.body, msg.dateSent.toISOString()]
      );

      if (contentDupe) {
        logger.info(
          `Content-duplicate from ${msg.from} (matches message #${contentDupe.id}), skipping`
        );
        continue;
      }

      // Process STOP keywords for TCPA compliance
      const stopResult = await processStopKeyword(msg.body, msg.from);
      if (stopResult.isOptOutRequest) {
        logger.success(`Processed opt-out request from ${msg.from}`);

        // Twilio handles STOP auto-reply natively; sending our own ack fails with 21610/21614
        // because the number is already blocked the moment STOP is processed.
        logger.info(`Opt-out recorded for ${msg.from} (no ack sent — Twilio handles STOP reply)`);
      }

      // Process START keywords for re-subscription
      const startResult = await processStartKeyword(msg.body, msg.from);
      if (startResult.isResubscribeRequest) {
        logger.success(`Processed re-subscription request from ${msg.from}`);

        // Send confirmation reply (best effort: a failure here must not stop
        // the message from being stored below)
        try {
          await client.messages.create({
            body: 'You have been resubscribed to SMS messages. Reply STOP to unsubscribe.',
            from: msg.to,
            to: msg.from,
          });
          logger.success(`Sent re-subscription confirmation to ${msg.from}`);
        } catch (error) {
          logger.error(`Failed to send re-subscription confirmation to ${msg.from}`, error);
        }
      }

      // Store in messages table with full Twilio payload
      const result = await run(
        `INSERT INTO messages (
           site_id, direction, contact_method, contact_uri,
           message_body, sentiment, raw_payload, created_at
         ) VALUES ($1, 'inbound', 'sms', $2, $3, NULL, $4, $5)
         RETURNING id`,
        [
          outreach.site_id,
          msg.from,
          msg.body,
          JSON.stringify({
            sid: msg.sid,
            from: msg.from,
            to: msg.to,
            body: msg.body,
            status: msg.status,
            dateSent: msg.dateSent,
          }),
          msg.dateSent.toISOString(),
        ]
      );

      // We got a human reply — we've made contact. Cancel any remaining pending/approved
      // outbound messages for this site and advance the site to 'outreach_sent'.
      // This prevents follow-up messages to someone who already replied.
      const cancelledOutreaches = await run(
        `UPDATE messages SET approval_status = 'rejected', updated_at = CURRENT_TIMESTAMP
         WHERE site_id = $1 AND direction = 'outbound' AND approval_status IN ('pending', 'approved')`,
        [outreach.site_id]
      );

      // Set conversation_status to 'active' on the site (we have a human reply)
      await run(
        `UPDATE sites SET status = 'outreach_sent', conversation_status = 'active', updated_at = CURRENT_TIMESTAMP
         WHERE id = $1 AND status IN ('proposals_drafted', 'outreach_partial')`,
        [outreach.site_id]
      );

      if (cancelledOutreaches.changes > 0) {
        logger.info(
          `Cancelled ${cancelledOutreaches.changes} pending outbound messages for site #${outreach.site_id} (human reply received)`
        );
      }

      logger.success(
        `Stored inbound SMS from ${msg.from} for site #${outreach.site_id} (message #${result.lastInsertRowid})`
      );
      stored++;
    }

    logger.success(
      `Polling complete: ${stored} messages stored, ${unmatched} unmatched, ${messages.length} total`
    );

    return { processed: messages.length, stored, unmatched };
  } catch (error) {
    logger.error('Error polling Twilio API', error);
    throw error;
  }
}

/**
 * Process pending operator replies from messages table.
 * Sends outbound SMS for rows with direction='outbound', contact_method='sms',
 * message_type='reply' that have neither `sent_at` nor `delivery_status` set.
 *
 * A failure for one reply is logged and counted; the remaining replies are
 * still attempted.
 *
 * @returns {Promise<{sent: number, failed: number}>}
 * @throws {Error} When the pending-replies query or the sender import fails
 *   (logged, then rethrown).
 */
export async function processPendingReplies() {
  try {
    // Find outbound messages that haven't been sent yet (no sent_at)
    const pendingReplies = await getAll(
      `SELECT id, site_id, message_body, contact_uri
       FROM messages
       WHERE direction = 'outbound'
         AND contact_method = 'sms'
         AND message_type = 'reply'
         AND sent_at IS NULL
         AND delivery_status IS NULL
       ORDER BY created_at`,
      []
    );

    if (pendingReplies.length === 0) {
      logger.info('No pending operator replies to send');
      return { sent: 0, failed: 0 };
    }

    logger.info(`Processing ${pendingReplies.length} pending operator replies...`);

    // Loaded lazily, only when there is something to send.
    const { sendSMS } = await import('../outreach/sms.js');
    let sent = 0;
    let failed = 0;

    for (const reply of pendingReplies) {
      try {
        // Send SMS using existing sendSMS function
        await sendSMS(reply.id);

        // Mark as sent
        // NOTE(review): if this UPDATE fails after a successful send, the row
        // still looks pending and would presumably be re-sent on the next
        // run — confirm whether sendSMS already records delivery itself.
        await run(
          `UPDATE messages SET sent_at = CURRENT_TIMESTAMP, delivery_status = 'sent', updated_at = CURRENT_TIMESTAMP WHERE id = $1`,
          [reply.id]
        );

        logger.success(`Sent operator reply to ${reply.contact_uri} (message #${reply.id})`);
        sent++;
      } catch (error) {
        logger.error(`Failed to send reply for message #${reply.id}`, error);
        failed++;
      }
    }

    logger.success(
      `Processed ${sent}/${pendingReplies.length} operator replies (${failed} failed)`
    );

    return { sent, failed };
  } catch (error) {
    logger.error('Error processing pending SMS replies', error);
    throw error;
  }
}

// CLI functionality
// Compare against a properly encoded file URL: import.meta.url is
// percent-encoded, so a hand-built `file://${path}` string never matches when
// the script path contains spaces or other special characters (or on Windows).
const invokedScript = process.argv[1];
if (invokedScript && import.meta.url === pathToFileURL(invokedScript).href) {
  const command = process.argv[2];

  if (command === 'poll') {
    pollInboundSMS()
      .then(result => {
        console.log(
          `\n✅ SMS Polling Complete: ${result.stored} new messages stored, ${result.unmatched} unmatched\n`
        );
        process.exit(0);
      })
      .catch(error => {
        logger.error('Failed to poll SMS', error);
        process.exit(1);
      });
  } else if (command === 'process-replies') {
    processPendingReplies()
      .then(result => {
        console.log(
          `\n✅ Processed operator replies: ${result.sent} sent, ${result.failed} failed\n`
        );
        process.exit(0);
      })
      .catch(error => {
        logger.error('Failed to process replies', error);
        process.exit(1);
      });
  } else {
    // Unknown or missing command: print usage and exit non-zero.
    console.log('Inbound SMS Management');
    console.log('');
    console.log('Usage:');
    console.log(' poll - Poll Twilio API for new inbound SMS');
    console.log(' process-replies - Send pending operator replies from messages table');
    console.log('');
    console.log('Examples:');
    console.log(' node src/inbound/sms.js poll');
    console.log(' node src/inbound/sms.js process-replies');
    console.log('');
    console.log('Setup:');
    console.log(' Webhooks are handled by Cloudflare Workers (see cloudflare-worker/ directory)');
    console.log(' Use polling for local testing or as backup to webhooks');
    console.log('');
    console.log('Setup cron for polling (every 5 minutes):');
    console.log(' */5 * * * * cd /path/to/project && node src/inbound/sms.js poll');
    console.log('');
    process.exit(1);
  }
}

export default {
  findOutreachByPhone,
  storeInboundSMS,
  processPendingReplies,
  pollInboundSMS,
};