/ lead-dossier / scripts / lead-enricher.py
lead-enricher.py
  1  #!/usr/bin/env python3
  2  """
  3  Lead Enricher — Enriches inbound leads with CRM data, account research,
  4  and structured dossier format.
  5  
  6  Designed to process leads from webhooks, forms, CRM triggers, or chat channels.
  7  Can run as a cron job or be called directly.
  8  
  9  Usage:
    python3 lead-enricher.py [--dry-run] < leads.json
 11      python3 lead-enricher.py --input leads.json --output enriched.json
 12  """
 13  
 14  import argparse
 15  import json
 16  import logging
 17  import os
 18  import re
 19  import sys
 20  import time
 21  from datetime import datetime, timezone
 22  from pathlib import Path
 23  from urllib.request import urlopen, Request
 24  from urllib.error import URLError, HTTPError
 25  import urllib.parse
 26  
# Configure logging once at import time: INFO level, timestamped messages.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
# Shared logger for this script.
log = logging.getLogger("lead-enricher")

# Filesystem layout, resolved relative to the directory containing this script.
SCRIPT_DIR = Path(__file__).resolve().parent
DATA_DIR = SCRIPT_DIR.parent / "data"
# NOTE(review): STATE_PATH is not referenced in the visible code — confirm it is still needed.
STATE_PATH = DATA_DIR / "lead-enricher-state.json"
# Created by main() on startup.
LOG_DIR = SCRIPT_DIR.parent / "logs" / "lead-enricher"

# CRM configuration — set via environment variables
CRM_BASE_URL = os.environ.get("CRM_BASE_URL", "https://api.hubapi.com")
# An empty key makes the CRM search helpers return without issuing a request.
CRM_API_KEY = os.environ.get("CRM_API_KEY", "")
# NOTE(review): CRM_PORTAL_ID is not referenced in the visible code — confirm it is still needed.
CRM_PORTAL_ID = os.environ.get("CRM_PORTAL_ID", "")
 42  
 43  
 44  # ── CRM Helpers ──
 45  
 46  def crm_search_contact(email):
 47      """Search CRM for contact by email, return properties."""
 48      if not CRM_API_KEY or not email:
 49          return None, None
 50  
 51      url = f"{CRM_BASE_URL}/crm/v3/objects/contacts/search"
 52      headers = {
 53          "Authorization": f"Bearer {CRM_API_KEY}",
 54          "Content-Type": "application/json"
 55      }
 56      body = {
 57          "filterGroups": [{
 58              "filters": [{
 59                  "propertyName": "email",
 60                  "operator": "EQ",
 61                  "value": email
 62              }]
 63          }],
 64          "properties": [
 65              "firstname", "lastname", "email", "phone", "company",
 66              "jobtitle", "website", "annualrevenue", "industry",
 67              "num_employees", "hs_lead_status", "lifecyclestage",
 68              "message", "country"
 69          ]
 70      }
 71      data = json.dumps(body).encode()
 72      req = Request(url, data=data, headers=headers, method="POST")
 73      try:
 74          with urlopen(req, timeout=15) as resp:
 75              result = json.loads(resp.read())
 76              results = result.get("results", [])
 77              if results:
 78                  return results[0].get("properties", {}), results[0].get("id")
 79              return None, None
 80      except Exception as e:
 81          log.warning(f"CRM contact search failed for {email}: {e}")
 82          return None, None
 83  
 84  
 85  def crm_search_company(domain):
 86      """Search CRM for company by domain."""
 87      if not CRM_API_KEY or not domain:
 88          return None
 89  
 90      url = f"{CRM_BASE_URL}/crm/v3/objects/companies/search"
 91      headers = {
 92          "Authorization": f"Bearer {CRM_API_KEY}",
 93          "Content-Type": "application/json"
 94      }
 95      body = {
 96          "filterGroups": [{
 97              "filters": [{
 98                  "propertyName": "domain",
 99                  "operator": "EQ",
100                  "value": domain
101              }]
102          }],
103          "properties": [
104              "name", "domain", "annualrevenue", "industry",
105              "numberofemployees", "description", "website"
106          ]
107      }
108      data = json.dumps(body).encode()
109      req = Request(url, data=data, headers=headers, method="POST")
110      try:
111          with urlopen(req, timeout=15) as resp:
112              result = json.loads(resp.read())
113              results = result.get("results", [])
114              if results:
115                  return results[0].get("properties", {})
116              return None
117      except Exception as e:
118          log.warning(f"CRM company search failed for {domain}: {e}")
119          return None
120  
121  
def format_revenue(rev_str):
    """Format a raw revenue value as a short dollar string.

    Args:
        rev_str: Revenue as a numeric string (or number), e.g. ``"5000000"``.

    Returns:
        ``"Unknown"`` for empty input; a compact string such as ``"$2.5B"``,
        ``"$5M"``, ``"$45K"`` or ``"$750"`` for numeric input; the input
        unchanged when it cannot be parsed as a finite number.
    """
    import math  # local import: only this function needs it

    if not rev_str:
        return "Unknown"
    try:
        rev = float(rev_str)
    except (ValueError, TypeError, OverflowError):
        return rev_str
    if not math.isfinite(rev):
        # "nan" / "inf" parse as floats but are not meaningful amounts.
        return rev_str

    # Thresholds sit half a display unit below each boundary so a value that
    # would round up to "1000" of the smaller unit is promoted to the next
    # unit instead (999_999 -> "$1M", not "$1000K").
    if rev >= 999_500_000:
        return f"${rev / 1_000_000_000:.1f}B"
    if rev >= 999_500:
        return f"${rev / 1_000_000:.0f}M"
    if rev >= 999.5:
        return f"${rev / 1_000:.0f}K"
    return f"${rev:.0f}"
