lead-enricher.py
#!/usr/bin/env python3
"""
Lead Enricher — Enriches inbound leads with CRM data, account research,
and structured dossier format.

Designed to process leads from webhooks, forms, CRM triggers, or chat channels.
Can run as a cron job or be called directly.

Usage:
    python3 lead-enricher.py [--dry-run] < leads.json
    python3 lead-enricher.py --input leads.json --output enriched.json

The input is a JSON array of lead objects (a single object is also accepted).
"""

import argparse
import json
import logging
import math
import os
import re
import sys
import time
import urllib.parse
from datetime import datetime, timezone
from pathlib import Path
from urllib.error import URLError, HTTPError
from urllib.request import urlopen, Request

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
log = logging.getLogger("lead-enricher")

SCRIPT_DIR = Path(__file__).resolve().parent
DATA_DIR = SCRIPT_DIR.parent / "data"
STATE_PATH = DATA_DIR / "lead-enricher-state.json"
LOG_DIR = SCRIPT_DIR.parent / "logs" / "lead-enricher"

# CRM configuration — set via environment variables
CRM_BASE_URL = os.environ.get("CRM_BASE_URL", "https://api.hubapi.com")
CRM_API_KEY = os.environ.get("CRM_API_KEY", "")
CRM_PORTAL_ID = os.environ.get("CRM_PORTAL_ID", "")

# Network timeout for a single CRM request, in seconds.
CRM_TIMEOUT_SECONDS = 15
# Pause inserted after a lead that triggered CRM requests (simple rate limit).
CRM_RATE_LIMIT_DELAY = 0.5

# Properties requested from the CRM for each object type.
_CONTACT_PROPERTIES = [
    "firstname", "lastname", "email", "phone", "company",
    "jobtitle", "website", "annualrevenue", "industry",
    "num_employees", "hs_lead_status", "lifecyclestage",
    "message", "country",
]
_COMPANY_PROPERTIES = [
    "name", "domain", "annualrevenue", "industry",
    "numberofemployees", "description", "website",
]

# Output field -> form keys that may carry it, in priority order.
_FIELD_MAP = {
    "name": ["name", "full_name", "contact_name"],
    "email": ["email", "email_address"],
    "company": ["company", "company_name", "organization"],
    "title": ["title", "job_title", "jobtitle", "position"],
    "phone": ["phone", "phone_number", "tel"],
    "website": ["website", "company_url", "domain"],
    "industry": ["industry", "vertical"],
    "employees": ["employees", "company_size", "num_employees"],
    "revenue": ["revenue", "annual_revenue", "annualrevenue"],
    "budget": ["budget", "monthly_budget"],
    "interest": ["interest", "services", "services_interested_in"],
    "source": ["source", "lead_source", "how_did_you_hear"],
    "message": ["message", "notes", "comments"],
    "country": ["country", "location"],
    "tier": ["tier", "lead_tier"],
}

# Form values that mean "no answer" and must not be treated as data.
_PLACEHOLDER_VALUES = frozenset({"n/a", "unknown"})

# Optional scheme followed by an optional "www." at the start of a URL.
_URL_PREFIX_RE = re.compile(r"^(?:https?://)?(?:www\.)?", re.IGNORECASE)


# ── CRM Helpers ──

def _crm_search(object_type, property_name, value, properties):
    """Run an exact-match CRM search and return the first hit.

    Sends a POST to ``{CRM_BASE_URL}/crm/v3/objects/{object_type}/search``
    filtering on ``property_name == value``.

    Returns the first element of the response's ``results`` list, or None
    when the list is empty or missing. Network and JSON errors propagate;
    the public wrappers decide how to degrade.
    """
    url = f"{CRM_BASE_URL.rstrip('/')}/crm/v3/objects/{object_type}/search"
    headers = {
        "Authorization": f"Bearer {CRM_API_KEY}",
        "Content-Type": "application/json"
    }
    body = {
        "filterGroups": [{
            "filters": [{
                "propertyName": property_name,
                "operator": "EQ",
                "value": value
            }]
        }],
        "properties": properties
    }
    req = Request(url, data=json.dumps(body).encode(), headers=headers, method="POST")
    with urlopen(req, timeout=CRM_TIMEOUT_SECONDS) as resp:
        payload = json.loads(resp.read())
    results = payload.get("results") or []
    return results[0] if results else None


