/ src / cron / email-exclusion-audit.js
email-exclusion-audit.js
  1  #!/usr/bin/env node
  2  
  3  /**
  4   * Email Exclusion Audit Cron Job
  5   *
  6   * Detects email addresses that appear in contacts_json across 10+ distinct sites —
  7   * a reliable signal of scraped noise (e.g. the html-contact-extractor hitting a
  8   * shared error/placeholder page and recording its email as a real contact, or a
  9   * font/CDN page whose copyright email ends up on hundreds of sites).
 10   *
 11   * Role-based prefixes (info@, support@, sales@, etc.) are deliberately skipped —
 12   * these are legitimately common across businesses and the cross-site duplicate check
 13   * would produce false positives.  The exclusion list is for the same *exact* address
 14   * on 10+ different domains (e.g. user@domain.com, impallari@gmail.com).
 15   *
 16   * Steps:
 17   *   1. Scan all contacts_json rows, count per-email site occurrences in JS
 18   *   2. Skip role-based prefixes (info@, contact@, hello@, …)
 19   *   3. For any email at >= THRESHOLD sites not already excluded: add to email_exclusion_list
 20   *   4. Strip those emails from every site's contacts_json email_addresses array
 21   *   5. Mark queued email outreach to those addresses as failed
 22   *   6. Log a summary and update cron_jobs last_run
 23   *
 24   * Schedule: Weekly (7 days) via cron_jobs table (task_key: emailExclusionAudit)
 25   */
 26  
import { readdirSync } from 'fs';
import { pathToFileURL } from 'url';
import { getAll, run, withTransaction } from './../utils/db.js';
import Logger from '../utils/logger.js';
import { getContactsDataWithFallback, setContactsJson, DATA_DIR } from '../utils/contacts-storage.js';
 31  
 32  const logger = new Logger('EmailExclusionAudit');
 33  
 34  /** Minimum number of distinct sites an email must appear on to be flagged. */
 35  const THRESHOLD = 10;
 36  
 37  /**
 38   * Role-based email prefixes that are legitimately common across many businesses.
 39   * These are skipped even if they appear on 10+ sites, because the cross-site
 40   * duplicate signal doesn't apply — every plumber has an info@ address.
 41   *
 42   * Note: this only applies to prefixes before the @.  If the full address is the
 43   * same domain (e.g. info@acmeplumbing.com.au on 10 sites) it would still be caught,
 44   * but in practice that won't happen — each business has its own domain.
 45   */
 46  const ROLE_PREFIXES = [
 47    'info@',
 48    'contact@',
 49    'hello@',
 50    'admin@',
 51    'support@',
 52    'sales@',
 53    'enquiries@',
 54    'enquiry@',
 55    'office@',
 56    'mail@',
 57    'noreply@',
 58    'no-reply@',
 59  ];
 60  
 61  /**
 62   * Normalise an email address: lowercase and trim whitespace.
 63   *
 64   * @param {string} raw
 65   * @returns {string}
 66   */
 67  function normaliseEmail(raw) {
 68    return raw.trim().toLowerCase();
 69  }
 70  
 71  /**
 72   * Extract the email string from an email_addresses entry.
 73   * Handles both plain strings and objects like { email: '...', label: '...' }.
 74   *
 75   * @param {unknown} entry
 76   * @returns {string|null}
 77   */
 78  function extractEmailAddress(entry) {
 79    if (typeof entry === 'string') return entry;
 80    if (entry && typeof entry === 'object' && typeof entry.email === 'string') return entry.email;
 81    return null;
 82  }
 83  
 84  /**
 85   * Return true if the email should be skipped (role-based prefix).
 86   *
 87   * @param {string} email  Already normalised (lowercase)
 88   * @returns {boolean}
 89   */
 90  function isRoleBased(email) {
 91    return ROLE_PREFIXES.some(prefix => email.startsWith(prefix));
 92  }
 93  
 94  /** Return minimal {id} rows for all sites with contacts on filesystem. */
 95  function getSiteIdsWithContacts() {
 96    try {
 97      return readdirSync(DATA_DIR)
 98        .filter(f => f.endsWith('.json'))
 99        .map(f => ({ id: parseInt(f, 10) }))
100        .filter(r => !isNaN(r.id));
101    } catch {
102      return [];
103    }
104  }
105  
/**
 * Build a map of normalised email -> Set of site IDs whose contacts data lists it.
 *
 * Each address is counted at most once per site. Entries that are neither a
 * string nor an { email } object, that lack an '@' or are shorter than 6
 * characters, or that use a role-based prefix are ignored. Sites whose contacts
 * cannot be read are skipped.
 *
 * @param {{id: number}[]} rows  Sites to scan
 * @returns {Map<string, Set<number>>}
 */
