sync-unsubscribes.js
#!/usr/bin/env node

/**
 * Sync Unsubscribes from Cloudflare Worker
 *
 * Polls the Cloudflare Worker's unsubscribes.json endpoint and imports
 * unsubscribe requests into the local PostgreSQL database.
 *
 * Usage:
 *   node src/utils/sync-unsubscribes.js
 *   npm run sync-unsubscribes
 */

// Load .env first: ES module imports are evaluated in the order they appear, so
// any sibling module below that reads process.env while it is being evaluated
// (possibly db.js — not visible here) must come after this line.
import './load-env.js';
import { pathToFileURL } from 'node:url';
import Logger from './logger.js';
import { retryWithBackoff, isRetryableError } from './error-handler.js';
import { run, getOne, withTransaction } from './db.js';

const logger = new Logger('SyncUnsubscribes');

/** Message IDs at or above this value are worker test/placeholder entries (e.g. 999999999). */
const SENTINEL_ID_THRESHOLD = 999999000;

/** Upper bound (ms) for a single fetch attempt, so an unresponsive worker cannot stall the sync. */
const FETCH_TIMEOUT_MS = 30000;

/** Savepoint name used to isolate each unsubscribe entry inside the shared transaction. */
const ITEM_SAVEPOINT = 'sync_unsubscribe_item';

/**
 * Fetch the unsubscribe requests published by the Cloudflare Worker.
 *
 * Each attempt is wrapped in retryWithBackoff (maxRetries: 3) and retried only
 * for errors that isRetryableError accepts.
 *
 * @returns {Promise<Array>} The decoded JSON array, or [] if the payload is not an array.
 * @throws {Error} If UNSUBSCRIBE_WORKER_URL is not set, or if fetching keeps failing.
 */
async function fetchUnsubscribes() {
  const workerUrl = process.env.UNSUBSCRIBE_WORKER_URL;
  if (!workerUrl) {
    throw new Error('UNSUBSCRIBE_WORKER_URL not configured in .env');
  }

  // Strip trailing slashes so "https://host/" does not produce "https://host//unsubscribes.json".
  const url = `${workerUrl.replace(/\/+$/, '')}/unsubscribes.json`;
  logger.info(`Fetching unsubscribes from: ${url}`);

  return retryWithBackoff(
    async () => {
      // Bound each attempt; a hung connection would otherwise block the whole job.
      const response = await fetch(url, { signal: AbortSignal.timeout(FETCH_TIMEOUT_MS) });

      if (!response.ok) {
        throw new Error(`Failed to fetch unsubscribes: ${response.status} ${response.statusText}`);
      }

      const unsubscribes = await response.json();
      if (!Array.isArray(unsubscribes)) {
        logger.warn('Worker returned a non-array unsubscribe payload; treating it as empty');
        return [];
      }
      return unsubscribes;
    },
    {
      maxRetries: 3,
      shouldRetry: isRetryableError,
    },
  );
}

/**
 * Import one worker entry into unsubscribed_emails using the caller's transaction client.
 *
 * @param {object} client - Transaction client from withTransaction; `client.query` is used
 *   here as returning `{ rows, rowCount }` (presumably a node-postgres client — confirm in db.js).
 * @param {object} unsub - Worker entry; `outreachId` is looked up as messages.id and
 *   `timestamp` is stored as unsubscribed_at. Non-object entries are treated as missing an ID.
 * @returns {Promise<'processed'|'skipped'>} 'processed' when a new row was inserted.
 */
async function importUnsubscribe(client, unsub) {
  // `?? {}` so a null entry in the payload is skipped instead of throwing.
  const { outreachId: messageId, timestamp } = unsub ?? {};

  // Skip sentinel/placeholder IDs from the worker (e.g. test entries with id=999999999)
  if (!messageId || messageId >= SENTINEL_ID_THRESHOLD) {
    logger.info(`Skipping sentinel message ID: ${messageId}`);
    return 'skipped';
  }

  // Only outbound email messages can be unsubscribed via the web link.
  const { rows } = await client.query(
    `SELECT m.contact_uri, s.domain
       FROM messages m
       JOIN sites s ON m.site_id = s.id
       WHERE m.id = $1 AND m.contact_method = 'email' AND m.direction = 'outbound'`,
    [messageId],
  );
  const message = rows[0] ?? null;

  if (!message) {
    logger.warn(`Message #${messageId} not found or not an email message`);
    return 'skipped';
  }

  const email = message.contact_uri;

  // A NULL contact_uri previously threw on .includes(); treat it as invalid instead.
  if (typeof email !== 'string' || email === 'PENDING_CONTACT_EXTRACTION' || !email.includes('@')) {
    logger.warn(`Invalid email for message #${messageId}: ${email}`);
    return 'skipped';
  }

  // Insert into global unsubscribe list (ignore if already present).
  // Note: unsubscribe tracking is via unsubscribed_emails table; no unsubscribed column on messages.
  // NOTE(review): relies on a unique constraint on unsubscribed_emails (not visible here) for
  // ON CONFLICT DO NOTHING to make repeated syncs idempotent.
  const { rowCount } = await client.query(
    `INSERT INTO unsubscribed_emails (email, message_id, source, unsubscribed_at)
     VALUES ($1, $2, 'web', $3)
     ON CONFLICT DO NOTHING`,
    [email, messageId, timestamp],
  );

  if (rowCount > 0) {
    logger.success(`Unsubscribed: ${email} (message #${messageId} for ${message.domain})`);
    return 'processed';
  }

  logger.info(`Already unsubscribed: ${email} (message #${messageId})`);
  return 'skipped';
}

