email.js
#!/usr/bin/env node

/**
 * Email Inbound Handler
 * Processes inbound email replies from Resend and stores them in messages table
 */

import { pathToFileURL } from 'node:url';

import { getAll, getOne, run } from '../utils/db.js';
import Logger from '../utils/logger.js';
import '../utils/load-env.js';

const logger = new Logger('InboundEmail');

/**
 * Patterns that mark the start of quoted/forwarded content in a reply.
 * They are tried in this order; the first pattern that matches past the start
 * of the text wins. None of them carries the `g` flag, so sharing them across
 * calls is safe (no `lastIndex` state).
 */
const REPLY_DELIMITERS = Object.freeze([
  /^On .* wrote:$/m,
  /^-----Original Message-----/m,
  /^From: .*$/m,
  /^________________________________$/m,
  /^>+/m, // Quoted lines starting with >
]);

/** Substrings that classify a reply as an objection (checked before positives). */
const NEGATIVE_KEYWORDS = Object.freeze([
  'not interested',
  'no thanks',
  'remove',
  'unsubscribe',
  'stop',
  "don't contact",
  'already have',
  'too expensive',
  'not now',
  'busy',
]);

/** Substrings that classify a reply as positive. */
const POSITIVE_KEYWORDS = Object.freeze([
  'interested',
  'yes',
  'sounds good',
  'please call',
  "let's talk",
  'schedule',
  'when can',
  'available',
  'great',
  'perfect',
  'thank you',
]);

/**
 * Find the most recently sent outbound email message for an address.
 *
 * The address is lowercased and trimmed before it is compared against
 * `lower(contact_uri)`.
 *
 * @param {string} emailAddress - Address to look up.
 * @returns {Promise<object|null|undefined>} Row with `id`, `site_id`,
 *   `contact_uri` and `contact_method`, or a falsy value when nothing matches.
 *   Non-string or blank input yields `null` without querying the database.
 */
export async function findOutreachByEmail(emailAddress) {
  if (typeof emailAddress !== 'string') return null;

  // Normalize email (lowercase)
  const normalized = emailAddress.toLowerCase().trim();
  if (normalized === '') return null;

  // Find outbound message with matching email in contact_uri
  const outreach = await getOne(
    `SELECT id, site_id, contact_uri, contact_method
     FROM messages
     WHERE direction = 'outbound'
       AND contact_method = 'email'
       AND lower(contact_uri) = $1
     ORDER BY sent_at DESC
     LIMIT 1`,
    [normalized]
  );

  return outreach;
}

/**
 * Split an email body into the new reply text and the quoted remainder.
 * Simple heuristic: split on common reply delimiters.
 *
 * @param {string|{text?: string}|null|undefined} body - Raw body, or an object
 *   whose `text` property holds it.
 * @returns {{clean: string, quoted: string}} `clean` is the text before the
 *   first usable delimiter, `quoted` is everything from that delimiter on.
 *   When no delimiter applies, `clean` is the untouched text and `quoted` is ''.
 */
export function parseEmailBody(body) {
  if (!body) return { clean: '', quoted: '' };

  const text = typeof body === 'string' ? body : body.text || '';

  for (const delimiter of REPLY_DELIMITERS) {
    const match = text.match(delimiter);
    // A delimiter at index 0 is skipped on purpose: splitting there would
    // leave an empty clean body, so the next delimiter is tried instead.
    if (match && match.index > 0) {
      return {
        clean: text.substring(0, match.index).trim(),
        quoted: text.substring(match.index).trim(),
      };
    }
  }

  return { clean: text, quoted: '' };
}

/**
 * Detect sentiment using simple keyword (substring) matching.
 *
 * @param {string} text - Message text to classify.
 * @returns {'objection'|'positive'|'neutral'|null} `null` for empty input.
 */
export function detectSentiment(text) {
  if (!text) return null;

  const lowerText = text.toLowerCase();

  // Negatives go first: 'not interested' must win over the substring 'interested'.
  if (NEGATIVE_KEYWORDS.some(keyword => lowerText.includes(keyword))) {
    return 'objection';
  }

  if (POSITIVE_KEYWORDS.some(keyword => lowerText.includes(keyword))) {
    return 'positive';
  }

  // Default to neutral
  return 'neutral';
}

/**
 * Fetch received email details from Resend API.
 *
 * @param {string} emailId - Identifier of the email to fetch.
 * @returns {Promise<object|null>} Parsed JSON response, or `null` on HTTP 404.
 * @throws {Error} When RESEND_API_KEY is missing or the response is any other
 *   non-2xx status.
 */
