email-exclusion-audit.js
#!/usr/bin/env node

/**
 * Email Exclusion Audit Cron Job
 *
 * Detects email addresses that appear in contacts_json across 10+ distinct sites —
 * a reliable signal of scraped noise (e.g. the html-contact-extractor hitting a
 * shared error/placeholder page and recording its email as a real contact, or a
 * font/CDN page whose copyright email ends up on hundreds of sites).
 *
 * Role-based prefixes (info@, support@, sales@, etc.) are deliberately skipped —
 * these are legitimately common across businesses and the cross-site duplicate check
 * would produce false positives. The exclusion list is for the same *exact* address
 * on 10+ different sites (e.g. user@domain.com, impallari@gmail.com).
 *
 * Steps:
 *   1. Scan all contacts_json rows, count per-email site occurrences in JS
 *   2. Skip role-based prefixes (info@, contact@, hello@, …)
 *   3. For any email at >= THRESHOLD sites not already excluded: add to email_exclusion_list
 *   4. Strip those emails from every site's contacts_json email_addresses array
 *   5. Mark approved, not-yet-delivered email messages to those addresses as
 *      skipped (delivery_status = 'skipped')
 *   6. Log a summary and update cron_jobs last_run
 *
 * Schedule: Weekly (7 days) via cron_jobs table (task_key: emailExclusionAudit)
 */

import { readdirSync } from 'fs';
import { pathToFileURL } from 'url';
import { getAll, run, withTransaction } from '../utils/db.js';
import Logger from '../utils/logger.js';
import { getContactsDataWithFallback, setContactsJson, DATA_DIR } from '../utils/contacts-storage.js';

const logger = new Logger('EmailExclusionAudit');

/** Minimum number of distinct sites an email must appear on to be flagged. */
const THRESHOLD = 10;

/**
 * Role-based email prefixes that are legitimately common across many businesses.
 * These are skipped even if they appear on 10+ sites, because the cross-site
 * duplicate signal doesn't apply — every plumber has an info@ address.
 *
 * Note: the match is on the local part only, and a matching address is skipped
 * entirely before counting. So even the exact same full address
 * (e.g. info@acmeplumbing.com.au on 10 sites) will NOT be flagged. In practice that
 * case is rare, because each business has its own domain.
 */
const ROLE_PREFIXES = Object.freeze([
  'info@',
  'contact@',
  'hello@',
  'admin@',
  'support@',
  'sales@',
  'enquiries@',
  'enquiry@',
  'office@',
  'mail@',
  'noreply@',
  'no-reply@',
]);

/**
 * Normalise an email address: lowercase and trim whitespace.
 *
 * @param {string} raw
 * @returns {string}
 */
function normaliseEmail(raw) {
  return raw.trim().toLowerCase();
}

/**
 * Extract the email string from an email_addresses entry.
 * Handles both plain strings and objects like { email: '...', label: '...' }.
 *
 * @param {unknown} entry
 * @returns {string|null} The raw (un-normalised) address, or null if the entry has none.
 */
function extractEmailAddress(entry) {
  if (typeof entry === 'string') return entry;
  if (entry && typeof entry === 'object' && typeof entry.email === 'string') return entry.email;
  return null;
}

/**
 * Return true if the email should be skipped (role-based prefix).
 *
 * @param {string} email Already normalised (lowercase)
 * @returns {boolean}
 */
function isRoleBased(email) {
  return ROLE_PREFIXES.some(prefix => email.startsWith(prefix));
}

/**
 * Return minimal {id} rows for all sites with a contacts file in DATA_DIR.
 * A file named e.g. "123.json" yields { id: 123 }. Files whose name does not
 * start with an integer are ignored.
 *
 * Best-effort: if the directory cannot be read, a warning is logged and an
 * empty list is returned, so the audit finishes instead of crashing.
 *
 * @returns {{ id: number }[]}
 */
function getSiteIdsWithContacts() {
  let files;
  try {
    files = readdirSync(DATA_DIR);
  } catch (err) {
    // Previously swallowed silently, which made a misconfigured or unreadable
    // DATA_DIR indistinguishable from "no noisy emails found".
    logger.warn(`Could not list contacts directory ${DATA_DIR}: ${err.message}`);
    return [];
  }

  return files
    .filter(f => f.endsWith('.json'))
    .map(f => ({ id: Number.parseInt(f, 10) }))
    .filter(r => !Number.isNaN(r.id));
}

