phone-exclusion-audit.js
1 #!/usr/bin/env node 2 3 /** 4 * Phone Exclusion Audit Cron Job 5 * 6 * Detects phone numbers that appear in contacts_json across 10+ distinct sites — 7 * a reliable signal of scraped noise (e.g. the html-contact-extractor hitting a 8 * shared error/placeholder page and recording its phone as a real contact). 9 * 10 * Steps: 11 * 1. Scan all contacts_json rows, count per-phone site occurrences in JS 12 * 2. For any phone at >= THRESHOLD sites not already excluded: add to phone_exclusion_list 13 * 3. Strip those phones from every site's contacts_json phone_numbers array 14 * 4. Mark queued SMS outreach to those numbers as failed 15 * 5. Log a summary and update cron_jobs last_run 16 * 17 * Schedule: Weekly (7 days) via cron_jobs table 18 */ 19 20 import { readdirSync } from 'fs'; 21 import { getAll, run, withTransaction } from './../utils/db.js'; 22 import Logger from '../utils/logger.js'; 23 import { getContactsDataWithFallback, setContactsJson, DATA_DIR } from '../utils/contacts-storage.js'; 24 25 const logger = new Logger('PhoneExclusionAudit'); 26 27 /** Minimum number of distinct sites a phone must appear on to be flagged. */ 28 const THRESHOLD = 10; 29 30 /** Return minimal {id} rows for all sites with contacts on filesystem. */ 31 function getSiteIdsWithContacts() { 32 try { 33 return readdirSync(DATA_DIR) 34 .filter(f => f.endsWith('.json')) 35 .map(f => ({ id: parseInt(f, 10) })) 36 .filter(r => !isNaN(r.id)); 37 } catch { 38 return []; 39 } 40 } 41 42 /** 43 * Scan contacts_json for phones appearing across >= THRESHOLD sites. 44 * Returns a Map of phone -> site_count for phones that exceed the threshold 45 * and are not already in phone_exclusion_list. 46 * 47 * @returns {Promise<Map<string, number>>} 48 */ 49 async function detectCommonPhones() { 50 logger.info('Scanning contacts_json for common phone numbers...'); 51 52 const rows = getSiteIdsWithContacts(); 53 54 logger.info(`Scanning ${rows.length} sites...`); 55 56 // phone -> Set of site IDs that contain it 57 const phoneToSiteIds = new Map(); 58 59 for (const row of rows) { 60 let contacts; 61 try { 62 contacts = getContactsDataWithFallback(row.id, row); 63 } catch { 64 continue; // malformed JSON — skip 65 } 66 if (!contacts) continue; 67 68 const phones = contacts?.phone_numbers; 69 if (!Array.isArray(phones) || phones.length === 0) continue; 70 71 for (const phone of phones) { 72 if (!phone || typeof phone !== 'string') continue; 73 const normalized = phone.trim(); 74 if (!normalized) continue; 75 76 if (!phoneToSiteIds.has(normalized)) { 77 phoneToSiteIds.set(normalized, new Set()); 78 } 79 phoneToSiteIds.get(normalized).add(row.id); 80 } 81 } 82 83 // Load existing exclusion list so we don't double-count 84 const existingRows = await getAll('SELECT phone FROM phone_exclusion_list'); 85 const alreadyExcluded = new Set(existingRows.map(r => r.phone)); 86 87 const newlyFlagged = new Map(); 88 89 for (const [phone, siteIds] of phoneToSiteIds) { 90 if (siteIds.size >= THRESHOLD && !alreadyExcluded.has(phone)) { 91 newlyFlagged.set(phone, siteIds.size); 92 } 93 } 94 95 logger.info( 96 `Found ${newlyFlagged.size} new phone(s) appearing in >= ${THRESHOLD} sites (${alreadyExcluded.size} already excluded)` 97 ); 98 99 return newlyFlagged; 100 } 101 102 /** 103 * Insert newly flagged phones into phone_exclusion_list. 104 * 105 * @param {Map<string, number>} flaggedPhones 106 */ 107 async function addToExclusionList(flaggedPhones) { 108 if (flaggedPhones.size === 0) return; 109 110 await withTransaction(async (client) => { 111 for (const [phone, siteCount] of flaggedPhones) { 112 await client.query( 113 `INSERT INTO phone_exclusion_list (phone, reason, site_count, added_by) 114 VALUES ($1, $2, $3, 'cron') 115 ON CONFLICT DO NOTHING`, 116 [phone, `Appears in ${siteCount} sites — likely scraped noise`, siteCount] 117 ); 118 logger.info(` Excluded: ${phone} (${siteCount} sites)`); 119 } 120 }); 121 122 logger.info(`Added ${flaggedPhones.size} phone(s) to exclusion list`); 123 } 124 125 /** 126 * Remove excluded phones from every site's contacts_json phone_numbers array. 127 * Returns the count of sites whose contacts_json was modified. 128 * 129 * @param {string[]} phones List of phones to strip 130 * @returns {number} 131 */ 132 function stripFromContactsJson(phones) { 133 if (phones.length === 0) return 0; 134 135 const phoneSet = new Set(phones); 136 const rows = getSiteIdsWithContacts(); 137 let updatedCount = 0; 138 139 for (const row of rows) { 140 let contacts; 141 try { 142 contacts = getContactsDataWithFallback(row.id, null); 143 } catch { 144 continue; 145 } 146 if (!contacts) continue; 147 148 const originalPhones = contacts?.phone_numbers; 149 if (!Array.isArray(originalPhones) || originalPhones.length === 0) continue; 150 151 const filtered = originalPhones.filter(p => !phoneSet.has(p?.trim())); 152 if (filtered.length === originalPhones.length) continue; // nothing changed 153 154 contacts.phone_numbers = filtered; 155 setContactsJson(row.id, JSON.stringify(contacts)); 156 updatedCount++; 157 } 158 159 logger.info(`Stripped excluded phone(s) from ${updatedCount} site(s)`); 160 return updatedCount; 161 } 162 163 /** 164 * Mark any queued SMS messages to excluded numbers as failed. 165 * Returns the count of messages updated. 166 * 167 * @param {string[]} phones 168 * @returns {Promise<number>} 169 */ 170 async function markMessagesFailed(phones) { 171 if (phones.length === 0) return 0; 172 173 const result = await run( 174 `UPDATE messages 175 SET delivery_status = 'skipped', 176 error_message = 'Phone on exclusion list (common scraped noise)', 177 updated_at = NOW() 178 WHERE contact_method = 'sms' 179 AND approval_status = 'approved' 180 AND delivery_status IS NULL 181 AND contact_uri = ANY($1::text[])`, 182 [phones] 183 ); 184 185 logger.info(`Marked ${result.changes} queued SMS message(s) as failed`); 186 return result.changes; 187 } 188 189 /** 190 * Update cron_jobs table with the outcome of this run. 191 */ 192 async function logCronExecution(status, details) { 193 try { 194 await run( 195 `UPDATE ops.cron_jobs 196 SET last_run_at = NOW(), 197 updated_at = NOW() 198 WHERE task_key = 'phoneExclusionAudit'` 199 ); 200 } catch (err2) { 201 logger.warn(`Could not update cron_jobs: ${err2.message}`); 202 } 203 } 204 205 /** 206 * Main entry point. 207 */ 208 async function runPhoneExclusionAudit() { 209 const startTime = Date.now(); 210 211 try { 212 logger.info('Starting phone exclusion audit...'); 213 214 // 1. Detect newly-common phones 215 const flaggedPhones = await detectCommonPhones(); 216 217 // 2. Add to exclusion list 218 await addToExclusionList(flaggedPhones); 219 220 const phoneList = [...flaggedPhones.keys()]; 221 222 // 3. Strip from contacts_json (filesystem operation — synchronous) 223 const sitesUpdated = stripFromContactsJson(phoneList); 224 225 // 4. Mark queued messages failed 226 const messagesFailed = await markMessagesFailed(phoneList); 227 228 const duration = Date.now() - startTime; 229 230 const summary = { 231 new_phones_excluded: flaggedPhones.size, 232 sites_updated: sitesUpdated, 233 messages_failed: messagesFailed, 234 threshold: THRESHOLD, 235 duration_ms: duration, 236 }; 237 238 // 5. Log to cron_jobs 239 await logCronExecution('completed', summary); 240 241 logger.info('Phone Exclusion Audit Summary:'); 242 logger.info(` New phones excluded : ${flaggedPhones.size}`); 243 logger.info(` Sites updated : ${sitesUpdated}`); 244 logger.info(` SMS messages failed : ${messagesFailed}`); 245 logger.info(` Duration : ${(duration / 1000).toFixed(2)}s`); 246 247 return summary; 248 } catch (err) { 249 logger.error(`Phone exclusion audit failed: ${err.message}`); 250 await logCronExecution('failed', { error: err.message, duration_ms: Date.now() - startTime }); 251 throw err; 252 } 253 } 254 255 // Run if executed directly 256 if (import.meta.url === `file://${process.argv[1]}`) { 257 runPhoneExclusionAudit() 258 .then(() => { 259 logger.info('Phone exclusion audit completed'); 260 process.exit(0); 261 }) 262 .catch(err => { 263 logger.error(`Phone exclusion audit failed: ${err.message}`); 264 process.exit(1); 265 }); 266 } 267 268 export default runPhoneExclusionAudit;