def crm_search_contact(email):
    """Search CRM for contact by email.

    Returns a ``(properties, contact_id)`` tuple, or ``(None, None)`` when
    no API key is configured, ``email`` is empty, nothing matches, or the
    request fails (failures are logged as warnings, never raised).
    """
    if not CRM_API_KEY or not email:
        return None, None

    try:
        hit = _crm_search("contacts", "email", email, _CONTACT_PROPERTIES)
        if hit is None:
            return None, None
        return hit.get("properties", {}), hit.get("id")
    except Exception as e:
        # Deliberate best-effort boundary: enrichment must never abort a run.
        log.warning("CRM contact search failed for %s: %s", email, e)
        return None, None


def crm_search_company(domain):
    """Search CRM for company by domain.

    Returns the company's properties dict, or None when no API key is
    configured, ``domain`` is empty, nothing matches, or the request fails
    (failures are logged as warnings, never raised).
    """
    if not CRM_API_KEY or not domain:
        return None

    try:
        hit = _crm_search("companies", "domain", domain, _COMPANY_PROPERTIES)
        if hit is None:
            return None
        return hit.get("properties", {})
    except Exception as e:
        # Deliberate best-effort boundary: enrichment must never abort a run.
        log.warning("CRM company search failed for %s: %s", domain, e)
        return None


def format_revenue(rev_str):
    """Format a revenue value as a short dollar string.

    Returns "Unknown" for empty input, and the input unchanged when it is
    not a finite number. Otherwise returns e.g. "$2.5B", "$5M", "$750K"
    or "$900".
    """
    if not rev_str:
        return "Unknown"
    try:
        rev = float(rev_str)
    except (ValueError, TypeError, OverflowError):
        return rev_str
    if not math.isfinite(rev):
        # "nan"/"inf" parse as floats but are not meaningful amounts.
        return rev_str

    # Thresholds sit half a display unit below each boundary so a value that
    # would round up to "1000K" / "1000M" is promoted to the next unit.
    if rev >= 999_500_000:
        return f"${rev / 1_000_000_000:.1f}B"
    if rev >= 999_500:
        return f"${rev / 1_000_000:.0f}M"
    if rev >= 1_000:
        return f"${rev / 1_000:.0f}K"
    return f"${rev:.0f}"


# ── Lead Parsing ──

def parse_form_lead(data):
    """Parse a lead from a form submission (generic JSON format).

    For each output field the first candidate key holding a real value wins.
    Values are stringified and stripped; empty values and placeholders
    ("n/a", "unknown", any case) are ignored. Returns an empty dict when
    ``data`` is not a dict.
    """
    if not isinstance(data, dict):
        return {}

    info = {}
    for target, candidates in _FIELD_MAP.items():
        for candidate in candidates:
            val = data.get(candidate)
            if not val:
                continue
            text = str(val).strip()
            # Compare the stripped text so " N/A " is rejected as well.
            if text and text.lower() not in _PLACEHOLDER_VALUES:
                info[target] = text
                break

    return info


# ── Enriched Card Builder ──

def _first_nonempty(*values):
    """Return the first truthy value (strings are stripped), else ""."""
    for value in values:
        if isinstance(value, str):
            value = value.strip()
        if value:
            return value
    return ""


def _clean_website(url):
    """Strip scheme, leading "www.", outer whitespace and trailing slashes."""
    if not url:
        return ""
    return _URL_PREFIX_RE.sub("", str(url).strip()).rstrip("/")


def _extract_domain(url):
    """Return the lower-cased host of ``url`` without path, query or port."""
    host = re.split(r"[/?#]", _clean_website(url), maxsplit=1)[0]
    return host.split(":", 1)[0].lower()