export async function fetchReceivedEmail(emailId) {
  const apiKey = process.env.RESEND_API_KEY;

  if (!apiKey) {
    throw new Error('RESEND_API_KEY not configured');
  }

  // Encode the id so it can never alter the request path.
  const url = `https://api.resend.com/emails/${encodeURIComponent(emailId)}`;

  logger.info(`Fetching email details for ${emailId}...`);

  const response = await fetch(url, {
    headers: {
      Authorization: `Bearer ${apiKey}`,
    },
  });

  if (response.status === 404) {
    return null;
  }

  if (!response.ok) {
    throw new Error(`Failed to fetch email: ${response.status} ${response.statusText}`);
  }

  return response.json();
}

/**
 * Insert one inbound email row and report what was stored.
 * Shared by storeInboundEmail and storeUnmatchedEmail.
 *
 * NOTE(review): relies on run() exposing `lastInsertRowid` on its result —
 * confirm against ../utils/db.js.
 *
 * @returns {Promise<{messageId: *, sentiment: string|null}>}
 */
async function insertInboundMessage(siteId, fromEmail, subject, messageBody, rawPayload) {
  // Only the new (unquoted) part of the reply is stored and classified.
  const { clean } = parseEmailBody(messageBody);
  const sentiment = detectSentiment(clean);

  const result = await run(
    `INSERT INTO messages (
       site_id, direction, contact_method, contact_uri,
       message_body, subject_line, sentiment, raw_payload
     ) VALUES ($1, 'inbound', 'email', $2, $3, $4, $5, $6)
     RETURNING id`,
    [siteId, fromEmail, clean, subject, sentiment, JSON.stringify(rawPayload)]
  );

  return { messageId: result.lastInsertRowid, sentiment };
}

/**
 * Store unmatched inbound email in messages table (no site match)
 * Note: messages table requires site_id (NOT NULL). Unmatched emails need a
 * catch-all site or should be logged separately. For now we skip storing
 * truly unmatched emails and just log them.
 * If a site_id is provided (e.g. from a fuzzy match), it will be stored.
 *
 * @param {string} fromEmail - Sender address.
 * @param {string} subject - Subject line.
 * @param {string|{text?: string}} messageBody - Raw body.
 * @param {object} rawPayload - Payload serialized into `raw_payload`.
 * @param {*} [siteId=null] - Site to attach the message to.
 * @returns {Promise<*>} Inserted message id, or `null` when nothing was stored.
 */
export async function storeUnmatchedEmail(fromEmail, subject, messageBody, rawPayload, siteId = null) {
  if (!siteId) {
    // messages.site_id is NOT NULL — cannot store without a site
    logger.warn(
      `Cannot store unmatched email from ${fromEmail} — no site_id available (messages.site_id is NOT NULL)`
    );
    return null;
  }

  const { messageId } = await insertInboundMessage(
    siteId,
    fromEmail,
    subject,
    messageBody,
    rawPayload
  );

  logger.info(`Stored unmatched email from ${fromEmail} as message #${messageId} (site #${siteId})`);

  return messageId;
}

/**
 * Store inbound email in messages table.
 *
 * @param {*} siteId - Site the reply belongs to.
 * @param {string} fromEmail - Sender address.
 * @param {string} subject - Subject line.
 * @param {string|{text?: string}} messageBody - Raw body.
 * @param {object} rawPayload - Payload serialized into `raw_payload`.
 * @returns {Promise<*>} Inserted message id.
 */
export async function storeInboundEmail(siteId, fromEmail, subject, messageBody, rawPayload) {
  const { messageId, sentiment } = await insertInboundMessage(
    siteId,
    fromEmail,
    subject,
    messageBody,
    rawPayload
  );

  logger.success(
    `Stored inbound email from ${fromEmail} for site #${siteId} (message #${messageId}, sentiment: ${sentiment})`
  );

  return messageId;
}

/**
 * Poll for inbound email replies via Cloudflare Worker
 * Reads email.received events from R2 and fetches full email content from Resend API
 *
 * Per-event failures are logged and counted as unmatched; they do not abort
 * the run. After the loop the worker's event list is cleared with a DELETE.
 *
 * @returns {Promise<{processed: number, stored: number, unmatched: number}>}
 * @throws {Error} When EMAIL_EVENTS_WORKER_URL is missing, the events request
 *   fails, or the events payload is not an array.
 */
