/ src / cron / phone-exclusion-audit.js
phone-exclusion-audit.js
  1  #!/usr/bin/env node
  2  
  3  /**
  4   * Phone Exclusion Audit Cron Job
  5   *
  6   * Detects phone numbers that appear in contacts_json across 10+ distinct sites —
  7   * a reliable signal of scraped noise (e.g. the html-contact-extractor hitting a
  8   * shared error/placeholder page and recording its phone as a real contact).
  9   *
 10   * Steps:
 11   *   1. Scan all contacts_json rows, count per-phone site occurrences in JS
 12   *   2. For any phone at >= THRESHOLD sites not already excluded: add to phone_exclusion_list
 13   *   3. Strip those phones from every site's contacts_json phone_numbers array
 *   4. Mark queued SMS outreach to those numbers as skipped (delivery_status = 'skipped')
 15   *   5. Log a summary and update cron_jobs last_run
 16   *
 17   * Schedule: Weekly (7 days) via cron_jobs table
 18   */
 19  
import { readdirSync } from 'fs';
import { pathToFileURL } from 'url';
import { getAll, run, withTransaction } from './../utils/db.js';
import Logger from '../utils/logger.js';
import { getContactsDataWithFallback, setContactsJson, DATA_DIR } from '../utils/contacts-storage.js';
 24  
/** Logger used by every step of this job, constructed with the name 'PhoneExclusionAudit'. */
const logger = new Logger('PhoneExclusionAudit');

/**
 * Minimum number of distinct sites a phone must appear on to be flagged.
 * The comparison in detectCommonPhones is inclusive (`>= THRESHOLD`).
 */
const THRESHOLD = 10;
 29  
 30  /** Return minimal {id} rows for all sites with contacts on filesystem. */
 31  function getSiteIdsWithContacts() {
 32    try {
 33      return readdirSync(DATA_DIR)
 34        .filter(f => f.endsWith('.json'))
 35        .map(f => ({ id: parseInt(f, 10) }))
 36        .filter(r => !isNaN(r.id));
 37    } catch {
 38      return [];
 39    }
 40  }
 41  
 42  /**
 43   * Scan contacts_json for phones appearing across >= THRESHOLD sites.
 44   * Returns a Map of phone -> site_count for phones that exceed the threshold
 45   * and are not already in phone_exclusion_list.
 46   *
 47   * @returns {Promise<Map<string, number>>}
 48   */
 49  async function detectCommonPhones() {
 50    logger.info('Scanning contacts_json for common phone numbers...');
 51  
 52    const rows = getSiteIdsWithContacts();
 53  
 54    logger.info(`Scanning ${rows.length} sites...`);
 55  
 56    // phone -> Set of site IDs that contain it
 57    const phoneToSiteIds = new Map();
 58  
 59    for (const row of rows) {
 60      let contacts;
 61      try {
 62        contacts = getContactsDataWithFallback(row.id, row);
 63      } catch {
 64        continue; // malformed JSON — skip
 65      }
 66      if (!contacts) continue;
 67  
 68      const phones = contacts?.phone_numbers;
 69      if (!Array.isArray(phones) || phones.length === 0) continue;
 70  
 71      for (const phone of phones) {
 72        if (!phone || typeof phone !== 'string') continue;
 73        const normalized = phone.trim();
 74        if (!normalized) continue;
 75  
 76        if (!phoneToSiteIds.has(normalized)) {
 77          phoneToSiteIds.set(normalized, new Set());
 78        }
 79        phoneToSiteIds.get(normalized).add(row.id);
 80      }
 81    }
 82  
 83    // Load existing exclusion list so we don't double-count
 84    const existingRows = await getAll('SELECT phone FROM phone_exclusion_list');
 85    const alreadyExcluded = new Set(existingRows.map(r => r.phone));
 86  
 87    const newlyFlagged = new Map();
 88  
 89    for (const [phone, siteIds] of phoneToSiteIds) {
 90      if (siteIds.size >= THRESHOLD && !alreadyExcluded.has(phone)) {
 91        newlyFlagged.set(phone, siteIds.size);
 92      }
 93    }
 94  
 95    logger.info(
 96      `Found ${newlyFlagged.size} new phone(s) appearing in >= ${THRESHOLD} sites (${alreadyExcluded.size} already excluded)`
 97    );
 98  
 99    return newlyFlagged;
100  }
101  
/**
 * Persist newly flagged phones in phone_exclusion_list.
 *
 * All inserts run inside a single withTransaction() callback, one statement
 * per phone, with `added_by` set to 'cron'. Rows that conflict with an
 * existing entry are left untouched (ON CONFLICT DO NOTHING). Does nothing
 * when the map is empty.
 *
 * @param {Map<string, number>} flaggedPhones Phone -> number of sites it appears on.
 * @returns {Promise<void>}
 */
