/ src / inbound / email.js
email.js
  1  #!/usr/bin/env node
  2  
  3  /**
  4   * Email Inbound Handler
  5   * Processes inbound email replies from Resend and stores them in messages table
  6   */
  7  
  8  import { getAll, getOne, run } from '../utils/db.js';
  9  import Logger from '../utils/logger.js';
 10  import '../utils/load-env.js';
 11  
 12  const logger = new Logger('InboundEmail');
 13  
 14  /**
 15   * Find outbound message by email address
 16   * Returns { id, site_id, contact_uri, contact_method } from the messages table
 17   */
 18  export async function findOutreachByEmail(emailAddress) {
 19    // Normalize email (lowercase)
 20    const normalized = emailAddress.toLowerCase().trim();
 21  
 22    // Find outbound message with matching email in contact_uri
 23    const outreach = await getOne(
 24      `SELECT id, site_id, contact_uri, contact_method
 25       FROM messages
 26       WHERE direction = 'outbound'
 27       AND contact_method = 'email'
 28       AND lower(contact_uri) = $1
 29       ORDER BY sent_at DESC
 30       LIMIT 1`,
 31      [normalized]
 32    );
 33  
 34    return outreach;
 35  }
 36  
 37  /**
 38   * Parse quoted text from email body
 39   * Simple heuristic: split on common reply delimiters
 40   */
 41  export function parseEmailBody(body) {
 42    if (!body) return { clean: '', quoted: '' };
 43  
 44    const text = typeof body === 'string' ? body : body.text || '';
 45  
 46    // Common reply delimiters
 47    const delimiters = [
 48      /^On .* wrote:$/m,
 49      /^-----Original Message-----/m,
 50      /^From: .*$/m,
 51      /^________________________________$/m,
 52      /^>+/m, // Quoted lines starting with >
 53    ];
 54  
 55    let cleanBody = text;
 56    let quotedText = '';
 57  
 58    for (const delimiter of delimiters) {
 59      const match = text.match(delimiter);
 60      if (match && match.index) {
 61        cleanBody = text.substring(0, match.index).trim();
 62        quotedText = text.substring(match.index).trim();
 63        break;
 64      }
 65    }
 66  
 67    return { clean: cleanBody, quoted: quotedText };
 68  }
 69  
 70  /**
 71   * Detect sentiment using simple keyword matching
 72   */
 73  export function detectSentiment(text) {
 74    if (!text) return null;
 75  
 76    const lowerText = text.toLowerCase();
 77  
 78    // Negative/objection signals (check these first as they're more specific)
 79    const negativeKeywords = [
 80      'not interested',
 81      'no thanks',
 82      'remove',
 83      'unsubscribe',
 84      'stop',
 85      "don't contact",
 86      'already have',
 87      'too expensive',
 88      'not now',
 89      'busy',
 90    ];
 91  
 92    // Positive signals
 93    const positiveKeywords = [
 94      'interested',
 95      'yes',
 96      'sounds good',
 97      'please call',
 98      "let's talk",
 99      'schedule',
100      'when can',
101      'available',
102      'great',
103      'perfect',
104      'thank you',
105    ];
106  
107    // Check for negative signals first (they're more specific)
108    const hasNegative = negativeKeywords.some(keyword => lowerText.includes(keyword));
109    if (hasNegative) return 'objection';
110  
111    // Check for positive signals
112    const hasPositive = positiveKeywords.some(keyword => lowerText.includes(keyword));
113    if (hasPositive) return 'positive';
114  
115    // Default to neutral
116    return 'neutral';
117  }
118  
/**
 * Fetch the details of an email from the Resend API.
 *
 * @param {string} emailId - Resend email identifier. It is URL-encoded before
 *   being placed in the request path, since it originates from event data.
 * @returns {Promise<object|null>} Parsed JSON response, or null when the API
 *   answers 404.
 * @throws {Error} When RESEND_API_KEY is unset, emailId is missing, or the API
 *   returns a non-OK status other than 404.
 */
