/ sales-pipeline / icp_learning_analyzer.py
icp_learning_analyzer.py
  1  #!/usr/bin/env python3
  2  """
  3  ICP Learning Analyzer — learns from your prospect approve/reject decisions.
  4  
  5  Reads prospect approval/rejection history from a PostgreSQL database,
  6  analyzes patterns by source type (cold, trigger, warm, revival), and
  7  outputs recommended ICP filter changes.
  8  
  9  Your ICP evolves from your own data instead of guesswork.
 10  
 11  Analyzes:
 12    - Industry patterns (which industries convert vs. get rejected)
 13    - Company size sweet spots (employee count ranges that win)
 14    - Title patterns (which seniority levels get approved)
 15    - Revenue ranges (what deal sizes work)
 16    - Approval rates per source type
 17  
 18  Usage:
 19      python3 icp_learning_analyzer.py
 20      python3 icp_learning_analyzer.py --config data/icp-config.json
 21  
 22  Requires:
 23      - DATABASE_URL environment variable (PostgreSQL connection string)
 24      - psycopg2-binary package
 25      - A prospects table with status, source, and company/contact joins
 26  
 27  Configuration:
 28      Create data/icp-config.json with source_type_mapping and min_sample_size.
 29      See .env.example and data/icp-config.example.json for templates.
 30  """
 31  
 32  import argparse
 33  import json
 34  import logging
 35  import os
 36  import sys
 37  from collections import Counter, defaultdict
 38  from datetime import datetime, timezone
 39  from pathlib import Path
 40  
 41  logging.basicConfig(level=logging.INFO, format="%(asctime)s [ICP-Analyzer] %(message)s")
 42  log = logging.getLogger(__name__)
 43  
 44  # ─── Configuration ───────────────────────────────────────────────────────────
 45  BASE_DIR = Path(os.environ.get("BASE_DIR", Path(__file__).resolve().parent))
 46  DATA_DIR = BASE_DIR / "data"
 47  OUTPUT_PATH = DATA_DIR / "icp-recommendations.json"
 48  
 49  # Database connection string
 50  DATABASE_URL = os.environ.get("DATABASE_URL", "")
 51  
 52  # Default ICP config (override with --config flag)
 53  DEFAULT_CONFIG = {
 54      # Maps your prospect source names to analysis categories
 55      "source_type_mapping": {
 56          "cold_outbound": "cold",
 57          "trigger_prospector": "trigger",
 58          "website_visitor": "warm",
 59          "deal_revival": "revival",
 60          "referral": "warm",
 61          "inbound": "warm",
 62      },
 63      # Minimum approved samples before generating recommendations
 64      "min_sample_size": 30,
 65  }
 66  
 67  
 68  def load_config(config_path=None):
 69      """Load ICP config from file or use defaults."""
 70      if config_path and Path(config_path).exists():
 71          with open(config_path) as f:
 72              return json.load(f)
 73      default_path = DATA_DIR / "icp-config.json"
 74      if default_path.exists():
 75          with open(default_path) as f:
 76              return json.load(f)
 77      log.info("No config file found, using defaults")
 78      return DEFAULT_CONFIG
 79  
 80  
 81  def fetch_prospects():
 82      """Fetch approved/rejected prospects from database.
 83  
 84      Expected schema:
 85          prospects: source, status, signal, conviction_score, company_id, contact_id
 86          companies: id, industry, employees, revenue_range
 87          contacts:  id, title
 88  
 89      Status values: approved, skipped, sent, opened, replied, meeting, won, lost
 90      """
 91      try:
 92          import psycopg2
 93      except ImportError:
 94          log.error("psycopg2 not installed. Run: pip install psycopg2-binary")
 95          return []
 96  
 97      if not DATABASE_URL:
 98          log.error("DATABASE_URL not set. Set it in your environment or .env file.")
 99          return []