/**
 * Scan contacts_json for emails appearing across >= THRESHOLD sites.
 * Returns a Map of email -> site_count for emails that reach the threshold
 * and are not already in email_exclusion_list.
 *
 * Each email is counted at most once per site. Role-based addresses and entries
 * without an "@" (or shorter than 6 characters) are ignored.
 *
 * @returns {Promise<Map<string, number>>} Normalised email -> distinct site count.
 */
async function detectCommonEmails() {
  logger.info('Scanning contacts_json for common email addresses...');

  const rows = getSiteIdsWithContacts();

  logger.info(`Scanning ${rows.length} sites...`);

  // email -> Set of site IDs that contain it
  const emailToSiteIds = new Map();

  for (const row of rows) {
    let contacts;
    try {
      contacts = getContactsDataWithFallback(row.id, null);
    } catch {
      continue; // malformed JSON — skip
    }
    if (!contacts) continue;

    const emails = contacts.email_addresses;
    if (!Array.isArray(emails) || emails.length === 0) continue;

    const seenInThisSite = new Set();

    for (const entry of emails) {
      const raw = extractEmailAddress(entry);
      if (!raw) continue;

      const normalised = normaliseEmail(raw);
      if (!normalised || seenInThisSite.has(normalised)) continue;
      seenInThisSite.add(normalised);

      // Skip entries that don't look like email addresses (no @ or too short)
      if (!normalised.includes('@') || normalised.length < 6) continue;

      // Skip role-based prefixes — these appear legitimately across many businesses
      if (isRoleBased(normalised)) continue;

      if (!emailToSiteIds.has(normalised)) {
        emailToSiteIds.set(normalised, new Set());
      }
      emailToSiteIds.get(normalised).add(row.id);
    }
  }

  // Load the existing exclusion list so we don't double-count. Normalise it the
  // same way as scraped emails. Otherwise a mixed-case or padded entry (e.g. added
  // by hand) would never match, and the address would be flagged and inserted again.
  const existingRows = await getAll('SELECT email FROM email_exclusion_list');
  const alreadyExcluded = new Set(
    existingRows
      .map(r => r.email)
      .filter(email => typeof email === 'string')
      .map(normaliseEmail)
  );

  const newlyFlagged = new Map();

  for (const [email, siteIds] of emailToSiteIds) {
    if (siteIds.size >= THRESHOLD && !alreadyExcluded.has(email)) {
      newlyFlagged.set(email, siteIds.size);
    }
  }

  logger.info(
    `Found ${newlyFlagged.size} new email(s) appearing in >= ${THRESHOLD} sites (${alreadyExcluded.size} already excluded)`
  );

  return newlyFlagged;
}

/**
 * Insert newly flagged emails into email_exclusion_list inside a single transaction.
 * Rows that hit a conflict are left untouched (ON CONFLICT DO NOTHING).
 *
 * @param {Map<string, number>} flaggedEmails Normalised email -> distinct site count.
 */
async function addToExclusionList(flaggedEmails) {
  if (flaggedEmails.size === 0) return;

  await withTransaction(async (client) => {
    for (const [email, siteCount] of flaggedEmails) {
      await client.query(
        `INSERT INTO email_exclusion_list (email, reason, site_count, added_by)
         VALUES ($1, $2, $3, 'cron')
         ON CONFLICT DO NOTHING`,
        [email, `Appears in ${siteCount} sites — likely scraped noise`, siteCount]
      );
      logger.info(`  Excluded: ${email} (${siteCount} sites)`);
    }
  });

  // NOTE(review): this counts attempted inserts, not rows actually inserted
  // (conflicting rows are skipped silently).
  logger.info(`Added ${flaggedEmails.size} email(s) to exclusion list`);
}

/**
 * Remove excluded emails from every site's contacts_json email_addresses array.
 * Handles both plain-string entries and object entries { email, label }.
 * Entries with no extractable address are kept as-is.
 *
 * Synchronous: reads and rewrites contact files through the contacts-storage helpers.
 *
 * @param {string[]} emails Normalised (lowercase) emails to strip
 * @returns {number} Count of sites whose contacts_json was modified.
 */
function stripFromContactsJson(emails) {
  if (emails.length === 0) return 0;

  const emailSet = new Set(emails);
  const rows = getSiteIdsWithContacts();
  let updatedCount = 0;

  for (const row of rows) {
    let contacts;
    try {
      contacts = getContactsDataWithFallback(row.id, null);
    } catch {
      continue; // unreadable/malformed contacts — leave untouched
    }
    if (!contacts) continue;

    const originalEmails = contacts.email_addresses;
    if (!Array.isArray(originalEmails) || originalEmails.length === 0) continue;

    const filtered = originalEmails.filter(entry => {
      const raw = extractEmailAddress(entry);
      if (!raw) return true; // keep malformed entries as-is
      return !emailSet.has(normaliseEmail(raw));
    });

    if (filtered.length === originalEmails.length) continue; // nothing changed

    contacts.email_addresses = filtered;
    setContactsJson(row.id, JSON.stringify(contacts));
    updatedCount++;
  }

  logger.info(`Stripped excluded email(s) from ${updatedCount} site(s)`);
  return updatedCount;
}