// eslint-disable-next-line complexity -- Email polling requires multiple conditional checks
export async function pollInboundEmails() {
  const workerUrl = process.env.EMAIL_EVENTS_WORKER_URL;

  if (!workerUrl) {
    throw new Error('EMAIL_EVENTS_WORKER_URL not configured in .env');
  }

  try {
    // Always poll last 24 hours (simplified approach - no timestamp tracking)
    const lastPoll = new Date();
    lastPoll.setHours(lastPoll.getHours() - 24);
    logger.info(
      `Polling for email.received events from last 24 hours (since ${lastPoll.toISOString()})...`
    );

    // Fetch events from worker
    const url = `${workerUrl}/email-events.json`;
    const response = await fetch(url, {
      headers: { 'X-Auth-Secret': process.env.RESEND_WORKER_SECRET || '' },
    });

    if (!response.ok) {
      throw new Error(`Failed to fetch events: ${response.status} ${response.statusText}`);
    }

    const allEvents = await response.json();

    if (!Array.isArray(allEvents)) {
      throw new Error('Unexpected email events payload: expected an array');
    }

    // Filter for email.received events only, excluding test addresses
    const receivedEvents = allEvents.filter(event => {
      if (event.type !== 'email.received') return false;
      if (new Date(event.created_at) <= lastPoll) return false;
      const from = event.data?.from || '';
      if (/^test-[^@]+@/i.test(from)) {
        logger.info(`Skipping test address: ${from}`);
        return false;
      }
      return true;
    });

    if (receivedEvents.length === 0) {
      logger.info('No new inbound email messages');
      return { processed: 0, stored: 0, unmatched: 0 };
    }

    logger.info(`Found ${receivedEvents.length} new inbound email messages`);

    let stored = 0;
    let unmatched = 0;

    for (const event of receivedEvents) {
      try {
        const { from, subject, email_id } = event.data || {};

        if (!from || !email_id) {
          logger.warn('Missing from or email_id in email.received event');
          unmatched++;
          continue;
        }

        logger.info(`Processing email from ${from}: "${subject || '(no subject)'}"`);

        // Find matching outreach
        const outreach = await findOutreachByEmail(from);

        // Check if this message was already stored (by email_id) — works for both matched and unmatched
        const existing = await getOne(
          `SELECT id FROM messages WHERE raw_payload::text LIKE $1`,
          [`%${email_id}%`]
        );

        if (existing) {
          logger.info(`Email ${email_id} already stored, skipping`);
          continue;
        }

        // Fetch full email content from Resend API
        const emailDetails = await fetchReceivedEmail(email_id);

        if (!emailDetails) {
          // Email no longer exists in Resend (404) — store stub to prevent retry loops
          logger.warn(
            `Email ${email_id} not found in Resend (404), storing stub to prevent retries`
          );
          const stubSiteId = outreach ? outreach.site_id : null;
          await storeUnmatchedEmail(
            from,
            subject || '(no subject)',
            `[Email content unavailable - Resend returned 404 for ${email_id}]`,
            { email_id, from, subject, created_at: event.created_at, resend_404: true },
            stubSiteId
          );
          // A stub with a known site counts as both unmatched and stored.
          unmatched++;
          if (stubSiteId) stored++;
          continue;
        }

        const rawPayload = {
          email_id,
          from,
          to: emailDetails.to,
          subject,
          created_at: event.created_at,
          text: emailDetails.text,
          html: emailDetails.html,
        };

        if (!outreach) {
          logger.warn(`No outreach found for email address: ${from} — storing as unmatched`);
          await storeUnmatchedEmail(
            from,
            subject || '(no subject)',
            emailDetails.text || emailDetails.html || '',
            rawPayload
          );
          unmatched++;
          continue;
        }

        const emailBody = emailDetails.text || emailDetails.html || '';

        // Store in messages table
        await storeInboundEmail(outreach.site_id, from, subject || '(no subject)', emailBody, rawPayload);

        // Count the message as soon as it is persisted, so a failure in the
        // refund check below cannot misreport a stored message as unmatched.
        stored++;

        // Check for refund request
        try {
          const { processRefundRequest } = await import('../payment/refund-processor.js');
          const refundResult = await processRefundRequest(from, emailBody);
          if (refundResult?.processed) {
            logger.success(`Auto-refund issued for ${from} (purchase #${refundResult.purchaseId})`);
          }
        } catch (refundError) {
          logger.error(`Refund check failed for ${from}: ${refundError.message}`);
        }
      } catch (error) {
        logger.error(`Failed to process email event: ${error.message}`);
        unmatched++;
      }
    }

    // Clear all email.received events from worker now that we've processed them
    const clearUrl = `${workerUrl}/email-events.json`;
    const clearResponse = await fetch(clearUrl, {
      method: 'DELETE',
      headers: { 'X-Auth-Secret': process.env.RESEND_WORKER_SECRET || '' },
    });

    // A failed clear is not fatal (the email_id check above skips duplicates
    // on the next poll), but it should not pass unnoticed.
    if (clearResponse?.ok === false) {
      logger.warn(
        `Failed to clear email events from worker: ${clearResponse.status} ${clearResponse.statusText}`
      );
    }

    logger.success(
      `Polling complete: ${stored} messages stored, ${unmatched} unmatched, ${receivedEvents.length} total`
    );

    return { processed: receivedEvents.length, stored, unmatched };
  } catch (error) {
    logger.error('Error polling for inbound emails', error);
    throw error;
  }
}