async function addToExclusionList(flaggedPhones) {
  if (flaggedPhones.size === 0) {
    return;
  }

  const insertFlaggedPhones = async (client) => {
    for (const entry of flaggedPhones) {
      const [phone, siteCount] = entry;
      const reason = `Appears in ${siteCount} sites — likely scraped noise`;

      await client.query(
        `INSERT INTO phone_exclusion_list (phone, reason, site_count, added_by)
         VALUES ($1, $2, $3, 'cron')
         ON CONFLICT DO NOTHING`,
        [phone, reason, siteCount]
      );
      logger.info(`  Excluded: ${phone} (${siteCount} sites)`);
    }
  };

  await withTransaction(insertFlaggedPhones);

  logger.info(`Added ${flaggedPhones.size} phone(s) to exclusion list`);
}
124  
/**
 * Remove excluded phones from every site's contacts_json phone_numbers array.
 *
 * Each site listed by getSiteIdsWithContacts() is loaded with
 * getContactsDataWithFallback(); sites whose lookup throws, that have no
 * contacts, or that have no non-empty `phone_numbers` array are skipped.
 * Entries are compared after trimming, matching how detectCommonPhones()
 * builds its keys. A site is rewritten through setContactsJson() only when
 * at least one entry was actually removed.
 *
 * Non-string entries (numbers, null, objects) are never treated as matches
 * and are kept as-is. Previously a non-string, non-nullish entry made
 * `p?.trim()` throw, which aborted the whole job after the exclusion list
 * had already been written.
 *
 * Both storage helpers are called synchronously (no await).
 *
 * @param {string[]} phones  List of (trimmed) phones to strip
 * @returns {number} Count of sites whose contacts_json was modified
 */
function stripFromContactsJson(phones) {
  if (phones.length === 0) return 0;

  const phoneSet = new Set(phones);
  // Only strings can match; optional chaining alone does not protect
  // against values such as numbers that have no trim() method.
  const isExcluded = (entry) => typeof entry === 'string' && phoneSet.has(entry.trim());

  const rows = getSiteIdsWithContacts();
  let updatedCount = 0;

  for (const row of rows) {
    let contacts;
    try {
      contacts = getContactsDataWithFallback(row.id, null);
    } catch {
      continue; // malformed contacts data — skip this site
    }
    if (!contacts) continue;

    const originalPhones = contacts.phone_numbers;
    if (!Array.isArray(originalPhones) || originalPhones.length === 0) continue;

    const filtered = originalPhones.filter((entry) => !isExcluded(entry));
    if (filtered.length === originalPhones.length) continue; // nothing changed

    contacts.phone_numbers = filtered;
    setContactsJson(row.id, JSON.stringify(contacts));
    updatedCount++;
  }

  logger.info(`Stripped excluded phone(s) from ${updatedCount} site(s)`);
  return updatedCount;
}
162  
/**
 * Take queued SMS messages addressed to excluded numbers out of the queue.
 *
 * Matches rows in `messages` with contact_method 'sms', approval_status
 * 'approved' and a NULL delivery_status whose contact_uri is one of `phones`.
 * Matching rows get delivery_status 'skipped' plus an explanatory
 * error_message. No query is issued for an empty list.
 *
 * @param {string[]} phones Excluded phone numbers to match against contact_uri.
 * @returns {Promise<number>} The `changes` value reported by run(); 0 for an empty list.
 */