export async function fetchReceivedEmail(emailId) {
  const apiKey = process.env.RESEND_API_KEY;

  if (!apiKey) {
    throw new Error('RESEND_API_KEY not configured');
  }

  // Without this guard the request would go to ".../emails/undefined".
  if (emailId === undefined || emailId === null || emailId === '') {
    throw new Error('emailId is required to fetch email details');
  }

  // Encode so characters such as "/", "?" or "#" cannot alter the request path.
  const url = `https://api.resend.com/emails/${encodeURIComponent(emailId)}`;

  logger.info(`Fetching email details for ${emailId}...`);

  const response = await fetch(url, {
    headers: {
      Authorization: `Bearer ${apiKey}`,
    },
  });

  // A missing email is an expected outcome, reported as null rather than an error.
  if (response.status === 404) {
    return null;
  }

  if (!response.ok) {
    throw new Error(`Failed to fetch email: ${response.status} ${response.statusText}`);
  }

  const email = await response.json();
  return email;
}
150  
/**
 * Persist an inbound email that could not be tied to an outreach message.
 *
 * The INSERT always supplies a site_id, so when no siteId is given nothing is
 * written: the email is only logged as a warning and null is returned. When a
 * siteId is supplied (for example from a fuzzy match by the caller) the cleaned
 * body, detected sentiment and serialized raw payload are inserted as an
 * inbound email message.
 *
 * @param {string} fromEmail - Sender address, stored as contact_uri.
 * @param {string} subject - Subject line.
 * @param {string|object} messageBody - Raw body handed to parseEmailBody.
 * @param {object} rawPayload - Stored as JSON text in raw_payload.
 * @param {number|null} [siteId=null] - Site to attach the message to.
 * @returns {Promise<*>} result.lastInsertRowid from run, or null when skipped.
 */
export async function storeUnmatchedEmail(fromEmail, subject, messageBody, rawPayload, siteId = null) {
  const hasSite = Boolean(siteId);

  if (!hasSite) {
    // messages.site_id is NOT NULL — cannot store without a site
    logger.warn(
      `Cannot store unmatched email from ${fromEmail} — no site_id available (messages.site_id is NOT NULL)`
    );
    return null;
  }

  const cleanBody = parseEmailBody(messageBody).clean;
  const sentiment = detectSentiment(cleanBody);

  const insertSql = `INSERT INTO messages (
      site_id, direction, contact_method, contact_uri,
      message_body, subject_line, sentiment, raw_payload
    ) VALUES ($1, 'inbound', 'email', $2, $3, $4, $5, $6)
    RETURNING id`;
  const insertParams = [siteId, fromEmail, cleanBody, subject, sentiment, JSON.stringify(rawPayload)];

  const result = await run(insertSql, insertParams);
  const messageId = result.lastInsertRowid;

  logger.info(`Stored unmatched email from ${fromEmail} as message #${messageId} (site #${siteId})`);

  return messageId;
}
185  
/**
 * Persist a matched inbound email as a message row for the given site.
 *
 * The body is reduced to its non-quoted part with parseEmailBody, classified
 * with detectSentiment, and inserted together with the JSON-serialized raw
 * payload as an inbound email message.
 *
 * @param {number} siteId - Site the reply belongs to.
 * @param {string} fromEmail - Sender address, stored as contact_uri.
 * @param {string} subject - Subject line.
 * @param {string|object} messageBody - Raw body handed to parseEmailBody.
 * @param {object} rawPayload - Stored as JSON text in raw_payload.
 * @returns {Promise<*>} result.lastInsertRowid from run.
 */
