lead-pipeline.py
#!/usr/bin/env python3
"""
Lead Pipeline: Source → Verify → Dedupe → Upload

End-to-end lead sourcing, email verification, deduplication, and campaign upload.
API-agnostic — configure endpoints and keys via environment variables.

Usage:
    python3 lead-pipeline.py \\
        --source-api-key KEY --validation-api-key KEY --campaign-api-key KEY \\
        --titles "VP Marketing,CMO" --campaign-id UUID --volume 200

    python3 lead-pipeline.py --help
"""

import argparse
import json
import os
import sys
import time
from datetime import datetime
from pathlib import Path

try:
    import requests
except ImportError:
    print("ERROR: 'requests' package required. Run: pip3 install requests", file=sys.stderr)
    sys.exit(1)


# ---------------------------------------------------------------------------
# Configuration — override via environment variables
# ---------------------------------------------------------------------------

SOURCE_API_URL = os.environ.get(
    "LEAD_SOURCE_API_URL",
    "https://api.apollo.io/api/v1/mixed_people/search"
)
VALIDATION_API_URL = os.environ.get(
    "EMAIL_VALIDATION_API_URL",
    "https://api.your-email-provider.com/v1/people/email-validation"
)
CAMPAIGN_API_BASE = os.environ.get(
    "CAMPAIGN_API_BASE_URL",
    "https://api.your-campaign-tool.com/api/v2"
)


# ---------------------------------------------------------------------------
# Retry / backoff helper
# ---------------------------------------------------------------------------

def request_with_retry(method, url, max_retries=5, **kwargs):
    """HTTP request with exponential backoff on 429 / 5xx.

    Makes up to ``max_retries + 1`` attempts, doubling the backoff delay
    (capped at 60s) between retries. Returns the ``requests.Response`` of
    the first non-retryable attempt; callers are expected to inspect
    ``resp.status_code`` themselves. If every attempt was rate-limited or
    a server error, the last such response is returned. Re-raises the
    final transport-level ``RequestException`` if all attempts fail at
    the connection level.
    """
    backoff = 1
    resp = None  # bound up front so the fall-through return is always defined
    for attempt in range(max_retries + 1):
        try:
            resp = requests.request(method, url, timeout=30, **kwargs)
            if resp.status_code == 429:
                # Per RFC 9110, Retry-After may be delay-seconds OR an
                # HTTP-date; int() would raise on a date, so fall back to
                # our own backoff when it isn't a plain integer.
                try:
                    wait = int(resp.headers.get("Retry-After", backoff))
                except (TypeError, ValueError):
                    wait = backoff
                print(f"   ⏳ Rate limited (429). Waiting {wait}s …")
                time.sleep(wait)
                backoff = min(backoff * 2, 60)
                continue
            if resp.status_code >= 500:
                print(f"   ⚠️ Server error {resp.status_code}. Retry in {backoff}s …")
                time.sleep(backoff)
                backoff = min(backoff * 2, 60)
                continue
            return resp
        except requests.exceptions.RequestException as e:
            if attempt == max_retries:
                raise
            print(f"   ⚠️ Request error: {e}. Retry in {backoff}s …")
            time.sleep(backoff)
            backoff = min(backoff * 2, 60)
    # Retries exhausted on 429/5xx: hand the last response to the caller.
    return resp


# ---------------------------------------------------------------------------
# Step 1: Source Leads (People Search API)
# ---------------------------------------------------------------------------