function collectEmailSiteIds(rows) {
  const emailToSiteIds = new Map();

  for (const { id } of rows) {
    let contacts;
    try {
      contacts = getContactsDataWithFallback(id, null);
    } catch {
      continue; // malformed JSON — skip this site rather than abort the scan
    }

    const emails = contacts?.email_addresses;
    if (!Array.isArray(emails) || emails.length === 0) continue;

    // Repeated listings within one site must not inflate the cross-site count.
    const seenInThisSite = new Set();

    for (const entry of emails) {
      const raw = extractEmailAddress(entry);
      if (!raw) continue;

      const normalised = normaliseEmail(raw);
      if (!normalised || seenInThisSite.has(normalised)) continue;
      seenInThisSite.add(normalised);

      // Skip entries that don't look like email addresses (no @ or too short)
      if (!normalised.includes('@') || normalised.length < 6) continue;

      // Skip role-based prefixes — these appear legitimately across many businesses
      if (isRoleBased(normalised)) continue;

      let siteIds = emailToSiteIds.get(normalised);
      if (!siteIds) {
        siteIds = new Set();
        emailToSiteIds.set(normalised, siteIds);
      }
      siteIds.add(id);
    }
  }

  return emailToSiteIds;
}

/**
 * Scan contacts_json for emails appearing across >= THRESHOLD sites.
 * Returns a Map of email -> site_count for emails that reach the threshold
 * and are not already in email_exclusion_list.
 *
 * Stored exclusion-list values are normalised the same way as scraped
 * addresses (trim + lowercase) before comparison, so a manually added
 * 'User@Example.com' still suppresses 'user@example.com'.
 *
 * @returns {Promise<Map<string, number>>}
 */
async function detectCommonEmails() {
  logger.info('Scanning contacts_json for common email addresses...');

  const rows = getSiteIdsWithContacts();

  logger.info(`Scanning ${rows.length} sites...`);

  const emailToSiteIds = collectEmailSiteIds(rows);

  // Load existing exclusion list so already-excluded addresses aren't re-flagged.
  // Rows with a null / non-string email are ignored.
  const existingRows = await getAll('SELECT email FROM email_exclusion_list');
  const alreadyExcluded = new Set(
    existingRows
      .map(r => r.email)
      .filter(email => typeof email === 'string')
      .map(email => normaliseEmail(email))
  );

  const newlyFlagged = new Map();

  for (const [email, siteIds] of emailToSiteIds) {
    if (siteIds.size >= THRESHOLD && !alreadyExcluded.has(email)) {
      newlyFlagged.set(email, siteIds.size);
    }
  }

  logger.info(
    `Found ${newlyFlagged.size} new email(s) appearing in >= ${THRESHOLD} sites (${alreadyExcluded.size} already excluded)`
  );

  return newlyFlagged;
}
176  
/**
 * Record each flagged email in email_exclusion_list (added_by = 'cron'),
 * issuing all inserts inside one withTransaction() callback. An insert that
 * conflicts with an existing row is ignored (ON CONFLICT DO NOTHING).
 * Does nothing for an empty map.
 *
 * Note: the log lines count every flagged email, including any that the
 * ON CONFLICT clause skipped.
 *
 * @param {Map<string, number>} flaggedEmails  email -> number of sites it was seen on
 */
async function addToExclusionList(flaggedEmails) {
  if (flaggedEmails.size === 0) {
    return;
  }

  const insertSql = `INSERT INTO email_exclusion_list (email, reason, site_count, added_by)
         VALUES ($1, $2, $3, 'cron')
         ON CONFLICT DO NOTHING`;

  await withTransaction(async (client) => {
    for (const entry of flaggedEmails.entries()) {
      const [email, siteCount] = entry;
      const reason = `Appears in ${siteCount} sites — likely scraped noise`;
      await client.query(insertSql, [email, reason, siteCount]);
      logger.info(`  Excluded: ${email} (${siteCount} sites)`);
    }
  });

  logger.info(`Added ${flaggedEmails.size} email(s) to exclusion list`);
}
199  
/**
 * Remove excluded emails from every site's contacts_json email_addresses array.
 * Handles both plain-string entries and object entries { email, label };
 * entries from which no email string can be extracted are kept as-is.
 * Returns the count of sites whose contacts_json was rewritten.
 *
 * A failed write for one site is logged and skipped rather than aborting the
 * whole pass. By the time this runs, the emails are already committed to
 * email_exclusion_list, and later audits skip already-excluded emails. An
 * abort here would therefore leave every remaining site permanently
 * un-stripped.
 *
 * @param {string[]} emails  Normalised (lowercase) emails to strip
 * @returns {number}
 */
function stripFromContactsJson(emails) {
  if (emails.length === 0) return 0;

  const emailSet = new Set(emails);
  let updatedCount = 0;

  for (const { id } of getSiteIdsWithContacts()) {
    let contacts;
    try {
      contacts = getContactsDataWithFallback(id, null);
    } catch {
      continue; // unreadable / malformed JSON — nothing we can safely rewrite
    }

    const originalEmails = contacts?.email_addresses;
    if (!Array.isArray(originalEmails) || originalEmails.length === 0) continue;

    const filtered = originalEmails.filter(entry => {
      const raw = extractEmailAddress(entry);
      if (!raw) return true; // keep malformed entries as-is
      return !emailSet.has(normaliseEmail(raw));
    });

    if (filtered.length === originalEmails.length) continue; // nothing changed

    contacts.email_addresses = filtered;
    try {
      setContactsJson(id, JSON.stringify(contacts));
      updatedCount++;
    } catch (err) {
      logger.warn(`Could not rewrite contacts for site ${id}: ${err.message}`);
    }
  }

  logger.info(`Stripped excluded email(s) from ${updatedCount} site(s)`);
  return updatedCount;
}
243  
/**
 * Stop queued email messages addressed to excluded emails.
 * Only rows with contact_method 'email', approval_status 'approved' and no
 * delivery_status yet are touched. They get delivery_status 'skipped' (this
 * function and the run summary call them "failed") plus an explanatory
 * error_message. Returns the count of messages updated.
 *
 * @param {string[]} emails  Normalised (trimmed, lowercase) emails
 * @returns {Promise<number>}
 */