138  
139  
140  # ── Lead Parsing ──
141  
def parse_form_lead(data):
    """Parse a lead from a form submission (generic JSON format).

    Each canonical field is filled from the first alias key in *data* that
    holds a usable value. Values are converted to ``str`` and stripped;
    empty values and the placeholders ``"n/a"`` / ``"unknown"`` (compared
    case-insensitively, ignoring surrounding whitespace) are skipped.

    Args:
        data: Mapping of submitted form fields.

    Returns:
        Dict of canonical field name -> cleaned string value, containing
        only the fields that were found. Empty when *data* is not a mapping.
    """
    from collections.abc import Mapping  # local import: only needed here

    if not isinstance(data, Mapping):
        # Malformed entry (e.g. a bare string in the input list): nothing to parse.
        return {}

    field_map = {
        "name": ["name", "full_name", "contact_name"],
        "email": ["email", "email_address"],
        "company": ["company", "company_name", "organization"],
        "title": ["title", "job_title", "jobtitle", "position"],
        "phone": ["phone", "phone_number", "tel"],
        "website": ["website", "company_url", "domain"],
        "industry": ["industry", "vertical"],
        "employees": ["employees", "company_size", "num_employees"],
        "revenue": ["revenue", "annual_revenue", "annualrevenue"],
        "budget": ["budget", "monthly_budget"],
        "interest": ["interest", "services", "services_interested_in"],
        "source": ["source", "lead_source", "how_did_you_hear"],
        "message": ["message", "notes", "comments"],
        "country": ["country", "location"],
        "tier": ["tier", "lead_tier"],
    }
    placeholders = ("n/a", "unknown")

    info = {}
    for target, candidates in field_map.items():
        for candidate in candidates:
            val = data.get(candidate, "")
            if not val:
                continue
            # Strip before the placeholder check so " N/A " is rejected too.
            text = str(val).strip()
            if text and text.lower() not in placeholders:
                info[target] = text
                break

    return info
171  
172  
173  # ── Enriched Card Builder ──
174  
def _first_nonempty(*values):
    """Return the first truthy value among *values*, or "" when there is none."""
    for value in values:
        if value:
            return value
    return ""


def build_enriched_card(info, crm_contact=None, crm_company=None, crm_contact_id=None):
    """Build a structured enriched lead card.

    Form data in *info* takes precedence; gaps are filled from the CRM
    company record and then the CRM contact record. CRM properties that are
    present but ``None`` are treated the same as missing ones, so card
    fields are never ``None`` (except ``crm_contact_id``).

    Args:
        info: Parsed lead fields, as produced by ``parse_form_lead``.
        crm_contact: Optional dict of CRM contact properties.
        crm_company: Optional dict of CRM company properties.
        crm_contact_id: Optional CRM contact identifier, stored as-is.

    Returns:
        Dict with the card fields, an ``enriched_at`` UTC ISO timestamp and a
        human-readable ``text_summary``.
    """
    contact = crm_contact or {}
    account = crm_company or {}

    # The form may omit the name; fall back to the CRM contact's name parts.
    crm_name = " ".join(
        str(part).strip()
        for part in (contact.get("firstname"), contact.get("lastname"))
        if part
    )
    name = _first_nonempty(info.get("name"), crm_name, "Unknown")
    email = _first_nonempty(info.get("email"), contact.get("email"))
    company = _first_nonempty(info.get("company"), account.get("name"), contact.get("company"))
    website = _first_nonempty(info.get("website"), contact.get("website"))
    title = _first_nonempty(info.get("title"), contact.get("jobtitle"))
    phone = _first_nonempty(info.get("phone"), contact.get("phone"))
    industry = _first_nonempty(info.get("industry"), account.get("industry"), contact.get("industry"))
    employees = _first_nonempty(
        info.get("employees"), account.get("numberofemployees"), contact.get("num_employees")
    )
    budget = info.get("budget", "")
    source = info.get("source", "")
    services = info.get("interest", "")
    problem = info.get("message", "")

    revenue_raw = _first_nonempty(
        info.get("revenue"), account.get("annualrevenue"), contact.get("annualrevenue")
    )
    revenue = format_revenue(revenue_raw) if revenue_raw else "Unknown"

    # Show the website without scheme, leading "www." or trailing slash.
    website_display = website
    if website_display:
        website_display = re.sub(r'^https?://(www\.)?', '', str(website_display)).rstrip('/')

    # One timestamp for both the structured field and the text footer.
    now = datetime.now(timezone.utc)

    card = {
        "name": name,
        "email": email,
        "company": company,
        "website": website_display,
        "title": title,
        "phone": phone,
        "industry": industry,
        "employees": employees,
        "revenue": revenue,
        "budget": budget,
        "services": services,
        "problem": problem,
        "source": source,
        "crm_contact_id": crm_contact_id,
        "enriched_at": now.isoformat(),
    }

    # Text summary: the name is always shown, other fields only when known.
    lines = ["📋 ENRICHED LEAD", "", f"Name: {name}"]
    optional_rows = [
        ("Email", email),
        ("Company", company),
        ("Website", website_display),
        ("Title", title),
        ("Revenue", revenue if revenue != "Unknown" else ""),
        ("Budget", budget),
        ("Services", services),
        ("Industry", industry),
        ("Employees", employees),
    ]
    lines.extend(f"{label}: {value}" for label, value in optional_rows if value)
    if problem:
        # Cap the free-text message so the summary stays readable.
        lines.append(f"\nProblem: {problem[:500]}")

    footer_parts = []
    if phone:
        footer_parts.append(f"📞 {phone}")
    if source:
        footer_parts.append(f"Source: {source}")
    footer_parts.append(f"🕐 {now.strftime('%m/%d/%Y, %I:%M:%S %p UTC')}")

    # The footer always contains at least the timestamp.
    lines.append("")
    lines.append(" | ".join(footer_parts))

    card["text_summary"] = "\n".join(lines)
    return card