100  
101      try:
102          conn = psycopg2.connect(DATABASE_URL)
103          cur = conn.cursor()
104          cur.execute("""
105              SELECT p.source, p.status, p.signal, p.conviction_score,
106                     c.industry, c.employees, c.revenue_range,
107                     ct.title
108              FROM prospects p
109              LEFT JOIN companies c ON p.company_id = c.id
110              LEFT JOIN contacts ct ON p.contact_id = ct.id
111              WHERE p.status IN ('approved', 'skipped', 'sent', 'opened',
112                                 'replied', 'meeting', 'won', 'lost')
113          """)
114          cols = [d[0] for d in cur.description]
115          rows = [dict(zip(cols, row)) for row in cur.fetchall()]
116          conn.close()
117          log.info(f"Fetched {len(rows)} prospect records")
118          return rows
119      except Exception as e:
120          log.error(f"Database query failed: {e}")
121          return []
122  
123  
124  def classify_status(status):
125      """Map database status to binary approved/rejected for analysis."""
126      approved_statuses = {"approved", "sent", "opened", "replied", "meeting", "won"}
127      return "approved" if status in approved_statuses else "rejected"
128  
129  
130  def parse_revenue(revenue_range):
131      """Parse revenue_range string to midpoint integer.
132  
133      Handles formats like: "$10M-$50M", "10M-50M", "$5M - $10M"
134      Returns None if unparseable.
135      """
136      if not revenue_range:
137          return None
138      cleaned = str(revenue_range).replace("$", "").replace(",", "").strip()
139      parts = (cleaned
140               .replace("M", "000000")
141               .replace("B", "000000000")
142               .replace("K", "000")
143               .split("-"))
144      try:
145          nums = [int(float(p.strip())) for p in parts if p.strip()]
146          return sum(nums) // len(nums) if nums else None
147      except (ValueError, ZeroDivisionError):
148          return None
149  
150  
151  def analyze_source_group(prospects, min_sample):
152      """Analyze a group of prospects and return filter recommendations.
153  
154      Returns recommendations for:
155        - industries: which to target, which to exclude
156        - employees: min/max employee count range
157        - titles: top-performing job titles
158        - revenue: min/max revenue range
159        - confidence: overall approval rate
160      """
161      approved = [p for p in prospects if classify_status(p["status"]) == "approved"]
162      rejected = [p for p in prospects if classify_status(p["status"]) == "rejected"]
163  
164      if len(approved) < min_sample:
165          return {
166              "status": "insufficient_data",
167              "sample_size": len(approved),
168              "min_required": min_sample,
169              "filters": {},
170          }
171  
172      total_approved = len(approved)
173      total_rejected = max(len(rejected), 1)
174  
175      # ── Industry Analysis ────────────────────────────────────────────────
176      approved_industries = Counter(p["industry"] for p in approved if p.get("industry"))
177      rejected_industries = Counter(p["industry"] for p in rejected if p.get("industry"))
178  
179      # Industries with >10% of approvals = recommend targeting
180      rec_industries = [ind for ind, cnt in approved_industries.most_common(10)
181                        if cnt / total_approved >= 0.10]
182      # Industries with >30% of rejections and <5% of approvals = recommend excluding
183      exclude_industries = [ind for ind, cnt in rejected_industries.most_common()
184                            if cnt / total_rejected >= 0.30
185                            and approved_industries.get(ind, 0) / total_approved < 0.05]
186  
187      # ── Employee Count Analysis ──────────────────────────────────────────
188      approved_emp = sorted([p["employees"] for p in approved if p.get("employees")])
189      emp_filters = {}
190      if approved_emp:
191          p10 = approved_emp[max(0, len(approved_emp) // 10)]
192          p90 = approved_emp[min(len(approved_emp) - 1, len(approved_emp) * 9 // 10)]
193          emp_filters["min_employees"] = p10
194          emp_filters["max_employees"] = p90
195  
196      # ── Title Analysis ───────────────────────────────────────────────────
197      approved_titles = Counter(p["title"] for p in approved if p.get("title"))
198      top_titles = [t for t, _ in approved_titles.most_common(8)]
199  
200      # ── Revenue Analysis ─────────────────────────────────────────────────
201      approved_rev = [parse_revenue(p.get("revenue_range")) for p in approved]
202      approved_rev = sorted([r for r in approved_rev if r is not None])
203      rev_filters = {}
204      if approved_rev:
205          rev_filters["revenue_min"] = approved_rev[max(0, len(approved_rev) // 10)]
206          rev_filters["revenue_max"] = approved_rev[min(len(approved_rev) - 1,
207                                                         len(approved_rev) * 9 // 10)]
208  
209      # ── Compile Filters ──────────────────────────────────────────────────
210      approval_rate = total_approved / (total_approved + len(rejected))
211      filters = {**emp_filters, **rev_filters}
212      if rec_industries:
213          filters["industries"] = rec_industries
214      if exclude_industries:
215          filters["exclude_industries"] = exclude_industries
216      if top_titles:
217          filters["titles"] = top_titles
218  
219      return {
220          "status": "ready",
221          "filters": filters,
222          "confidence": round(approval_rate, 3),
223          "sample_size": total_approved,
224          "rejected_count": len(rejected),
225          "approval_rate": round(approval_rate, 3),
226      }
227  
228  
229  # ─── Main ────────────────────────────────────────────────────────────────────
230  
231  def main():
232      parser = argparse.ArgumentParser(description="ICP Learning Analyzer")
233      parser.add_argument("--config", help="Path to icp-config.json")
234      args = parser.parse_args()
235  
236      config = load_config(args.config)
237      source_mapping = config.get("source_type_mapping", DEFAULT_CONFIG["source_type_mapping"])
238      min_sample = config.get("min_sample_size", DEFAULT_CONFIG["min_sample_size"])
239  
240      prospects = fetch_prospects()
241  
242      # Group by mapped source type
243      grouped = defaultdict(list)
244      for p in prospects:
245          mapped = source_mapping.get(p.get("source", ""), "other")
246          grouped[mapped].append(p)
247  
248      recommendations = {}
249      for source_type in ["cold", "trigger", "warm", "revival"]:
250          group = grouped.get(source_type, [])
251          log.info(f"[{source_type}] {len(group)} total prospects")
252          recommendations[source_type] = analyze_source_group(group, min_sample)
253  
254      output = {
255          "generated_at": datetime.now(timezone.utc).isoformat(),
256          "status": "complete" if prospects else "no_data",
257          "total_prospects_analyzed": len(prospects),
258          "recommendations": recommendations,
259      }
260  
261      DATA_DIR.mkdir(parents=True, exist_ok=True)
262      with open(OUTPUT_PATH, "w") as f:
263          json.dump(output, f, indent=2)
264  
265      log.info(f"Wrote recommendations to {OUTPUT_PATH}")
266  
267      # Summary
268      print(f"\n📊 ICP Learning Analyzer Results")
269      print(f"   Total prospects analyzed: {len(prospects)}")
270      print(f"   {'─'*40}")
271      for src, rec in recommendations.items():
272          status = rec.get("status", "unknown")
273          sample = rec.get("sample_size", 0)
274          rate = rec.get("approval_rate", 0)
275          print(f"   {src:10s}: {status:20s} (n={sample}, approval={rate:.0%})")
276          if rec.get("filters"):
277              f = rec["filters"]
278              if f.get("industries"):
279                  print(f"              → Target: {', '.join(f['industries'][:5])}")
280              if f.get("exclude_industries"):
281                  print(f"              → Exclude: {', '.join(f['exclude_industries'][:3])}")
282              if f.get("min_employees"):
283                  print(f"              → Employees: {f['min_employees']}-{f.get('max_employees', '?')}")
284  
285  
286  if __name__ == "__main__":
287      main()