async function markMessagesFailed(emails) {
  if (emails.length === 0) return 0;

  // contact_uri stores the raw address, so normalise it in SQL the same way
  // normaliseEmail() normalised the list (trim + lowercase). Otherwise a stored
  // ' User@Example.com' would never match. (SQL trim() strips spaces only.)
  const result = await run(
    `UPDATE messages
     SET delivery_status = 'skipped',
         error_message   = 'Email on exclusion list (common scraped noise)',
         updated_at      = NOW()
     WHERE contact_method    = 'email'
       AND approval_status   = 'approved'
       AND delivery_status   IS NULL
       AND lower(trim(contact_uri)) = ANY($1::text[])`,
    [emails]
  );

  logger.info(`Marked ${result.changes} queued email message(s) as failed`);
  return result.changes;
}
270  
/**
 * Stamp last_run_at (and updated_at) on this job's ops.cron_jobs row.
 * Best-effort: a database error is logged as a warning and never propagates.
 */
async function updateCronLastRun() {
  const sql = `UPDATE ops.cron_jobs
       SET last_run_at = NOW(),
           updated_at  = NOW()
       WHERE task_key = 'emailExclusionAudit'`;

  try {
    await run(sql);
  } catch (error) {
    logger.warn(`Could not update cron_jobs last_run_at: ${error.message}`);
  }
}
286  
/**
 * Entry point: run one pass of the email exclusion audit.
 *
 * Flow:
 *   1. Detect newly-common addresses.
 *   2. Only if any were found: add them to email_exclusion_list, strip them
 *      from contacts files, and mark their queued messages.
 *   3. Stamp cron_jobs.last_run_at.
 *   4. Log a summary.
 *
 * cron_jobs is stamped on failure as well as on success. On failure the error
 * is logged and then rethrown.
 *
 * @returns {Promise<{new_emails_excluded: number, sites_updated: number,
 *   messages_failed: number, threshold: number, duration_ms: number}>}
 */
async function runEmailExclusionAudit() {
  const startedAt = Date.now();

  try {
    logger.info('Starting email exclusion audit...');

    // Detect newly-common emails (role-based prefixes are never flagged).
    const flagged = await detectCommonEmails();
    const addresses = Array.from(flagged.keys());

    let sitesUpdated = 0;
    let messagesFailed = 0;

    if (addresses.length > 0) {
      await addToExclusionList(flagged);
      // Filesystem pass — synchronous, so no await.
      sitesUpdated = stripFromContactsJson(addresses);
      messagesFailed = await markMessagesFailed(addresses);
    }

    const durationMs = Date.now() - startedAt;
    const summary = {
      new_emails_excluded: flagged.size,
      sites_updated: sitesUpdated,
      messages_failed: messagesFailed,
      threshold: THRESHOLD,
      duration_ms: durationMs,
    };

    await updateCronLastRun();

    const summaryLines = [
      'Email Exclusion Audit Summary:',
      `  New emails excluded  : ${flagged.size}`,
      `  Sites updated        : ${sitesUpdated}`,
      `  Email messages failed: ${messagesFailed}`,
      `  Duration             : ${(durationMs / 1000).toFixed(2)}s`,
    ];
    for (const line of summaryLines) {
      logger.info(line);
    }

    return summary;
  } catch (err) {
    logger.error(`Email exclusion audit failed: ${err.message}`);
    await updateCronLastRun();
    throw err;
  }
}
341  
/**
 * True when this file is the script Node was launched with, rather than a
 * module imported by a scheduler.
 *
 * pathToFileURL() builds the same percent-encoded, platform-correct file URL
 * that import.meta.url uses. The previous hand-built `file://${process.argv[1]}`
 * never matched on Windows drive paths or on paths containing spaces, '#' or
 * non-ASCII characters, so the job silently did nothing there.
 * process.argv[1] is absent under `node -e` or a REPL, hence the guard.
 */
const isDirectRun =
  Boolean(process.argv[1]) && import.meta.url === pathToFileURL(process.argv[1]).href;

// Run if executed directly
if (isDirectRun) {
  runEmailExclusionAudit()
    .then(() => {
      logger.info('Email exclusion audit completed');
      process.exit(0);
    })
    .catch(err => {
      logger.error(`Email exclusion audit failed: ${err.message}`);
      process.exit(1);
    });
}

export default runEmailExclusionAudit;