# lead-dossier/scripts/lead-pipeline.py
  1  #!/usr/bin/env python3
  2  """
  3  Lead Pipeline: Source → Verify → Dedupe → Upload
  4  
  5  End-to-end lead sourcing, email verification, deduplication, and campaign upload.
  6  API-agnostic — configure endpoints and keys via environment variables.
  7  
  8  Usage:
  9      python3 lead-pipeline.py \\
 10          --source-api-key KEY --validation-api-key KEY --campaign-api-key KEY \\
 11          --titles "VP Marketing,CMO" --campaign-id UUID --volume 200
 12  
 13      python3 lead-pipeline.py --help
 14  """
 15  
 16  import argparse
 17  import json
 18  import os
 19  import sys
 20  import time
 21  from datetime import datetime
 22  from pathlib import Path
 23  
 24  try:
 25      import requests
 26  except ImportError:
 27      print("ERROR: 'requests' package required. Run: pip3 install requests", file=sys.stderr)
 28      sys.exit(1)
 29  
 30  
 31  # ---------------------------------------------------------------------------
 32  # Configuration — override via environment variables
 33  # ---------------------------------------------------------------------------
 34  
# People-search (lead sourcing) endpoint. Defaults to Apollo's mixed-people
# search; override with LEAD_SOURCE_API_URL.
SOURCE_API_URL = os.environ.get(
    "LEAD_SOURCE_API_URL",
    "https://api.apollo.io/api/v1/mixed_people/search"
)
# Email-validation endpoint. The default is a placeholder host — set
# EMAIL_VALIDATION_API_URL to your provider's real URL before running.
VALIDATION_API_URL = os.environ.get(
    "EMAIL_VALIDATION_API_URL",
    "https://api.your-email-provider.com/v1/people/email-validation"
)
# Campaign-tool API base (no trailing slash); paths like /leads are appended.
# The default is a placeholder — set CAMPAIGN_API_BASE_URL.
CAMPAIGN_API_BASE = os.environ.get(
    "CAMPAIGN_API_BASE_URL",
    "https://api.your-campaign-tool.com/api/v2"
)
 47  
 48  
 49  # ---------------------------------------------------------------------------
 50  # Retry / backoff helper
 51  # ---------------------------------------------------------------------------
 52  
 53  def request_with_retry(method, url, max_retries=5, **kwargs):
 54      """HTTP request with exponential backoff on 429 / 5xx."""
 55      backoff = 1
 56      for attempt in range(max_retries + 1):
 57          try:
 58              resp = requests.request(method, url, timeout=30, **kwargs)
 59              if resp.status_code == 429:
 60                  wait = int(resp.headers.get("Retry-After", backoff))
 61                  print(f"  ⏳ Rate limited (429). Waiting {wait}s …")
 62                  time.sleep(wait)
 63                  backoff = min(backoff * 2, 60)
 64                  continue
 65              if resp.status_code >= 500:
 66                  print(f"  ⚠️  Server error {resp.status_code}. Retry in {backoff}s …")
 67                  time.sleep(backoff)
 68                  backoff = min(backoff * 2, 60)
 69                  continue
 70              return resp
 71          except requests.exceptions.RequestException as e:
 72              if attempt == max_retries:
 73                  raise
 74              print(f"  ⚠️  Request error: {e}. Retry in {backoff}s …")
 75              time.sleep(backoff)
 76              backoff = min(backoff * 2, 60)
 77      return resp  # type: ignore
 78  
 79  
 80  # ---------------------------------------------------------------------------
 81  # Step 1: Source Leads (People Search API)
 82  # ---------------------------------------------------------------------------
 83  
 84  def source_leads(api_key, titles, industries, company_size, locations, keywords, volume):
 85      """Pull leads from a people search API."""
 86      print(f"\n{'='*50}")
 87      print(f"STEP 1: Sourcing leads (target: {volume})")
 88      print(f"{'='*50}")
 89  
 90      leads = []
 91      page = 1
 92  
 93      size_ranges = []
 94      if company_size:
 95          parts = [s.strip() for s in company_size.split(",")]
 96          if len(parts) == 2:
 97              size_ranges = [f"{parts[0]},{parts[1]}"]
 98          else:
 99              size_ranges = parts