async function markMessagesFailed(phones) {
  if (phones.length === 0) {
    return 0;
  }

  const skipQueuedSmsSql = `UPDATE messages
     SET delivery_status = 'skipped',
         error_message   = 'Phone on exclusion list (common scraped noise)',
         updated_at      = NOW()
     WHERE contact_method    = 'sms'
       AND approval_status   = 'approved'
       AND delivery_status   IS NULL
       AND contact_uri       = ANY($1::text[])`;

  // The whole list is bound as a single array parameter ($1::text[]).
  const outcome = await run(skipQueuedSmsSql, [phones]);

  logger.info(`Marked ${outcome.changes} queued SMS message(s) as failed`);
  return outcome.changes;
}
188  
/**
 * Record in ops.cron_jobs that this job has just run.
 *
 * Sets last_run_at and updated_at to NOW() on the row whose task_key is
 * 'phoneExclusionAudit'. A failure of that update is logged as a warning and
 * never propagated to the caller.
 *
 * NOTE: `status` and `details` are accepted but not written anywhere at the
 * moment; only the timestamps are updated.
 *
 * @param {string} status Outcome label supplied by the caller (currently unused).
 * @param {object} details Run summary or error info (currently unused).
 * @returns {Promise<void>}
 */
async function logCronExecution(status, details) {
  const touchLastRunSql = `UPDATE ops.cron_jobs
       SET last_run_at = NOW(),
           updated_at  = NOW()
       WHERE task_key = 'phoneExclusionAudit'`;

  try {
    await run(touchLastRunSql);
  } catch (updateError) {
    // Bookkeeping only: the audit result must not depend on this update.
    logger.warn(`Could not update cron_jobs: ${updateError.message}`);
  }
}
204  
/**
 * Run the complete phone exclusion audit.
 *
 * Steps, in order: detect newly common phones, add them to the exclusion
 * list, strip them from the stored contacts, take matching queued SMS out of
 * the queue, then record the run via logCronExecution() and log a summary.
 *
 * On any error the failure is logged, logCronExecution('failed', ...) is
 * called with the error message and elapsed time, and the original error is
 * rethrown.
 *
 * @returns {Promise<{new_phones_excluded: number, sites_updated: number,
 *   messages_failed: number, threshold: number, duration_ms: number}>}
 */
async function runPhoneExclusionAudit() {
  const startedAt = Date.now();
  const elapsedMs = () => Date.now() - startedAt;

  try {
    logger.info('Starting phone exclusion audit...');

    // Steps 1 + 2: find the offenders and persist them.
    const flaggedPhones = await detectCommonPhones();
    await addToExclusionList(flaggedPhones);

    const phoneList = Array.from(flaggedPhones.keys());

    // Step 3 works on the filesystem and is synchronous; step 4 hits the DB.
    const sitesUpdated = stripFromContactsJson(phoneList);
    const messagesFailed = await markMessagesFailed(phoneList);

    const duration = elapsedMs();
    const summary = {
      new_phones_excluded: flaggedPhones.size,
      sites_updated: sitesUpdated,
      messages_failed: messagesFailed,
      threshold: THRESHOLD,
      duration_ms: duration,
    };

    // Step 5: bookkeeping first, then the human-readable report.
    await logCronExecution('completed', summary);

    const reportLines = [
      'Phone Exclusion Audit Summary:',
      `  New phones excluded : ${flaggedPhones.size}`,
      `  Sites updated       : ${sitesUpdated}`,
      `  SMS messages failed : ${messagesFailed}`,
      `  Duration            : ${(duration / 1000).toFixed(2)}s`,
    ];
    for (const line of reportLines) {
      logger.info(line);
    }

    return summary;
  } catch (err) {
    logger.error(`Phone exclusion audit failed: ${err.message}`);
    await logCronExecution('failed', { error: err.message, duration_ms: elapsedMs() });
    throw err;
  }
}
254  
// Run only when this file is the process entry point (e.g. `node phone-exclusion-audit.js`),
// not when it is imported by another module.
//
// The script path is converted with pathToFileURL() before comparing it to
// import.meta.url. A hand-built `file://${path}` string never matches on
// Windows (drive letters, backslashes) or for paths containing characters
// that are percent-encoded in URLs (spaces, '#', non-ASCII), which would make
// the job silently do nothing. The truthiness check covers invocations where
// process.argv[1] is absent.
if (process.argv[1] && import.meta.url === pathToFileURL(process.argv[1]).href) {
  runPhoneExclusionAudit()
    .then(() => {
      logger.info('Phone exclusion audit completed');
      process.exit(0);
    })
    .catch(err => {
      logger.error(`Phone exclusion audit failed: ${err.message}`);
      // Non-zero exit code so the scheduler can see the run failed.
      process.exit(1);
    });
}
267  
268  export default runPhoneExclusionAudit;