def source_leads(api_key, titles, industries, company_size, locations, keywords, volume):
    """Pull leads from a people search API.

    Pages through the search endpoint until ``volume`` leads with email
    addresses are collected or results run out. Comma-separated filter
    strings are split into lists for the request body. Returns a list of
    lead dicts (email lowercased/stripped), deduplicated by email within
    the sourced set.
    """
    print(f"\n{'='*50}")
    print(f"STEP 1: Sourcing leads (target: {volume})")
    print(f"{'='*50}")

    leads = []
    page = 1

    # "11,50" means a single min,max range; any other shape is passed
    # through as a list of individual range strings.
    size_ranges = []
    if company_size:
        parts = [s.strip() for s in company_size.split(",")]
        if len(parts) == 2:
            size_ranges = [f"{parts[0]},{parts[1]}"]
        else:
            size_ranges = parts

    while len(leads) < volume:
        body = {
            "api_key": api_key,
            "per_page": 100,
            "page": page,
        }
        if titles:
            body["person_titles"] = [t.strip() for t in titles.split(",")]
        if industries:
            body["q_organization_keyword_tags"] = [i.strip() for i in industries.split(",")]
        if size_ranges:
            body["organization_num_employees_ranges"] = size_ranges
        if locations:
            body["person_locations"] = [loc.strip() for loc in locations.split(",")]
        if keywords:
            body["q_keywords"] = keywords

        print(f"   📡 Page {page} …", end=" ", flush=True)
        resp = request_with_retry("POST", SOURCE_API_URL, json=body)

        if resp.status_code != 200:
            print(f"ERROR {resp.status_code}: {resp.text[:200]}")
            break

        data = resp.json()
        people = data.get("people", [])
        if not people:
            print("no more results.")
            break

        page_leads = 0
        for person in people:
            email = person.get("email")
            if not email:
                continue  # leads without an email can't be verified or uploaded
            org = person.get("organization") or {}
            leads.append({
                "email": email.lower().strip(),
                "first_name": person.get("first_name", ""),
                "last_name": person.get("last_name", ""),
                "title": person.get("title", ""),
                "company_name": org.get("name", ""),
                "domain": org.get("primary_domain", ""),
            })
            page_leads += 1
            if len(leads) >= volume:
                break

        print(f"{page_leads} with email ({len(leads)} total)")

        total_pages = data.get("pagination", {}).get("total_pages", page)
        if page >= total_pages:
            print("   Reached last page.")
            break
        page += 1
        time.sleep(0.5)  # be polite between pages

    # Dedupe by email within the sourced set (first occurrence wins)
    seen = set()
    unique_leads = []
    for lead in leads:
        if lead["email"] not in seen:
            seen.add(lead["email"])
            unique_leads.append(lead)

    print(f"\n   ✅ Sourced {len(unique_leads)} unique leads with emails")
    return unique_leads


# ---------------------------------------------------------------------------
# Step 2: Email Verification
# ---------------------------------------------------------------------------

def verify_emails(api_key, leads):
    """Verify emails via validation API. Returns only valid leads.

    Returns a ``(valid_leads, stats)`` tuple. Leads whose status is
    "valid" are annotated with ``is_free_email`` / ``is_role_based``;
    invalid/unknown/error results are only counted, not returned.
    """
    print(f"\n{'='*50}")
    print(f"STEP 2: Verifying {len(leads)} emails")
    print(f"{'='*50}")

    headers = {
        "X-API-Key": api_key,
        "Content-Type": "application/json",
    }

    valid_leads = []
    invalid_count = 0
    unknown_count = 0
    error_count = 0
    rejection_reasons = {}

    for i, lead in enumerate(leads):
        if (i + 1) % 50 == 0 or i == 0:
            print(f"   🔍 Verifying {i+1}/{len(leads)} …")

        try:
            resp = request_with_retry("POST", VALIDATION_API_URL, headers=headers, json={"email": lead["email"]})

            if resp.status_code != 200:
                error_count += 1
                continue

            data = resp.json()
            status = data.get("email_status", "unknown")

            if status == "valid":
                lead["is_free_email"] = data.get("is_free_email", False)
                lead["is_role_based"] = data.get("is_role_based", False)
                valid_leads.append(lead)
            elif status == "invalid":
                invalid_count += 1
                rejection_reasons["invalid"] = rejection_reasons.get("invalid", 0) + 1
            else:
                unknown_count += 1
                rejection_reasons["unknown"] = rejection_reasons.get("unknown", 0) + 1

        except Exception as e:
            # Best-effort: a single bad response (e.g. non-JSON body) must
            # not abort the whole verification pass.
            error_count += 1
            print(f"   ⚠️ Error verifying {lead['email']}: {e}")

        if (i + 1) % 20 == 0:
            time.sleep(0.5)  # light throttle every 20 requests

    print(f"\n   ✅ Verified: {len(valid_leads)} valid")
    print(f"   ❌ Invalid: {invalid_count}")
    print(f"   ❓ Unknown: {unknown_count}")
    print(f"   ⚠️ Errors: {error_count}")

    return valid_leads, {
        "total": len(leads),
        "valid": len(valid_leads),
        "invalid": invalid_count,
        "unknown": unknown_count,
        "errors": error_count,
        "rejection_reasons": rejection_reasons,
    }