100  
101      while len(leads) < volume:
102          body = {
103              "api_key": api_key,
104              "per_page": 100,
105              "page": page,
106          }
107          if titles:
108              body["person_titles"] = [t.strip() for t in titles.split(",")]
109          if industries:
110              body["q_organization_keyword_tags"] = [i.strip() for i in industries.split(",")]
111          if size_ranges:
112              body["organization_num_employees_ranges"] = size_ranges
113          if locations:
114              body["person_locations"] = [l.strip() for l in locations.split(",")]
115          if keywords:
116              body["q_keywords"] = keywords
117  
118          print(f"  📡 Page {page} …", end=" ", flush=True)
119          resp = request_with_retry("POST", SOURCE_API_URL, json=body)
120  
121          if resp.status_code != 200:
122              print(f"ERROR {resp.status_code}: {resp.text[:200]}")
123              break
124  
125          data = resp.json()
126          people = data.get("people", [])
127          if not people:
128              print("no more results.")
129              break
130  
131          page_leads = 0
132          for person in people:
133              email = person.get("email")
134              if not email:
135                  continue
136              leads.append({
137                  "email": email.lower().strip(),
138                  "first_name": person.get("first_name", ""),
139                  "last_name": person.get("last_name", ""),
140                  "title": person.get("title", ""),
141                  "company_name": (person.get("organization") or {}).get("name", ""),
142                  "domain": (person.get("organization") or {}).get("primary_domain", ""),
143              })
144              page_leads += 1
145              if len(leads) >= volume:
146                  break
147  
148          print(f"{page_leads} with email ({len(leads)} total)")
149  
150          total_pages = data.get("pagination", {}).get("total_pages", page)
151          if page >= total_pages:
152              print("  Reached last page.")
153              break
154          page += 1
155          time.sleep(0.5)
156  
157      # Dedupe by email within sourced set
158      seen = set()
159      unique_leads = []
160      for lead in leads:
161          if lead["email"] not in seen:
162              seen.add(lead["email"])
163              unique_leads.append(lead)
164  
165      print(f"\n  ✅ Sourced {len(unique_leads)} unique leads with emails")
166      return unique_leads
167  
168  
169  # ---------------------------------------------------------------------------
170  # Step 2: Email Verification
171  # ---------------------------------------------------------------------------
172  
def verify_emails(api_key, leads):
    """Run each lead's email through the validation API.

    Returns ``(valid_leads, stats)``: only leads whose address came back
    "valid" are kept (annotated with free/role-based flags); ``stats``
    summarises total / valid / invalid / unknown / error counts plus a
    rejection-reason tally.
    """
    print(f"\n{'='*50}")
    print(f"STEP 2: Verifying {len(leads)} emails")
    print(f"{'='*50}")

    headers = {"X-API-Key": api_key, "Content-Type": "application/json"}

    valid_leads = []
    counts = {"invalid": 0, "unknown": 0, "errors": 0}
    rejection_reasons = {}
    total = len(leads)

    for idx, lead in enumerate(leads, start=1):
        # Progress marker on the first lead and every 50th thereafter.
        if idx == 1 or idx % 50 == 0:
            print(f"  🔍 Verifying {idx}/{total} …")

        try:
            resp = request_with_retry(
                "POST", VALIDATION_API_URL,
                headers=headers, json={"email": lead["email"]},
            )

            if resp.status_code != 200:
                counts["errors"] += 1
                continue  # skip the pacing sleep on API errors, as before

            payload = resp.json()
            status = payload.get("email_status", "unknown")

            if status == "valid":
                lead["is_free_email"] = payload.get("is_free_email", False)
                lead["is_role_based"] = payload.get("is_role_based", False)
                valid_leads.append(lead)
            elif status == "invalid":
                counts["invalid"] += 1
                rejection_reasons["invalid"] = rejection_reasons.get("invalid", 0) + 1
            else:
                counts["unknown"] += 1
                rejection_reasons["unknown"] = rejection_reasons.get("unknown", 0) + 1

        except Exception as e:
            counts["errors"] += 1
            print(f"  ⚠️  Error verifying {lead['email']}: {e}")

        # Gentle pacing: brief pause every 20 verifications.
        if idx % 20 == 0:
            time.sleep(0.5)

    print(f"\n  ✅ Verified: {len(valid_leads)} valid")
    print(f"  ❌ Invalid: {counts['invalid']}")
    print(f"  ❓ Unknown: {counts['unknown']}")
    print(f"  ⚠️  Errors: {counts['errors']}")

    return valid_leads, {
        "total": total,
        "valid": len(valid_leads),
        "invalid": counts["invalid"],
        "unknown": counts["unknown"],
        "errors": counts["errors"],
        "rejection_reasons": rejection_reasons,
    }