257  
258  
259  # ── Main ──
260  
def process_leads(leads):
    """Enrich a list of lead dicts.

    Each lead is parsed, looked up in the CRM by email and website domain,
    and turned into an enriched card. Entries that are not dicts, or that
    have neither a name nor an email, are logged and skipped.

    Args:
        leads: Iterable of raw lead dicts (form / webhook payloads).

    Returns:
        List of enriched card dicts, in input order, for the leads that were
        not skipped.
    """
    results = []

    for lead_data in leads:
        if not isinstance(lead_data, dict):
            # One malformed entry must not abort the whole batch.
            log.warning(f"Skipping non-object lead entry: {lead_data!r}")
            continue

        info = parse_form_lead(lead_data)
        if not info.get("name") and not info.get("email"):
            log.warning(f"Skipping lead with no name or email: {lead_data}")
            continue

        log.info(f"Processing lead: {info.get('name', 'Unknown')}")

        # CRM enrichment
        crm_contact = None
        crm_contact_id = None
        crm_company = None

        email = info.get("email")
        if email:
            crm_contact, crm_contact_id = crm_search_contact(email)

        # Reduce the website to a bare host: drop scheme, "www." and any path.
        domain = info.get("website", "")
        if domain:
            domain = re.sub(r'^https?://(www\.)?', '', domain).rstrip('/').split('/')[0]
            if domain:
                crm_company = crm_search_company(domain)

        card = build_enriched_card(info, crm_contact, crm_company, crm_contact_id)
        results.append(card)

        log.info(f"✅ Enriched: {info.get('name', 'Unknown')}")

        # Rate limit, but only when a CRM request was actually issued: the
        # search helpers return immediately without an API key or a search
        # value, so sleeping then would only slow the batch down.
        if CRM_API_KEY and (email or domain):
            time.sleep(0.5)

    return results
294  
295  
def main():
    """Command-line entry point.

    Reads leads as JSON from ``--input`` (or stdin when omitted), enriches
    them, then either prints the text summaries (``--dry-run``), writes JSON
    to ``--output``, or prints JSON to stdout. A single JSON object is
    treated as a one-element list. ``--dry-run`` takes precedence over
    ``--output``.
    """
    parser = argparse.ArgumentParser(description="Lead Enricher")
    parser.add_argument("--input", help="JSON file with lead data")
    parser.add_argument("--output", help="Output JSON file for enriched leads")
    parser.add_argument("--dry-run", action="store_true", help="Process but don't write output")
    args = parser.parse_args()

    # NOTE(review): nothing in the visible code writes to LOG_DIR — confirm it is still needed.
    LOG_DIR.mkdir(parents=True, exist_ok=True)

    if args.input:
        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open(args.input, encoding="utf-8") as f:
            leads = json.load(f)
    else:
        # Read from stdin
        leads = json.load(sys.stdin)
    if not isinstance(leads, list):
        leads = [leads]

    log.info(f"Processing {len(leads)} leads")

    results = process_leads(leads)

    if args.dry_run:
        for r in results:
            print(r.get("text_summary", ""))
            print("---")
    elif args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)
        log.info(f"📄 Results written to {args.output}")
    else:
        print(json.dumps(results, indent=2))

    log.info(f"Done. Enriched {len(results)} leads.")
332  
333  
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()