export async function storeInboundEmail(siteId, fromEmail, subject, messageBody, rawPayload) {
  // Keep only the new reply text, then classify it.
  const cleanBody = parseEmailBody(messageBody).clean;
  const sentiment = detectSentiment(cleanBody);

  const insertSql = `INSERT INTO messages (
      site_id, direction, contact_method, contact_uri,
      message_body, subject_line, sentiment, raw_payload
    ) VALUES ($1, 'inbound', 'email', $2, $3, $4, $5, $6)
    RETURNING id`;
  const insertParams = [siteId, fromEmail, cleanBody, subject, sentiment, JSON.stringify(rawPayload)];

  const result = await run(insertSql, insertParams);
  const messageId = result.lastInsertRowid;

  logger.success(
    `Stored inbound email from ${fromEmail} for site #${siteId} (message #${messageId}, sentiment: ${sentiment})`
  );

  return messageId;
}
211  
/**
 * Poll the email-events worker for inbound replies and store them.
 *
 * Flow:
 *  1. GET `${EMAIL_EVENTS_WORKER_URL}/email-events.json` and keep only
 *     `email.received` events from the last 24 hours whose sender is not a
 *     `test-…@` address.
 *  2. For each event: look up the matching outreach, skip it if a message whose
 *     raw_payload already mentions the email_id exists, fetch the full email
 *     via fetchReceivedEmail, then store it (matched, unmatched, or a stub when
 *     Resend answers 404).
 *  3. After a matched email is stored, run the refund-request check. A failure
 *     there is logged but does not change the stored/unmatched counts.
 *  4. DELETE the worker's event list and warn if that request is not OK.
 *
 * Per-event failures are logged and counted as unmatched so one bad event does
 * not abort the run.
 *
 * @returns {Promise<{processed: number, stored: number, unmatched: number}>}
 * @throws {Error} When EMAIL_EVENTS_WORKER_URL is unset, the events request
 *   fails, or the events payload is not an array.
 */