235  
236  
237  # ---------------------------------------------------------------------------
238  # Step 3: Deduplicate
239  # ---------------------------------------------------------------------------
240  
def get_existing_emails(api_key):
    """Page through the campaign tool's lead list, collecting every email.

    Uses cursor pagination (``starting_after``) and returns a set of
    lowercased addresses for dedup against newly sourced leads.
    """
    print("\n  📥 Fetching existing leads for dedup …")

    endpoint = f"{CAMPAIGN_API_BASE}/leads/list"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}

    known = set()
    cursor = None
    pages_fetched = 0

    while True:
        payload = {"limit": 100}
        if cursor:
            payload["starting_after"] = cursor

        resp = request_with_retry("POST", endpoint, headers=headers, json=payload)

        if resp.status_code != 200:
            print(f"  ⚠️  List error {resp.status_code}: {resp.text[:200]}")
            break

        data = resp.json()
        items = data.get("items", [])
        if not items:
            break

        known.update(
            addr
            for addr in (item.get("email", "").lower().strip() for item in items)
            if addr
        )

        cursor = data.get("next_starting_after")
        if not cursor:
            break

        pages_fetched += 1
        if pages_fetched % 10 == 0:
            print(f"    … {len(known)} existing leads so far")
        time.sleep(1)

    print(f"  📊 Found {len(known)} existing leads")
    return known
288  
289  
def load_exclusion_list(filepath):
    """Read a CSV of burned emails into a set of lowercased addresses.

    Blank lines and ``#`` comments are skipped; only the first CSV column
    is read (surrounding quotes stripped), and entries without an ``@``
    are ignored. A missing or falsy path silently yields an empty set.
    """
    if not filepath or not os.path.exists(filepath):
        return set()

    with open(filepath, "r") as fh:
        candidates = (
            line.strip().split(",")[0].strip().strip('"').lower()
            for line in fh
            if line.strip() and not line.strip().startswith("#")
        )
        excluded = {addr for addr in candidates if "@" in addr}

    print(f"  📋 Loaded {len(excluded)} emails from exclusion list")
    return excluded
307  
308  
def deduplicate(leads, api_key, exclude_file=None):
    """Drop leads already in the campaign tool or on the exclusion list.

    Returns ``(net_new_leads, stats)`` where stats counts how many leads
    were removed as existing duplicates vs. burned addresses.
    """
    print(f"\n{'='*50}")
    print(f"STEP 3: Deduplicating {len(leads)} leads")
    print(f"{'='*50}")

    in_tool = get_existing_emails(api_key)
    burned = load_exclusion_list(exclude_file)

    net_new = []
    in_tool_count = 0
    burned_count = 0

    for lead in leads:
        addr = lead["email"]
        if addr in in_tool:
            in_tool_count += 1
        elif addr in burned:
            burned_count += 1
        else:
            net_new.append(lead)

    print(f"\n  ✅ Net new leads: {len(net_new)}")
    print(f"  🔄 Already in campaign tool: {in_tool_count}")
    print(f"  🚫 On exclusion list: {burned_count}")

    stats = {
        "existing_dupes": in_tool_count,
        "burned_dupes": burned_count,
        "net_new": len(net_new),
    }
    return net_new, stats
340  
341  
342  # ---------------------------------------------------------------------------
343  # Step 4: Upload to Campaign Tool
344  # ---------------------------------------------------------------------------
345  
def generate_personalization(lead):
    """Generate a simple one-line personalization for a lead dict.

    Picks the most specific template available based on which of the
    lead's ``title`` / ``company_name`` fields are populated; falls back
    to a generic line when neither is present.
    """
    # NOTE: the original implementation also read first_name but never
    # used it — the templates only reference company and title.
    company = lead.get("company_name", "")
    title = lead.get("title", "")

    if company and title:
        return f"Noticed you're {title} at {company} — curious how you're thinking about growth this quarter."
    elif company:
        return f"Been following {company}'s trajectory — impressive momentum."
    elif title:
        return f"As a {title}, you're probably juggling growth and efficiency right now."
    return "Your background caught my eye — wanted to reach out."