def build_enriched_card(info, crm_contact=None, crm_company=None, crm_contact_id=None):
    """Build a structured enriched lead card.

    Form data in ``info`` takes precedence; CRM company and contact
    properties fill the gaps. Missing text fields are always "" (never
    None), ``name`` falls back to the CRM first/last name and then
    "Unknown", and ``revenue`` is "Unknown" when no figure is available.

    Returns a dict of card fields plus ``text_summary``, a human-readable
    multi-line rendering of the same data.
    """
    contact = crm_contact or {}
    account = crm_company or {}

    crm_name = " ".join(
        str(part).strip()
        for part in (contact.get("firstname"), contact.get("lastname"))
        if part
    )
    name = _first_nonempty(info.get("name"), crm_name) or "Unknown"
    email = _first_nonempty(info.get("email"), contact.get("email"))
    company = _first_nonempty(info.get("company"), account.get("name"), contact.get("company"))
    website = _first_nonempty(info.get("website"), contact.get("website"))
    title = _first_nonempty(info.get("title"), contact.get("jobtitle"))
    phone = _first_nonempty(info.get("phone"), contact.get("phone"))
    industry = _first_nonempty(
        info.get("industry"), account.get("industry"), contact.get("industry")
    )
    employees = _first_nonempty(
        info.get("employees"), account.get("numberofemployees"), contact.get("num_employees")
    )
    budget = _first_nonempty(info.get("budget"))
    source = _first_nonempty(info.get("source"))
    services = _first_nonempty(info.get("interest"))
    problem = _first_nonempty(info.get("message"))

    revenue_raw = _first_nonempty(
        info.get("revenue"), account.get("annualrevenue"), contact.get("annualrevenue")
    )
    revenue = format_revenue(revenue_raw)

    website_display = _clean_website(website)

    # One timestamp so "enriched_at" and the summary footer always agree.
    now = datetime.now(timezone.utc)

    # Build card
    card = {
        "name": name,
        "email": email,
        "company": company,
        "website": website_display,
        "title": title,
        "phone": phone,
        "industry": industry,
        "employees": employees,
        "revenue": revenue,
        "budget": budget,
        "services": services,
        "problem": problem,
        "source": source,
        "crm_contact_id": crm_contact_id,
        "enriched_at": now.isoformat(),
    }

    # Build text summary
    optional_rows = [
        ("Email", email),
        ("Company", company),
        ("Website", website_display),
        ("Title", title),
        ("Revenue", revenue if revenue != "Unknown" else ""),
        ("Budget", budget),
        ("Services", services),
        ("Industry", industry),
        ("Employees", employees),
    ]
    lines = ["📋 ENRICHED LEAD", "", f"Name: {name}"]
    lines.extend(f"{label}: {value}" for label, value in optional_rows if value)
    if problem:
        # Free-text messages are capped so the summary stays readable.
        lines.append(f"\nProblem: {str(problem)[:500]}")

    footer_parts = []
    if phone:
        footer_parts.append(f"📞 {phone}")
    if source:
        footer_parts.append(f"Source: {source}")
    footer_parts.append(f"🕐 {now.strftime('%m/%d/%Y, %I:%M:%S %p UTC')}")

    lines.append("")
    lines.append(" | ".join(footer_parts))

    card["text_summary"] = "\n".join(lines)
    return card


# ── Main ──

def process_leads(leads):
    """Enrich a list of lead dicts.

    Leads that are not dicts, or that carry neither a name nor an email,
    are skipped with a warning. Returns the list of enriched cards in
    input order.
    """
    results = []

    for lead_data in leads:
        info = parse_form_lead(lead_data)
        if not info.get("name") and not info.get("email"):
            log.warning("Skipping lead with no name or email: %s", lead_data)
            continue

        display_name = info.get("name", "Unknown")
        log.info("Processing lead: %s", display_name)

        # CRM enrichment
        crm_contact = None
        crm_contact_id = None
        crm_company = None

        email = info.get("email")
        if email:
            crm_contact, crm_contact_id = crm_search_contact(email)

        domain = _extract_domain(info.get("website", ""))
        if domain:
            crm_company = crm_search_company(domain)

        card = build_enriched_card(info, crm_contact, crm_company, crm_contact_id)
        results.append(card)

        log.info("✅ Enriched: %s", display_name)

        # Rate limit — only needed when this lead actually hit the CRM.
        if CRM_API_KEY and (email or domain):
            time.sleep(CRM_RATE_LIMIT_DELAY)

    return results


def _load_leads(path):
    """Load leads from the JSON file at ``path``, or from stdin when falsy."""
    if path:
        with open(path, encoding="utf-8") as f:
            leads = json.load(f)
    else:
        leads = json.load(sys.stdin)
    # A single lead object is accepted as a one-element batch.
    return leads if isinstance(leads, list) else [leads]


def main():
    """Command-line entry point.

    Returns 0 on success and 1 when the input cannot be read or parsed.
    """
    parser = argparse.ArgumentParser(description="Lead Enricher")
    parser.add_argument("--input", help="JSON file with lead data")
    parser.add_argument("--output", help="Output JSON file for enriched leads")
    parser.add_argument("--dry-run", action="store_true", help="Process but don't write output")
    args = parser.parse_args()

    try:
        LOG_DIR.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        # Nothing in this script writes to the directory, so carry on.
        log.warning("Could not create log directory %s: %s", LOG_DIR, e)

    try:
        leads = _load_leads(args.input)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        log.error("Could not read lead data from %s: %s", args.input or "stdin", e)
        return 1

    log.info("Processing %d leads", len(leads))

    results = process_leads(leads)

    if args.dry_run:
        for r in results:
            print(r.get("text_summary", ""))
            print("---")
    elif args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)
        log.info("📄 Results written to %s", args.output)
    else:
        print(json.dumps(results, indent=2))

    log.info("Done. Enriched %d leads.", len(results))
    return 0


if __name__ == "__main__":
    sys.exit(main())