/**
 * Process unsubscribes and import to database.
 *
 * All entries share one transaction, but each entry runs under its own savepoint.
 * In PostgreSQL a failed statement aborts the entire transaction (and the final
 * COMMIT then rolls back). Without a savepoint, one bad entry would silently
 * discard every row inserted before it. Rolling back to the savepoint confines
 * the failure to that entry.
 *
 * @param {Array} unsubscribes - Entries returned by fetchUnsubscribes.
 * @returns {Promise<{processed: number, skipped: number, errors: number}>}
 */
async function processUnsubscribes(unsubscribes) {
  const stats = {
    processed: 0,
    skipped: 0,
    errors: 0,
  };

  if (unsubscribes.length === 0) {
    logger.info('No unsubscribes to process');
    return stats;
  }

  await withTransaction(async (client) => {
    for (const unsub of unsubscribes) {
      await client.query(`SAVEPOINT ${ITEM_SAVEPOINT}`);
      try {
        const outcome = await importUnsubscribe(client, unsub);
        if (outcome === 'processed') {
          stats.processed++;
        } else {
          stats.skipped++;
        }
      } catch (err) {
        // Optional chaining: the handler itself must not throw for a malformed entry.
        logger.error(`Error processing message #${unsub?.outreachId}:`, err);
        stats.errors++;
        // Undo only this entry's statements and clear the aborted state.
        await client.query(`ROLLBACK TO SAVEPOINT ${ITEM_SAVEPOINT}`);
      }
      // ROLLBACK TO keeps the savepoint alive, so release it on both paths.
      await client.query(`RELEASE SAVEPOINT ${ITEM_SAVEPOINT}`);
    }
  });

  return stats;
}

/**
 * Clear processed unsubscribes from worker (optional).
 *
 * Placeholder: currently only logs. History is kept in Cloudflare R2.
 */
// eslint-disable-next-line require-await -- Placeholder for future implementation
async function _clearProcessedUnsubscribes() {
  // For now, we'll keep the history in R2.
  // If you want to clear after processing, implement a DELETE endpoint in the worker
  // and call it here with authentication.

  // Example:
  // await fetch(`${workerUrl}/clear`, { method: 'DELETE' });

  logger.info('Keeping unsubscribe history in Cloudflare R2 for audit trail');
}

/**
 * Fetch unsubscribes from the worker, import them, and log a summary.
 * Exported for use in other modules.
 *
 * @returns {Promise<{processed: number, skipped: number, errors: number}>}
 * @throws {Error} Re-throws (after logging) any fetch or transaction-level failure.
 */
export async function syncUnsubscribes() {
  try {
    logger.info('Starting unsubscribe sync...');

    const unsubscribes = await fetchUnsubscribes();
    logger.info(`Found ${unsubscribes.length} unsubscribe(s) from worker`);

    const stats = await processUnsubscribes(unsubscribes);

    logger.info('Sync complete:');
    logger.info(`  Processed: ${stats.processed}`);
    logger.info(`  Skipped: ${stats.skipped}`);
    logger.info(`  Errors: ${stats.errors}`);

    return stats;
  } catch (error) {
    logger.error('Failed to sync unsubscribes:', error);
    throw error;
  }
}

/**
 * Check if an email is globally unsubscribed (case-insensitive comparison).
 *
 * @param {string} email
 * @returns {Promise<boolean>}
 */
export async function isEmailUnsubscribed(email) {
  const result = await getOne(
    'SELECT 1 FROM unsubscribed_emails WHERE lower(email) = lower($1)',
    [email],
  );
  return !!result;
}

/**
 * Get count of globally unsubscribed emails.
 *
 * @returns {Promise<number>}
 */
export async function getUnsubscribeCount() {
  const result = await getOne('SELECT COUNT(*) AS count FROM unsubscribed_emails', []);
  // COUNT(*) may arrive as a string (e.g. a bigint from the driver); normalize to a number.
  return Number.parseInt(result.count, 10);
}

// CLI functionality: run only when this file is the entry script, not when imported.
// pathToFileURL handles Windows drive letters and percent-encoding, which a
// hand-built `file://${path}` string does not. argv[1] is absent under e.g. `node -e`,
// so guard it to keep importing this module from ever throwing.
const isMainModule =
  Boolean(process.argv[1]) && import.meta.url === pathToFileURL(process.argv[1]).href;

if (isMainModule) {
  syncUnsubscribes()
    .then((stats) => {
      process.exit(stats.errors > 0 ? 1 : 0);
    })
    .catch((error) => {
      console.error(`\n❌ Sync failed: ${error.message}\n`);
      process.exit(1);
    });
}

export default {
  syncUnsubscribes,
  isEmailUnsubscribed,
  getUnsubscribeCount,
};