359  
360  
def upload_leads(api_key, leads, campaign_id, dry_run=False):
    """Push leads into the campaign tool, 25 per batch.

    Each lead is posted individually with a generated one-line
    personalization in its custom variables. When ``dry_run`` is set the
    upload is skipped entirely. Returns upload/failure counts.
    """
    print(f"\n{'='*50}")
    print(f"STEP 4: Uploading {len(leads)} leads")
    print(f"{'='*50}")

    if dry_run:
        print("  🏃 DRY RUN — skipping actual upload")
        return {"uploaded": 0, "failed": 0, "dry_run": True}

    endpoint = f"{CAMPAIGN_API_BASE}/leads"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}

    batch_size = 25
    total_batches = (len(leads) + batch_size - 1) // batch_size
    total_ok = 0
    total_fail = 0

    for start in range(0, len(leads), batch_size):
        batch = leads[start:start + batch_size]
        print(
            f"  📤 Batch {start // batch_size + 1}/{total_batches} ({len(batch)} leads) …",
            end=" ", flush=True,
        )

        ok = 0
        fail = 0
        for lead in batch:
            payload = {
                "email": lead["email"],
                "first_name": lead.get("first_name", ""),
                "last_name": lead.get("last_name", ""),
                "company_name": lead.get("company_name", ""),
                "campaign": campaign_id,
                "custom_variables": {
                    "title": lead.get("title", ""),
                    "company_name": lead.get("company_name", ""),
                    "personalization": generate_personalization(lead),
                },
            }

            try:
                resp = request_with_retry("POST", endpoint, headers=headers, json=payload)
                if resp.status_code in (200, 201):
                    ok += 1
                else:
                    fail += 1
                    # Only echo the first few failures per batch to keep
                    # the console readable.
                    if fail <= 3:
                        print(f"\n    ⚠️  Failed {lead['email']}: {resp.status_code} {resp.text[:100]}")
            except Exception as e:
                fail += 1
                print(f"\n    ⚠️  Error uploading {lead['email']}: {e}")

        total_ok += ok
        total_fail += fail
        print(f"✓ {ok} ok, {fail} failed")

        # Pause between batches, but not after the final one.
        if start + batch_size < len(leads):
            time.sleep(1)

    print(f"\n  ✅ Uploaded: {total_ok}")
    if total_fail:
        print(f"  ❌ Failed: {total_fail}")

    return {"uploaded": total_ok, "failed": total_fail, "dry_run": False}
429  
430  
431  # ---------------------------------------------------------------------------
432  # Reporting
433  # ---------------------------------------------------------------------------
434  
def save_report(output_dir, sourced, verified_stats, dedup_stats, upload_stats, leads_uploaded, args):
    """Write a timestamped JSON run log and return its path.

    The log records the CLI parameters, per-step statistics, and the
    uploaded leads with verification-only flags (is_free_email /
    is_role_based) stripped out.
    """
    stamp = datetime.now().strftime("%Y-%m-%d-%H-%M")
    report_path = os.path.join(output_dir, f"{stamp}.json")

    verification_flags = ("is_free_email", "is_role_based")
    report = {
        "timestamp": datetime.now().isoformat(),
        "parameters": {
            "titles": args.titles,
            "industries": args.industries,
            "company_size": args.company_size,
            "locations": args.locations,
            "keywords": args.keywords,
            "campaign_id": args.campaign_id,
            "volume": args.volume,
            "exclude_file": args.exclude_file,
            "dry_run": args.dry_run,
        },
        "results": {
            "sourced": sourced,
            "verification": verified_stats,
            "deduplication": dedup_stats,
            "upload": upload_stats,
        },
        "leads_uploaded": [
            {key: value for key, value in lead.items() if key not in verification_flags}
            for lead in leads_uploaded
        ],
    }

    os.makedirs(output_dir, exist_ok=True)
    with open(report_path, "w") as fh:
        # default=str keeps non-JSON-native values (e.g. datetimes) loggable.
        json.dump(report, fh, indent=2, default=str)

    print(f"\n  💾 Run log saved: {report_path}")
    return report_path
471  
472  
def print_summary(sourced_count, verified_stats, dedup_stats, upload_stats):
    """Print the end-of-run funnel summary to stdout."""
    bar = "=" * 50
    valid = verified_stats["valid"]
    # Guard against division by zero when nothing was sourced.
    valid_pct = valid / max(sourced_count, 1) * 100

    print(f"\n{bar}")
    print("  LEAD PIPELINE SUMMARY")
    print(bar)
    print(f"  Sourced:                 {sourced_count:>6}")
    print(f"  Verified (valid):        {valid:>6}  ({valid_pct:.1f}%)")
    print(f"  Already in campaign:     {dedup_stats['existing_dupes']:>6}")
    print(f"  Excluded (burned list):  {dedup_stats['burned_dupes']:>6}")
    print(f"  Net new uploaded:        {upload_stats['uploaded']:>6}")
    if upload_stats.get("failed"):
        print(f"  Failed uploads:          {upload_stats['failed']:>6}")
    if upload_stats.get("dry_run"):
        print("  ⚠️  DRY RUN — nothing was uploaded")
    print(f"{bar}\n")