# ---------------------------------------------------------------------------
# Step 3: Deduplicate
# ---------------------------------------------------------------------------

def get_existing_emails(api_key):
    """Pull existing leads from campaign tool for dedup.

    Follows cursor pagination (``next_starting_after``) until exhausted
    and returns a set of lowercased email addresses.
    """
    print(f"\n   📥 Fetching existing leads for dedup …")

    url = f"{CAMPAIGN_API_BASE}/leads/list"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    existing_emails = set()
    cursor = None
    page = 0

    while True:
        body = {"limit": 100}
        if cursor:
            body["starting_after"] = cursor

        resp = request_with_retry("POST", url, headers=headers, json=body)

        if resp.status_code != 200:
            print(f"   ⚠️ List error {resp.status_code}: {resp.text[:200]}")
            break

        data = resp.json()
        items = data.get("items", [])

        if not items:
            break

        for item in items:
            # "or" guards against an explicit null email in the payload,
            # which .get(..., "") would pass through as None.
            email = (item.get("email") or "").lower().strip()
            if email:
                existing_emails.add(email)

        cursor = data.get("next_starting_after")
        if not cursor:
            break

        page += 1
        if page % 10 == 0:
            print(f"      … {len(existing_emails)} existing leads so far")
            time.sleep(1)

    print(f"   📊 Found {len(existing_emails)} existing leads")
    return existing_emails


def load_exclusion_list(filepath):
    """Load burned emails from a CSV file.

    Skips blank lines and ``#`` comments; takes the first CSV column,
    strips quotes, lowercases, and keeps only values containing ``@``.
    Returns an empty set when ``filepath`` is falsy or missing.
    """
    excluded = set()
    if not filepath or not os.path.exists(filepath):
        return excluded

    with open(filepath, "r") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            email = line.split(",")[0].strip().strip('"').lower()
            if "@" in email:
                excluded.add(email)

    print(f"   📋 Loaded {len(excluded)} emails from exclusion list")
    return excluded


def deduplicate(leads, api_key, exclude_file=None):
    """Remove leads already in campaign tool or on exclusion list.

    Returns ``(deduped_leads, stats)`` where stats counts how many leads
    were dropped for each reason.
    """
    print(f"\n{'='*50}")
    print(f"STEP 3: Deduplicating {len(leads)} leads")
    print(f"{'='*50}")

    existing = get_existing_emails(api_key)
    excluded = load_exclusion_list(exclude_file)

    deduped = []
    existing_dupes = 0
    burned_dupes = 0

    for lead in leads:
        email = lead["email"]
        if email in existing:
            existing_dupes += 1
        elif email in excluded:
            burned_dupes += 1
        else:
            deduped.append(lead)

    print(f"\n   ✅ Net new leads: {len(deduped)}")
    print(f"   🔄 Already in campaign tool: {existing_dupes}")
    print(f"   🚫 On exclusion list: {burned_dupes}")

    return deduped, {
        "existing_dupes": existing_dupes,
        "burned_dupes": burned_dupes,
        "net_new": len(deduped),
    }


# ---------------------------------------------------------------------------
# Step 4: Upload to Campaign Tool
# ---------------------------------------------------------------------------

def generate_personalization(lead):
    """Generate a simple 1-line personalization.

    Picks a template based on which of company/title are available,
    falling back to a generic opener when neither is set.
    """
    name = lead.get("first_name", "")
    company = lead.get("company_name", "")
    title = lead.get("title", "")

    if company and title:
        return f"Noticed you're {title} at {company} — curious how you're thinking about growth this quarter."
    elif company:
        return f"Been following {company}'s trajectory — impressive momentum."
    elif title:
        return f"As a {title}, you're probably juggling growth and efficiency right now."
    return "Your background caught my eye — wanted to reach out."