/**
 * Process pending operator replies from messages table
 * Send outbound emails for messages marked as direction='outbound' and not yet sent
 *
 * NOTE(review): if the UPDATE fails after sendEmail succeeded, the row still
 * looks unsent and would be selected again on the next run — confirm whether
 * sendEmail records delivery itself.
 *
 * @returns {Promise<{sent: number, failed: number}>}
 * @throws {Error} When the pending-replies query or the sendEmail import fails.
 */
export async function processPendingReplies() {
  try {
    // Find operator reply messages that haven't been sent yet
    // IMPORTANT: scoped to message_type='reply' to avoid sending initial outreach proposals
    const allPendingReplies = await getAll(
      `SELECT id, site_id, message_body, subject_line, contact_uri
       FROM messages
       WHERE direction = 'outbound'
         AND contact_method = 'email'
         AND message_type = 'reply'
         AND sent_at IS NULL
         AND delivery_status IS NULL
       ORDER BY created_at`,
      []
    );

    if (allPendingReplies.length === 0) {
      logger.info('No pending operator email replies to send');
      return { sent: 0, failed: 0 };
    }

    logger.info(`Processing ${allPendingReplies.length} pending operator email replies...`);

    const { sendEmail } = await import('../outreach/email.js');
    let sent = 0;
    let failed = 0;

    for (const reply of allPendingReplies) {
      try {
        // Send email using existing sendEmail function
        await sendEmail(reply.id);

        // Mark as sent
        await run(
          `UPDATE messages SET sent_at = CURRENT_TIMESTAMP, delivery_status = 'sent', updated_at = CURRENT_TIMESTAMP WHERE id = $1`,
          [reply.id]
        );

        logger.success(`Sent operator reply to ${reply.contact_uri} (message #${reply.id})`);
        sent++;
      } catch (error) {
        // One failed reply must not block the rest of the queue.
        logger.error(`Failed to send reply for message #${reply.id}`, error);
        failed++;
      }
    }

    logger.success(
      `Processed ${sent}/${allPendingReplies.length} operator email replies (${failed} failed)`
    );

    return { sent, failed };
  } catch (error) {
    logger.error('Error processing pending email replies', error);
    throw error;
  }
}

// CLI functionality
// Compare real file URLs rather than concatenating `file://` with the path:
// pathToFileURL handles percent-encoding (spaces etc.) and Windows paths.
const invokedScript = process.argv[1];
if (invokedScript && import.meta.url === pathToFileURL(invokedScript).href) {
  const command = process.argv[2];

  if (command === 'poll') {
    pollInboundEmails()
      .then(result => {
        console.log(
          `\n✅ Email Polling Complete: ${result.stored} new messages stored, ${result.unmatched} unmatched\n`
        );
        process.exit(0);
      })
      .catch(error => {
        logger.error('Failed to poll emails', error);
        process.exit(1);
      });
  } else if (command === 'process-replies') {
    processPendingReplies()
      .then(result => {
        console.log(
          `\n✅ Processed operator replies: ${result.sent} sent, ${result.failed} failed\n`
        );
        process.exit(0);
      })
      .catch(error => {
        logger.error('Failed to process replies', error);
        process.exit(1);
      });
  } else {
    console.log('Inbound Email Management');
    console.log('');
    console.log('Usage:');
    console.log(' poll - Poll Resend API for new inbound emails');
    console.log(' process-replies - Send pending operator replies from messages table');
    console.log('');
    console.log('Examples:');
    console.log(' node src/inbound/email.js poll');
    console.log(' node src/inbound/email.js process-replies');
    console.log('');
    console.log('Setup:');
    console.log(' Configure Resend webhooks to point to your Cloudflare Worker');
    console.log(' Set EMAIL_EVENTS_WORKER_URL in .env');
    console.log('');
    console.log('Setup cron for polling (every 5 minutes):');
    console.log(' */5 * * * * cd /path/to/project && node src/inbound/email.js poll');
    console.log('');
    process.exit(1);
  }
}

export default {
  findOutreachByEmail,
  parseEmailBody,
  detectSentiment,
  fetchReceivedEmail,
  storeInboundEmail,
  storeUnmatchedEmail,
  pollInboundEmails,
  processPendingReplies,
};