488  
489  
490  # ---------------------------------------------------------------------------
491  # Main
492  # ---------------------------------------------------------------------------
493  
def main():
    """CLI entry point: parse arguments, then run the full pipeline.

    Steps: source leads → verify emails → dedupe against the campaign
    tool and exclusion list → upload → print summary and save a JSON run
    log. Intermediate snapshots (last-sourced / last-verified) are
    written to the output directory after each step. Exits non-zero when
    sourcing or verification yields no leads.
    """
    parser = argparse.ArgumentParser(
        description="Lead Pipeline: Source → Verify → Dedupe → Upload",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Full pipeline run
  python3 lead-pipeline.py \\
    --source-api-key KEY --validation-api-key KEY --campaign-api-key KEY \\
    --titles "VP Marketing,CMO" --industries "SaaS" \\
    --company-size "11,50" --locations "United States" \\
    --campaign-id abc-123 --volume 200

  # Dry run (no upload)
  python3 lead-pipeline.py \\
    --source-api-key KEY --validation-api-key KEY --campaign-api-key KEY \\
    --titles "CTO,VP Engineering" --company-size "51,200" \\
    --campaign-id abc-123 --volume 100 --dry-run
        """,
    )

    parser.add_argument("--source-api-key", required=True, help="People search API key")
    parser.add_argument("--validation-api-key", required=True, help="Email validation API key")
    parser.add_argument("--campaign-api-key", required=True, help="Campaign tool API key")
    parser.add_argument("--titles", required=True, help="Comma-separated job titles")
    parser.add_argument("--industries", default="", help="Comma-separated industries/keywords")
    parser.add_argument("--company-size", default="", help="Employee range, e.g. '11,50'")
    parser.add_argument("--locations", default="", help="Comma-separated locations")
    parser.add_argument("--keywords", default="", help="Additional search keywords")
    parser.add_argument("--campaign-id", required=True, help="Campaign UUID")
    parser.add_argument("--volume", type=int, default=500, help="Target number of leads (default: 500)")
    parser.add_argument("--exclude-file", default=None, help="Path to CSV of burned/excluded emails")
    parser.add_argument("--output-dir", default="./data/pipeline-runs/", help="Directory for run logs")
    parser.add_argument("--dry-run", action="store_true", help="Run pipeline but skip upload")

    args = parser.parse_args()

    start_time = time.time()
    os.makedirs(args.output_dir, exist_ok=True)

    print(f"\n🚀 Lead Pipeline Started — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"   Target: {args.volume} leads → Campaign {args.campaign_id}")
    if args.dry_run:
        print(f"   ⚠️  DRY RUN MODE — will not upload")

    # Step 1: Source.
    # (argparse converts --source-api-key to args.source_api_key; plain
    # attribute access replaces the former redundant getattr() call.)
    sourced_leads = source_leads(
        api_key=args.source_api_key,
        titles=args.titles,
        industries=args.industries,
        company_size=args.company_size,
        locations=args.locations,
        keywords=args.keywords,
        volume=args.volume,
    )

    if not sourced_leads:
        print("\n❌ No leads sourced. Exiting.")
        sys.exit(1)

    # Snapshot so a failed later step doesn't lose the sourcing work.
    intermediate_path = os.path.join(args.output_dir, "last-sourced.json")
    with open(intermediate_path, "w") as f:
        json.dump(sourced_leads, f, indent=2)

    # Step 2: Verify
    verified_leads, verified_stats = verify_emails(args.validation_api_key, sourced_leads)

    if not verified_leads:
        print("\n❌ No leads passed verification. Exiting.")
        sys.exit(1)

    intermediate_path = os.path.join(args.output_dir, "last-verified.json")
    with open(intermediate_path, "w") as f:
        json.dump(verified_leads, f, indent=2)

    # Step 3: Deduplicate
    deduped_leads, dedup_stats = deduplicate(verified_leads, args.campaign_api_key, args.exclude_file)

    if not deduped_leads:
        print("\n⚠️  All leads already exist. Nothing to upload.")
        upload_stats = {"uploaded": 0, "failed": 0, "dry_run": args.dry_run}
    else:
        # Step 4: Upload
        upload_stats = upload_leads(args.campaign_api_key, deduped_leads, args.campaign_id, args.dry_run)

    # Step 5: Report
    print_summary(len(sourced_leads), verified_stats, dedup_stats, upload_stats)

    save_report(
        args.output_dir,
        sourced=len(sourced_leads),
        verified_stats=verified_stats,
        dedup_stats=dedup_stats,
        upload_stats=upload_stats,
        leads_uploaded=deduped_leads,
        args=args,
    )

    elapsed = time.time() - start_time
    print(f"⏱️  Completed in {elapsed/60:.1f} minutes")
595  
596  
597  if __name__ == "__main__":
598      main()