// eslint-disable-next-line complexity -- Email polling requires multiple conditional checks
export async function pollInboundEmails() {
  const workerUrl = process.env.EMAIL_EVENTS_WORKER_URL;

  if (!workerUrl) {
    throw new Error('EMAIL_EVENTS_WORKER_URL not configured in .env');
  }

  try {
    // Always poll last 24 hours (simplified approach - no timestamp tracking)
    const lastPoll = new Date();
    lastPoll.setHours(lastPoll.getHours() - 24);
    logger.info(
      `Polling for email.received events from last 24 hours (since ${lastPoll.toISOString()})...`
    );

    // The same endpoint and auth header serve both the fetch and the clear.
    const eventsUrl = `${workerUrl}/email-events.json`;
    const authHeaders = { 'X-Auth-Secret': process.env.RESEND_WORKER_SECRET || '' };

    // Fetch events from worker
    const response = await fetch(eventsUrl, { headers: authHeaders });

    if (!response.ok) {
      throw new Error(`Failed to fetch events: ${response.status} ${response.statusText}`);
    }

    const allEvents = await response.json();

    // Fail with a clear message instead of "allEvents.filter is not a function".
    if (!Array.isArray(allEvents)) {
      throw new Error('Unexpected email events payload: expected an array of events');
    }

    // Filter for email.received events only, excluding test addresses
    const receivedEvents = allEvents.filter(event => {
      if (event?.type !== 'email.received') return false;
      if (new Date(event.created_at) <= lastPoll) return false;
      const from = event.data?.from || '';
      if (/^test-[^@]+@/i.test(from)) {
        logger.info(`Skipping test address: ${from}`);
        return false;
      }
      return true;
    });

    if (receivedEvents.length === 0) {
      logger.info('No new inbound email messages');
      return { processed: 0, stored: 0, unmatched: 0 };
    }

    logger.info(`Found ${receivedEvents.length} new inbound email messages`);

    let stored = 0;
    let unmatched = 0;

    for (const event of receivedEvents) {
      try {
        const { from, subject, email_id } = event.data || {};

        if (!from || !email_id) {
          logger.warn('Missing from or email_id in email.received event');
          unmatched++;
          continue;
        }

        logger.info(`Processing email from ${from}: "${subject || '(no subject)'}"`);

        // Find matching outreach
        const outreach = await findOutreachByEmail(from);

        // Check if this message was already stored (by email_id) — works for both matched and unmatched
        const existing = await getOne(
          `SELECT id FROM messages WHERE raw_payload::text LIKE $1`,
          [`%${email_id}%`]
        );

        if (existing) {
          logger.info(`Email ${email_id} already stored, skipping`);
          continue;
        }

        // Fetch full email content from Resend API
        const emailDetails = await fetchReceivedEmail(email_id);

        if (!emailDetails) {
          // Email no longer exists in Resend (404) — store stub to prevent retry loops
          logger.warn(
            `Email ${email_id} not found in Resend (404), storing stub to prevent retries`
          );
          const stubSiteId = outreach ? outreach.site_id : null;
          await storeUnmatchedEmail(
            from,
            subject || '(no subject)',
            `[Email content unavailable - Resend returned 404 for ${email_id}]`,
            { email_id, from, subject, created_at: event.created_at, resend_404: true },
            stubSiteId
          );
          // NOTE(review): a stub with a site is counted as both unmatched and
          // stored — kept as-is; confirm whether that accounting is intended.
          unmatched++;
          if (stubSiteId) stored++;
          continue;
        }

        const rawPayload = {
          email_id,
          from,
          to: emailDetails.to,
          subject,
          created_at: event.created_at,
          text: emailDetails.text,
          html: emailDetails.html,
        };

        if (!outreach) {
          logger.warn(`No outreach found for email address: ${from} — storing as unmatched`);
          await storeUnmatchedEmail(
            from,
            subject || '(no subject)',
            emailDetails.text || emailDetails.html || '',
            rawPayload
          );
          unmatched++;
          continue;
        }

        const emailBody = emailDetails.text || emailDetails.html || '';

        // Store in messages table
        await storeInboundEmail(outreach.site_id, from, subject || '(no subject)', emailBody, rawPayload);

        // Count right away: the row is persisted whatever the refund check does.
        stored++;

        // Check for refund request. Isolated so a refund failure is reported on
        // its own and does not relabel an already-stored message as unmatched.
        try {
          const { processRefundRequest } = await import('../payment/refund-processor.js');
          const refundResult = await processRefundRequest(from, emailBody);
          if (refundResult.processed) {
            logger.success(`Auto-refund issued for ${from} (purchase #${refundResult.purchaseId})`);
          }
        } catch (refundError) {
          logger.error(
            `Refund check failed for ${from} (email ${email_id}): ${refundError.message}`
          );
        }
      } catch (error) {
        logger.error(`Failed to process email event: ${error.message}`);
        unmatched++;
      }
    }

    // Clear all email.received events from worker now that we've processed them
    const clearResponse = await fetch(eventsUrl, {
      method: 'DELETE',
      headers: authHeaders,
    });

    // A failed clear means the same events come back on the next poll; say so.
    if (!clearResponse.ok) {
      logger.warn(
        `Failed to clear email events from worker: ${clearResponse.status} ${clearResponse.statusText}`
      );
    }

    logger.success(
      `Polling complete: ${stored} messages stored, ${unmatched} unmatched, ${receivedEvents.length} total`
    );

    return { processed: receivedEvents.length, stored, unmatched };
  } catch (error) {
    logger.error('Error polling for inbound emails', error);
    throw error;
  }
}
371  
/**
 * Send operator replies that are queued in the messages table.
 *
 * Selects outbound email rows with message_type = 'reply' that have neither a
 * sent_at nor a delivery_status, oldest first, sends each one through
 * sendEmail (loaded lazily from ../outreach/email.js), and stamps it as sent.
 * A failure on one message is logged and counted; the loop carries on.
 *
 * @returns {Promise<{sent: number, failed: number}>} Tally of outcomes.
 * @throws Rethrows any error raised outside the per-message handling
 *   (for example the initial query failing).
 */
