/ src / utils / sync-unsubscribes.js
sync-unsubscribes.js
  1  #!/usr/bin/env node
  2  
  3  /**
  4   * Sync Unsubscribes from Cloudflare Worker
  5   *
  6   * Polls the Cloudflare Worker's unsubscribes.json endpoint and imports
  7   * unsubscribe requests into the local PostgreSQL database.
  8   *
  9   * Usage:
 10   *   node src/utils/sync-unsubscribes.js
 11   *   npm run sync-unsubscribes
 12   */
 13  
import { pathToFileURL } from 'node:url';

// Load .env first: ES modules evaluate their imports in declaration order, so modules
// imported below only see the configured environment if this runs before them.
import './load-env.js';
import Logger from './logger.js';
import { retryWithBackoff, isRetryableError } from './error-handler.js';
import { run, getOne, withTransaction } from './db.js';
 18  
 19  const logger = new Logger('SyncUnsubscribes');
 20  
 21  /**
 22   * Fetch unsubscribes from Cloudflare Worker
 23   */
 24  async function fetchUnsubscribes() {
 25    const workerUrl = process.env.UNSUBSCRIBE_WORKER_URL;
 26    if (!workerUrl) {
 27      throw new Error('UNSUBSCRIBE_WORKER_URL not configured in .env');
 28    }
 29  
 30    const url = `${workerUrl}/unsubscribes.json`;
 31    logger.info(`Fetching unsubscribes from: ${url}`);
 32  
 33    return await retryWithBackoff(
 34      async () => {
 35        const response = await fetch(url);
 36  
 37        if (!response.ok) {
 38          throw new Error(`Failed to fetch unsubscribes: ${response.status} ${response.statusText}`);
 39        }
 40  
 41        const unsubscribes = await response.json();
 42        return Array.isArray(unsubscribes) ? unsubscribes : [];
 43      },
 44      {
 45        maxRetries: 3,
 46        shouldRetry: isRetryableError,
 47      }
 48    );
 49  }
 50  
 51  /**
 52   * Process unsubscribes and import to database
 53   * @param {Array} unsubscribes
 54   * @returns {Promise<{processed: number, skipped: number, errors: number}>}
 55   */
 56  async function processUnsubscribes(unsubscribes) {
 57    if (unsubscribes.length === 0) {
 58      logger.info('No unsubscribes to process');
 59      return { processed: 0, skipped: 0, errors: 0 };
 60    }
 61  
 62    const stats = {
 63      processed: 0,
 64      skipped: 0,
 65      errors: 0,
 66    };
 67  
 68    await withTransaction(async (client) => {
 69      for (const unsub of unsubscribes) {
 70        try {
 71          const { outreachId: messageId, timestamp } = unsub;
 72  
 73          // Skip sentinel/placeholder IDs from the worker (e.g. test entries with id=999999999)
 74          if (!messageId || messageId >= 999999000) {
 75            logger.info(`Skipping sentinel message ID: ${messageId}`);
 76            stats.skipped++;
 77            continue;
 78          }
 79  
 80          // Get message details
 81          const { rows: msgRows } = await client.query(
 82            `SELECT m.contact_uri, s.domain
 83             FROM messages m
 84             JOIN sites s ON m.site_id = s.id
 85             WHERE m.id = $1 AND m.contact_method = 'email' AND m.direction = 'outbound'`,
 86            [messageId]
 87          );
 88          const message = msgRows[0] ?? null;
 89  
 90          if (!message) {
 91            logger.warn(`Message #${messageId} not found or not an email message`);
 92            stats.skipped++;
 93            continue;
 94          }
 95  
 96          const email = message.contact_uri;
 97  
 98          if (email === 'PENDING_CONTACT_EXTRACTION' || !email.includes('@')) {
 99            logger.warn(`Invalid email for message #${messageId}: ${email}`);
100            stats.skipped++;
101            continue;
102          }
103  
104          // Insert into global unsubscribe list (ignore if already present)
105          // Note: unsubscribe tracking is via unsubscribed_emails table; no unsubscribed column on messages
106          const { rowCount } = await client.query(
107            `INSERT INTO unsubscribed_emails (email, message_id, source, unsubscribed_at)
108             VALUES ($1, $2, 'web', $3)
109             ON CONFLICT DO NOTHING`,
110            [email, messageId, timestamp]
111          );
112  
113          if (rowCount > 0) {
114            logger.success(`Unsubscribed: ${email} (message #${messageId} for ${message.domain})`);
115            stats.processed++;
116          } else {
117            logger.info(`Already unsubscribed: ${email} (message #${messageId})`);
118            stats.skipped++;
119          }
120        } catch (err) {
121          logger.error(`Error processing message #${unsub.outreachId}:`, err);
122          stats.errors++;
123        }
124      }
125    });
126  
127    return stats;
128  }
129  
/**
 * Placeholder for clearing already-imported unsubscribes from the worker.
 *
 * Deliberately does nothing but log for now: the history is kept in R2 as an audit
 * trail. To enable clearing, add an authenticated DELETE endpoint to the worker and
 * call it from here (for example a DELETE request to "<workerUrl>/clear").
 */