def upload_leads(api_key, leads, campaign_id, dry_run=False):
    """Upload leads to campaign tool in batches.

    Leads are POSTed individually but reported in batches of 25, with a
    1s pause between batches. When ``dry_run`` is true, nothing is sent.
    Returns an ``{"uploaded", "failed", "dry_run"}`` stats dict.
    """
    print(f"\n{'='*50}")
    print(f"STEP 4: Uploading {len(leads)} leads")
    print(f"{'='*50}")

    if dry_run:
        print("   🏃 DRY RUN — skipping actual upload")
        return {"uploaded": 0, "failed": 0, "dry_run": True}

    url = f"{CAMPAIGN_API_BASE}/leads"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    uploaded = 0
    failed = 0
    batch_size = 25

    for i in range(0, len(leads), batch_size):
        batch = leads[i:i + batch_size]
        batch_num = (i // batch_size) + 1
        total_batches = (len(leads) + batch_size - 1) // batch_size

        print(f"   📤 Batch {batch_num}/{total_batches} ({len(batch)} leads) …", end=" ", flush=True)

        batch_success = 0
        batch_fail = 0

        for lead in batch:
            body = {
                "email": lead["email"],
                "first_name": lead.get("first_name", ""),
                "last_name": lead.get("last_name", ""),
                "company_name": lead.get("company_name", ""),
                "campaign": campaign_id,
                "custom_variables": {
                    "title": lead.get("title", ""),
                    "company_name": lead.get("company_name", ""),
                    "personalization": generate_personalization(lead),
                },
            }

            try:
                resp = request_with_retry("POST", url, headers=headers, json=body)
                if resp.status_code in (200, 201):
                    batch_success += 1
                else:
                    batch_fail += 1
                    # Only surface the first few failures per batch to
                    # keep the console output readable.
                    if batch_fail <= 3:
                        print(f"\n      ⚠️ Failed {lead['email']}: {resp.status_code} {resp.text[:100]}")
            except Exception as e:
                batch_fail += 1
                print(f"\n      ⚠️ Error uploading {lead['email']}: {e}")

        uploaded += batch_success
        failed += batch_fail
        print(f"✓ {batch_success} ok, {batch_fail} failed")

        if i + batch_size < len(leads):
            time.sleep(1)  # throttle between batches

    print(f"\n   ✅ Uploaded: {uploaded}")
    if failed:
        print(f"   ❌ Failed: {failed}")

    return {"uploaded": uploaded, "failed": failed, "dry_run": False}


# ---------------------------------------------------------------------------
# Reporting
# ---------------------------------------------------------------------------

def save_report(output_dir, sourced, verified_stats, dedup_stats, upload_stats, leads_uploaded, args):
    """Save run log as JSON.

    Writes ``<output_dir>/<YYYY-MM-DD-HH-MM>.json`` containing the run
    parameters, per-step stats, and the uploaded leads (minus the
    verification-only flags). Returns the report path.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M")
    report_path = os.path.join(output_dir, f"{timestamp}.json")

    report = {
        "timestamp": datetime.now().isoformat(),
        "parameters": {
            "titles": args.titles,
            "industries": args.industries,
            "company_size": args.company_size,
            "locations": args.locations,
            "keywords": args.keywords,
            "campaign_id": args.campaign_id,
            "volume": args.volume,
            "exclude_file": args.exclude_file,
            "dry_run": args.dry_run,
        },
        "results": {
            "sourced": sourced,
            "verification": verified_stats,
            "deduplication": dedup_stats,
            "upload": upload_stats,
        },
        # Strip verification-internal flags from the persisted lead list.
        "leads_uploaded": [
            {k: v for k, v in lead.items() if k not in ("is_free_email", "is_role_based")}
            for lead in leads_uploaded
        ],
    }

    os.makedirs(output_dir, exist_ok=True)
    with open(report_path, "w") as f:
        json.dump(report, f, indent=2, default=str)

    print(f"\n   💾 Run log saved: {report_path}")
    return report_path


def print_summary(sourced_count, verified_stats, dedup_stats, upload_stats):
    """Print final summary.

    Pure console output; ``max(sourced_count, 1)`` guards the percentage
    against division by zero.
    """
    print(f"\n{'='*50}")
    print(f"  LEAD PIPELINE SUMMARY")
    print(f"{'='*50}")
    print(f"  Sourced:                {sourced_count:>6}")
    print(f"  Verified (valid):       {verified_stats['valid']:>6} ({verified_stats['valid']/max(sourced_count,1)*100:.1f}%)")
    print(f"  Already in campaign:    {dedup_stats['existing_dupes']:>6}")
    print(f"  Excluded (burned list): {dedup_stats['burned_dupes']:>6}")
    print(f"  Net new uploaded:       {upload_stats['uploaded']:>6}")
    if upload_stats.get('failed'):
        print(f"  Failed uploads:         {upload_stats['failed']:>6}")
    if upload_stats.get('dry_run'):
        print(f"  ⚠️ DRY RUN — nothing was uploaded")
    print(f"{'='*50}\n")


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """CLI entry point: parse args and run the four pipeline steps."""
    parser = argparse.ArgumentParser(
        description="Lead Pipeline: Source → Verify → Dedupe → Upload",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Full pipeline run
  python3 lead-pipeline.py \\
      --source-api-key KEY --validation-api-key KEY --campaign-api-key KEY \\
      --titles "VP Marketing,CMO" --industries "SaaS" \\
      --company-size "11,50" --locations "United States" \\
      --campaign-id abc-123 --volume 200

  # Dry run (no upload)
  python3 lead-pipeline.py \\
      --source-api-key KEY --validation-api-key KEY --campaign-api-key KEY \\
      --titles "CTO,VP Engineering" --company-size "51,200" \\
      --campaign-id abc-123 --volume 100 --dry-run
""",
    )

    parser.add_argument("--source-api-key", required=True, help="People search API key")
    parser.add_argument("--validation-api-key", required=True, help="Email validation API key")
    parser.add_argument("--campaign-api-key", required=True, help="Campaign tool API key")
    parser.add_argument("--titles", required=True, help="Comma-separated job titles")
    parser.add_argument("--industries", default="", help="Comma-separated industries/keywords")
    parser.add_argument("--company-size", default="", help="Employee range, e.g. '11,50'")
    parser.add_argument("--locations", default="", help="Comma-separated locations")
    parser.add_argument("--keywords", default="", help="Additional search keywords")
    parser.add_argument("--campaign-id", required=True, help="Campaign UUID")
    parser.add_argument("--volume", type=int, default=500, help="Target number of leads (default: 500)")
    parser.add_argument("--exclude-file", default=None, help="Path to CSV of burned/excluded emails")
    parser.add_argument("--output-dir", default="./data/pipeline-runs/", help="Directory for run logs")
    parser.add_argument("--dry-run", action="store_true", help="Run pipeline but skip upload")

    args = parser.parse_args()

    start_time = time.time()
    os.makedirs(args.output_dir, exist_ok=True)

    print(f"\n🚀 Lead Pipeline Started — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"   Target: {args.volume} leads → Campaign {args.campaign_id}")
    if args.dry_run:
        print(f"   ⚠️ DRY RUN MODE — will not upload")

    # Step 1: Source
    sourced_leads = source_leads(
        api_key=args.source_api_key,
        titles=args.titles,
        industries=args.industries,
        company_size=args.company_size,
        locations=args.locations,
        keywords=args.keywords,
        volume=args.volume,
    )

    if not sourced_leads:
        print("\n❌ No leads sourced. Exiting.")
        sys.exit(1)

    # Save intermediate state so a failed later step doesn't lose the pull
    intermediate_path = os.path.join(args.output_dir, "last-sourced.json")
    with open(intermediate_path, "w") as f:
        json.dump(sourced_leads, f, indent=2)

    # Step 2: Verify
    verified_leads, verified_stats = verify_emails(args.validation_api_key, sourced_leads)

    if not verified_leads:
        print("\n❌ No leads passed verification. Exiting.")
        sys.exit(1)

    intermediate_path = os.path.join(args.output_dir, "last-verified.json")
    with open(intermediate_path, "w") as f:
        json.dump(verified_leads, f, indent=2)

    # Step 3: Deduplicate
    deduped_leads, dedup_stats = deduplicate(verified_leads, args.campaign_api_key, args.exclude_file)

    if not deduped_leads:
        print("\n⚠️ All leads already exist. Nothing to upload.")
        upload_stats = {"uploaded": 0, "failed": 0, "dry_run": args.dry_run}
    else:
        # Step 4: Upload
        upload_stats = upload_leads(args.campaign_api_key, deduped_leads, args.campaign_id, args.dry_run)

    # Step 5: Report
    print_summary(len(sourced_leads), verified_stats, dedup_stats, upload_stats)

    save_report(
        args.output_dir,
        sourced=len(sourced_leads),
        verified_stats=verified_stats,
        dedup_stats=dedup_stats,
        upload_stats=upload_stats,
        leads_uploaded=deduped_leads,
        args=args,
    )

    elapsed = time.time() - start_time
    print(f"⏱️ Completed in {elapsed/60:.1f} minutes")


if __name__ == "__main__":
    main()