export async function processPendingReplies() {
  try {
    // Find operator reply messages that haven't been sent yet
    // IMPORTANT: scoped to message_type='reply' to avoid sending initial outreach proposals
    const pendingSql = `SELECT id, site_id, message_body, subject_line, contact_uri
       FROM messages
       WHERE direction = 'outbound'
       AND contact_method = 'email'
       AND message_type = 'reply'
       AND sent_at IS NULL
       AND delivery_status IS NULL
       ORDER BY created_at`;
    const queue = await getAll(pendingSql, []);
    const total = queue.length;

    if (total === 0) {
      logger.info('No pending operator email replies to send');
      return { sent: 0, failed: 0 };
    }

    logger.info(`Processing ${total} pending operator email replies...`);

    const outreachModule = await import('../outreach/email.js');
    const sendEmail = outreachModule.sendEmail;

    const markSentSql = `UPDATE messages SET sent_at = CURRENT_TIMESTAMP, delivery_status = 'sent', updated_at = CURRENT_TIMESTAMP WHERE id = $1`;

    let sent = 0;
    let failed = 0;

    for (const message of queue) {
      try {
        // Deliver first, then record the delivery on the row.
        await sendEmail(message.id);
        await run(markSentSql, [message.id]);

        logger.success(`Sent operator reply to ${message.contact_uri} (message #${message.id})`);
        sent += 1;
      } catch (error) {
        logger.error(`Failed to send reply for message #${message.id}`, error);
        failed += 1;
      }
    }

    logger.success(`Processed ${sent}/${total} operator email replies (${failed} failed)`);

    return { sent, failed };
  } catch (error) {
    logger.error('Error processing pending email replies', error);
    throw error;
  }
}
432  
// CLI entry point: runs only when this file is the script handed to node.
// Supported commands are `poll` and `process-replies`; anything else prints
// the usage text and exits with status 1.
if (import.meta.url === `file://${process.argv[1]}`) {
  // Await the task, print its summary and exit 0; log and exit 1 on failure.
  const runCommand = async (task, summarize, failureMessage) => {
    try {
      const result = await task();
      console.log(summarize(result));
      process.exit(0);
    } catch (error) {
      logger.error(failureMessage, error);
      process.exit(1);
    }
  };

  // Print the help text one line at a time.
  const printUsage = () => {
    const usageLines = [
      'Inbound Email Management',
      '',
      'Usage:',
      '  poll               - Poll Resend API for new inbound emails',
      '  process-replies    - Send pending operator replies from messages table',
      '',
      'Examples:',
      '  node src/inbound/email.js poll',
      '  node src/inbound/email.js process-replies',
      '',
      'Setup:',
      '  Configure Resend webhooks to point to your Cloudflare Worker',
      '  Set EMAIL_EVENTS_WORKER_URL in .env',
      '',
      'Setup cron for polling (every 5 minutes):',
      '  */5 * * * * cd /path/to/project && node src/inbound/email.js poll',
      '',
    ];
    for (const line of usageLines) {
      console.log(line);
    }
  };

  const [, , command] = process.argv;

  switch (command) {
    case 'poll':
      runCommand(
        pollInboundEmails,
        result =>
          `\n✅ Email Polling Complete: ${result.stored} new messages stored, ${result.unmatched} unmatched\n`,
        'Failed to poll emails'
      );
      break;

    case 'process-replies':
      runCommand(
        processPendingReplies,
        result => `\n✅ Processed operator replies: ${result.sent} sent, ${result.failed} failed\n`,
        'Failed to process replies'
      );
      break;

    default:
      printUsage();
      process.exit(1);
  }
}
482  
// Default export: every public helper of this module bundled into one object,
// for callers that prefer `import inbound from './email.js'` over named imports.
const inboundEmailApi = {
  findOutreachByEmail,
  parseEmailBody,
  detectSentiment,
  fetchReceivedEmail,
  storeInboundEmail,
  storeUnmatchedEmail,
  pollInboundEmails,
  processPendingReplies,
};

export default inboundEmailApi;