// eslint-disable-next-line require-await -- Placeholder for future implementation
async function _clearProcessedUnsubscribes() {
  const retentionNote = 'Keeping unsubscribe history in Cloudflare R2 for audit trail';
  logger.info(retentionNote);
}
144  
/**
 * Run one full sync: pull unsubscribe records from the worker, import them, and log a summary.
 *
 * @returns {Promise<{processed: number, skipped: number, errors: number}>} Counts for this run.
 * @throws Re-throws any failure from fetching or importing, after logging it.
 */
export async function syncUnsubscribes() {
  try {
    logger.info('Starting unsubscribe sync...');

    const records = await fetchUnsubscribes();
    logger.info(`Found ${records.length} unsubscribe(s) from worker`);

    const stats = await processUnsubscribes(records);

    logger.info('Sync complete:');
    [
      `  Processed: ${stats.processed}`,
      `  Skipped: ${stats.skipped}`,
      `  Errors: ${stats.errors}`,
    ].forEach((line) => logger.info(line));

    return stats;
  } catch (failure) {
    logger.error('Failed to sync unsubscribes:', failure);
    throw failure;
  }
}
168  
/**
 * Report whether an address is on the global unsubscribe list.
 * The lookup is case-insensitive: both the stored and the given address are lower()-ed in SQL.
 *
 * @param {string} email - Address to look up.
 * @returns {Promise<boolean>} true when `getOne` yields a matching row, false otherwise.
 */
export async function isEmailUnsubscribed(email) {
  const lookupSql = 'SELECT 1 FROM unsubscribed_emails WHERE lower(email) = lower($1)';
  const match = await getOne(lookupSql, [email]);
  return match ? true : false;
}
181  
/**
 * Count the addresses on the global unsubscribe list.
 *
 * The value is parsed as a base-10 integer because the driver may return COUNT(*)
 * as a string (presumably Postgres bigint via the pg driver — confirm).
 *
 * @returns {Promise<number>}
 */
export async function getUnsubscribeCount() {
  const { count } = await getOne('SELECT COUNT(*) AS count FROM unsubscribed_emails', []);
  return Number.parseInt(count, 10);
}
190  
// CLI functionality: run a sync only when this file is executed directly, not when imported.
// pathToFileURL percent-encodes the path and handles Windows drive letters, which a raw
// `file://${argv[1]}` string comparison gets wrong (e.g. for paths containing spaces).
// argv[1] is checked first because it is undefined under `node -e` or a REPL.
if (process.argv[1] && import.meta.url === pathToFileURL(process.argv[1]).href) {
  syncUnsubscribes()
    .then((stats) => {
      // Any per-record error is reported to the caller (cron / npm script) as a non-zero exit.
      process.exit(stats.errors > 0 ? 1 : 0);
    })
    .catch((error) => {
      // Fall back to the raw value for non-Error rejections instead of printing "undefined".
      console.error(`\n❌ Sync failed: ${error?.message ?? error}\n`);
      process.exit(1);
    });
}
205  
/**
 * Default export grouping the module's public functions; the properties are the same
 * function references as the named exports.
 */
const defaultExport = {
  syncUnsubscribes,
  isEmailUnsubscribed,
  getUnsubscribeCount,
};

export default defaultExport;