/ src / inbound / sms.js
sms.js
  1  #!/usr/bin/env node
  2  
  3  /**
  4   * Twilio SMS Webhook Receiver
  5   * Handles inbound SMS messages and stores them in messages table
  6   */
  7  
  8  import twilio from 'twilio';
  9  import { getAll, getOne, run } from '../utils/db.js';
 10  import Logger from '../utils/logger.js';
 11  import { processStopKeyword, processStartKeyword } from '../utils/compliance.js';
 12  import '../utils/load-env.js';
 13  
 14  const logger = new Logger('InboundSMS');
 15  
 16  /**
 17   * Find outbound message by phone number
 18   * Returns { site_id, contact_uri } from the messages table
 19   */
 20  export async function findOutreachByPhone(phoneNumber) {
 21    // Normalize phone number (remove spaces, dashes, etc.)
 22    const normalized = phoneNumber.replace(/[\s\-()]/g, '');
 23  
 24    // For Australian numbers (and others), handle leading 0 -> country code conversion
 25    // E.g., 0412345678 should match +61412345678
 26    const withoutLeadingZero = normalized.startsWith('0') ? normalized.slice(1) : normalized;
 27  
 28    // Find outbound message with matching phone in contact_uri
 29    const outreach = await getOne(
 30      `SELECT id, site_id, contact_uri, contact_method
 31       FROM messages
 32       WHERE direction = 'outbound'
 33       AND contact_method = 'sms'
 34       AND (
 35         contact_uri LIKE $1
 36         OR contact_uri LIKE $2
 37         OR contact_uri LIKE $3
 38         OR contact_uri LIKE $4
 39       )
 40       ORDER BY sent_at DESC
 41       LIMIT 1`,
 42      [
 43        `%${normalized}%`,
 44        `%${normalized.slice(-10)}%`,
 45        `%${phoneNumber}%`,
 46        `%${withoutLeadingZero}%`,
 47      ]
 48    );
 49  
 50    return outreach;
 51  }
 52  
 53  /**
 54   * Store inbound SMS in messages table
 55   */
 56  export async function storeInboundSMS(siteId, fromNumber, messageBody) {
 57    // Content-level dedup: skip if same sender sent identical text within 5 minutes
 58    const contentDupe = await getOne(
 59      `SELECT id FROM messages
 60       WHERE contact_uri = $1
 61       AND message_body = $2
 62       AND direction = 'inbound'
 63       AND created_at > NOW() - INTERVAL '5 minutes'`,
 64      [fromNumber, messageBody]
 65    );
 66  
 67    if (contentDupe) {
 68      logger.info(
 69        `Content-duplicate from ${fromNumber} (matches message #${contentDupe.id}), skipping`
 70      );
 71      return contentDupe.id;
 72    }
 73  
 74    const result = await run(
 75      `INSERT INTO messages (
 76        site_id, direction, contact_method, contact_uri,
 77        message_body, sentiment
 78      ) VALUES ($1, 'inbound', 'sms', $2, $3, NULL)
 79      RETURNING id`,
 80      [siteId, fromNumber, messageBody]
 81    );
 82  
 83    logger.success(
 84      `Stored inbound SMS from ${fromNumber} for site #${siteId} (message #${result.lastInsertRowid})`
 85    );
 86  
 87    return result.lastInsertRowid;
 88  }
 89  
 90  /**
 91   * Poll Twilio API for inbound SMS messages
 92   * This is an alternative to webhooks that doesn't require ngrok
 93   */
 94  export async function pollInboundSMS() {
 95    const accountSid = process.env.TWILIO_ACCOUNT_SID;
 96    const authToken = process.env.TWILIO_AUTH_TOKEN;
 97  
 98    if (!accountSid || !authToken) {
 99      throw new Error('Missing Twilio credentials. Set TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN');
100    }
101  
102    const client = twilio(accountSid, authToken);
103  
104    try {
105      // Collect all distinct phone numbers from the countries table
106      const phoneRows = await getAll(
107        `SELECT DISTINCT twilio_phone_number FROM countries
108         WHERE twilio_phone_number IS NOT NULL AND sms_enabled = true`,
109        []
110      );
111      const phoneNumbers = phoneRows.map(r => r.twilio_phone_number);
112  
113      if (phoneNumbers.length === 0) {
114        logger.info('No Twilio phone numbers configured in countries table — skipping poll');
115        return { processed: 0, stored: 0, unmatched: 0 };
116      }
117  
118      // Always poll last 24 hours (simplified approach - no timestamp tracking)
119      const lastPoll = new Date();
120      lastPoll.setHours(lastPoll.getHours() - 24);
121      logger.info(
122        `Polling ${phoneNumbers.length} numbers for SMS from last 24h (since ${lastPoll.toISOString()})...`
123      );
124  
125      // Fetch messages for all our numbers and deduplicate by SID
126      const seen = new Set();
127      const messages = [];
128      for (const phone of phoneNumbers) {
129        const batch = await client.messages.list({
130          to: phone,
131          dateSentAfter: lastPoll,
132          limit: 100,
133        });
134        for (const msg of batch) {
135          if (!seen.has(msg.sid)) {
136            seen.add(msg.sid);
137            messages.push(msg);
138          }
139        }
140      }
141  
142      if (messages.length === 0) {
143        logger.info('No new inbound SMS messages');
144        return { processed: 0, stored: 0, unmatched: 0 };
145      }
146  
147      logger.info(`Found ${messages.length} new inbound SMS messages`);
148  
149      let stored = 0;
150      let unmatched = 0;
151  
152      for (const msg of messages) {
153        logger.info(`Processing SMS from ${msg.from}: "${msg.body}"`);
154  
155        // Find matching outreach
156        const outreach = await findOutreachByPhone(msg.from);
157  
158        if (!outreach) {
159          logger.warn(`No outreach found for phone number: ${msg.from}`);
160          unmatched++;
161          continue;
162        }
163  
164        // Check if this message was already stored (by MessageSid) BEFORE any side effects.
165        // The poller queries the last 24h on every run, so the same message SID will appear
166        // repeatedly. Without this early-exit, processStopKeyword fires on every poll cycle
167        // causing duplicate opt-out writes for a single STOP message.
168        const existing = await getOne(
169          `SELECT id FROM messages
170           WHERE raw_payload::text LIKE $1
171           AND site_id = $2`,
172          [`%${msg.sid}%`, outreach.site_id]
173        );
174  
175        if (existing) {
176          logger.info(`Message ${msg.sid} already stored, skipping`);
177          continue;
178        }
179  
180        // Content-level dedup: skip if same sender sent identical text within 5 minutes.
181        // Catches rage-sends and double-taps (different Twilio SIDs, same content).
182        const contentDupe = await getOne(
183          `SELECT id FROM messages
184           WHERE contact_uri = $1
185           AND message_body = $2
186           AND direction = 'inbound'
187           AND created_at > $3::timestamptz - INTERVAL '5 minutes'`,
188          [msg.from, msg.body, msg.dateSent.toISOString()]
189        );
190  
191        if (contentDupe) {
192          logger.info(
193            `Content-duplicate from ${msg.from} (matches message #${contentDupe.id}), skipping`
194          );
195          continue;
196        }
197  
198        // Process STOP keywords for TCPA compliance
199        const stopResult = await processStopKeyword(msg.body, msg.from);
200        if (stopResult.isOptOutRequest) {
201          logger.success(`Processed opt-out request from ${msg.from}`);
202  
203          // Twilio handles STOP auto-reply natively; sending our own ack fails with 21610/21614
204          // because the number is already blocked the moment STOP is processed.
205          logger.info(`Opt-out recorded for ${msg.from} (no ack sent — Twilio handles STOP reply)`);
206        }
207  
208        // Process START keywords for re-subscription
209        const startResult = await processStartKeyword(msg.body, msg.from);
210        if (startResult.isResubscribeRequest) {
211          logger.success(`Processed re-subscription request from ${msg.from}`);
212  
213          // Send confirmation reply
214          try {
215            await client.messages.create({
216              body: 'You have been resubscribed to SMS messages. Reply STOP to unsubscribe.',
217              from: msg.to,
218              to: msg.from,
219            });
220            logger.success(`Sent re-subscription confirmation to ${msg.from}`);
221          } catch (error) {
222            logger.error(`Failed to send re-subscription confirmation to ${msg.from}`, error);
223          }
224        }
225  
226        // Store in messages table with full Twilio payload
227        const result = await run(
228          `INSERT INTO messages (
229            site_id, direction, contact_method, contact_uri,
230            message_body, sentiment, raw_payload, created_at
231          ) VALUES ($1, 'inbound', 'sms', $2, $3, NULL, $4, $5)
232          RETURNING id`,
233          [
234            outreach.site_id,
235            msg.from,
236            msg.body,
237            JSON.stringify({
238              sid: msg.sid,
239              from: msg.from,
240              to: msg.to,
241              body: msg.body,
242              status: msg.status,
243              dateSent: msg.dateSent,
244            }),
245            msg.dateSent.toISOString(),
246          ]
247        );
248  
249        // We got a human reply — we've made contact. Cancel any remaining pending/approved
250        // outbound messages for this site and advance the site to 'outreach_sent'.
251        // This prevents follow-up messages to someone who already replied.
252        const cancelledOutreaches = await run(
253          `UPDATE messages SET approval_status = 'rejected', updated_at = CURRENT_TIMESTAMP
254           WHERE site_id = $1 AND direction = 'outbound' AND approval_status IN ('pending', 'approved')`,
255          [outreach.site_id]
256        );
257  
258        // Set conversation_status to 'active' on the site (we have a human reply)
259        await run(
260          `UPDATE sites SET status = 'outreach_sent', conversation_status = 'active', updated_at = CURRENT_TIMESTAMP
261           WHERE id = $1 AND status IN ('proposals_drafted', 'outreach_partial')`,
262          [outreach.site_id]
263        );
264  
265        if (cancelledOutreaches.changes > 0) {
266          logger.info(
267            `Cancelled ${cancelledOutreaches.changes} pending outbound messages for site #${outreach.site_id} (human reply received)`
268          );
269        }
270  
271        logger.success(
272          `Stored inbound SMS from ${msg.from} for site #${outreach.site_id} (message #${result.lastInsertRowid})`
273        );
274        stored++;
275      }
276  
277      logger.success(
278        `Polling complete: ${stored} messages stored, ${unmatched} unmatched, ${messages.length} total`
279      );
280  
281      return { processed: messages.length, stored, unmatched };
282    } catch (error) {
283      logger.error('Error polling Twilio API', error);
284      throw error;
285    }
286  }
287  
288  /**
289   * Process pending operator replies from messages table
290   * Send outbound SMS for messages marked as direction='outbound' and not yet sent
291   */
292  export async function processPendingReplies() {
293    try {
294      // Find outbound messages that haven't been sent yet (no sent_at)
295      const pendingReplies = await getAll(
296        `SELECT id, site_id, message_body, contact_uri
297         FROM messages
298         WHERE direction = 'outbound'
299         AND contact_method = 'sms'
300         AND message_type = 'reply'
301         AND sent_at IS NULL
302         AND delivery_status IS NULL
303         ORDER BY created_at`,
304        []
305      );
306  
307      if (pendingReplies.length === 0) {
308        logger.info('No pending operator replies to send');
309        return { sent: 0, failed: 0 };
310      }
311  
312      logger.info(`Processing ${pendingReplies.length} pending operator replies...`);
313  
314      const { sendSMS } = await import('../outreach/sms.js');
315      let sent = 0;
316      let failed = 0;
317  
318      for (const reply of pendingReplies) {
319        try {
320          // Send SMS using existing sendSMS function
321          await sendSMS(reply.id);
322  
323          // Mark as sent
324          await run(
325            `UPDATE messages SET sent_at = CURRENT_TIMESTAMP, delivery_status = 'sent', updated_at = CURRENT_TIMESTAMP WHERE id = $1`,
326            [reply.id]
327          );
328  
329          logger.success(`Sent operator reply to ${reply.contact_uri} (message #${reply.id})`);
330          sent++;
331        } catch (error) {
332          logger.error(`Failed to send reply for message #${reply.id}`, error);
333          failed++;
334        }
335      }
336  
337      logger.success(
338        `Processed ${sent}/${pendingReplies.length} operator replies (${failed} failed)`
339      );
340  
341      return { sent, failed };
342    } catch (error) {
343      logger.error('Error processing pending SMS replies', error);
344      throw error;
345    }
346  }
347  
348  // CLI functionality
349  if (import.meta.url === `file://${process.argv[1]}`) {
350    const command = process.argv[2];
351  
352    if (command === 'poll') {
353      pollInboundSMS()
354        .then(result => {
355          console.log(
356            `\n✅ SMS Polling Complete: ${result.stored} new messages stored, ${result.unmatched} unmatched\n`
357          );
358          process.exit(0);
359        })
360        .catch(error => {
361          logger.error('Failed to poll SMS', error);
362          process.exit(1);
363        });
364    } else if (command === 'process-replies') {
365      processPendingReplies()
366        .then(result => {
367          console.log(
368            `\n✅ Processed operator replies: ${result.sent} sent, ${result.failed} failed\n`
369          );
370          process.exit(0);
371        })
372        .catch(error => {
373          logger.error('Failed to process replies', error);
374          process.exit(1);
375        });
376    } else {
377      console.log('Inbound SMS Management');
378      console.log('');
379      console.log('Usage:');
380      console.log('  poll               - Poll Twilio API for new inbound SMS');
381      console.log('  process-replies    - Send pending operator replies from messages table');
382      console.log('');
383      console.log('Examples:');
384      console.log('  node src/inbound/sms.js poll');
385      console.log('  node src/inbound/sms.js process-replies');
386      console.log('');
387      console.log('Setup:');
388      console.log('  Webhooks are handled by Cloudflare Workers (see cloudflare-worker/ directory)');
389      console.log('  Use polling for local testing or as backup to webhooks');
390      console.log('');
391      console.log('Setup cron for polling (every 5 minutes):');
392      console.log('  */5 * * * * cd /path/to/project && node src/inbound/sms.js poll');
393      console.log('');
394      process.exit(1);
395    }
396  }
397  
398  export default {
399    findOutreachByPhone,
400    storeInboundSMS,
401    processPendingReplies,
402    pollInboundSMS,
403  };