/**
 * Mark queued email messages to excluded addresses as skipped. Only approved
 * messages with no delivery_status yet are touched. They are set to
 * delivery_status = 'skipped', with an explanatory error_message.
 *
 * NOTE(review): logs and the run summary call these "failed", but the stored
 * status is 'skipped' — confirm which downstream consumers expect.
 *
 * @param {string[]} emails Normalised (lowercase, trimmed) emails
 * @returns {Promise<number>} Count of messages updated.
 */
async function markMessagesFailed(emails) {
  if (emails.length === 0) return 0;

  // contact_uri stores the raw address. Normalise it the same way as
  // normaliseEmail (trim + lowercase) so padded or mixed-case URIs still match.
  const result = await run(
    `UPDATE messages
        SET delivery_status = 'skipped',
            error_message = 'Email on exclusion list (common scraped noise)',
            updated_at = NOW()
      WHERE contact_method = 'email'
        AND approval_status = 'approved'
        AND delivery_status IS NULL
        AND lower(trim(contact_uri)) = ANY($1::text[])`,
    [emails]
  );

  logger.info(`Marked ${result.changes} queued email message(s) as failed`);
  return result.changes;
}

/**
 * Update cron_jobs last_run_at for this job.
 * Best-effort: a failure is logged as a warning and not rethrown.
 */
async function updateCronLastRun() {
  try {
    await run(
      `UPDATE ops.cron_jobs
          SET last_run_at = NOW(),
              updated_at = NOW()
        WHERE task_key = 'emailExclusionAudit'`
    );
  } catch (err) {
    logger.warn(`Could not update cron_jobs last_run_at: ${err.message}`);
  }
}

/**
 * Main entry point.
 * last_run_at is updated on both success and failure; on failure the error is
 * logged and rethrown.
 *
 * @returns {Promise<{new_emails_excluded: number, sites_updated: number,
 *   messages_failed: number, threshold: number, duration_ms: number}>}
 */
async function runEmailExclusionAudit() {
  const startTime = Date.now();

  try {
    logger.info('Starting email exclusion audit...');

    // 1. Detect newly-common emails (skipping role-based prefixes).
    const flaggedEmails = await detectCommonEmails();

    let sitesUpdated = 0;
    let messagesFailed = 0;

    const emailList = [...flaggedEmails.keys()];

    if (emailList.length > 0) {
      // 2. Add to exclusion list
      await addToExclusionList(flaggedEmails);

      // 3. Strip from contacts_json (filesystem operation — synchronous)
      sitesUpdated = stripFromContactsJson(emailList);

      // 4. Mark queued messages (delivery_status = 'skipped')
      messagesFailed = await markMessagesFailed(emailList);
    }

    const duration = Date.now() - startTime;

    const summary = {
      new_emails_excluded: flaggedEmails.size,
      sites_updated: sitesUpdated,
      messages_failed: messagesFailed,
      threshold: THRESHOLD,
      duration_ms: duration,
    };

    // 5. Update cron_jobs
    await updateCronLastRun();

    logger.info('Email Exclusion Audit Summary:');
    logger.info(`  New emails excluded  : ${flaggedEmails.size}`);
    logger.info(`  Sites updated        : ${sitesUpdated}`);
    logger.info(`  Email messages failed: ${messagesFailed}`);
    logger.info(`  Duration             : ${(duration / 1000).toFixed(2)}s`);

    return summary;
  } catch (err) {
    logger.error(`Email exclusion audit failed: ${err.message}`);
    await updateCronLastRun();
    throw err;
  }
}

// Run if executed directly. pathToFileURL percent-encodes the path exactly as
// import.meta.url does (spaces, '#', non-ASCII, Windows drive letters). Naive
// `file://${path}` concatenation does not, so those paths never matched before.
// argv[1] can be absent (e.g. `node -e`), so guard it before converting.
const invokedPath = process.argv[1];
if (invokedPath && import.meta.url === pathToFileURL(invokedPath).href) {
  runEmailExclusionAudit()
    .then(() => {
      logger.info('Email exclusion audit completed');
      process.exit(0);
    })
    .catch(err => {
      logger.error(`Email exclusion audit failed: ${err.message}`);
      process.exit(1);
    });
}

export default runEmailExclusionAudit;