# sales-pipeline/icp_learning_analyzer.py
#!/usr/bin/env python3
"""
ICP Learning Analyzer — learns from your prospect approve/reject decisions.

Reads prospect approval/rejection history from a PostgreSQL database,
analyzes patterns by source type (cold, trigger, warm, revival), and
outputs recommended ICP filter changes.

Your ICP evolves from your own data instead of guesswork.

Analyzes:
- Industry patterns (which industries convert vs. get rejected)
- Company size sweet spots (employee count ranges that win)
- Title patterns (which seniority levels get approved)
- Revenue ranges (what deal sizes work)
- Approval rates per source type

Usage:
    python3 icp_learning_analyzer.py
    python3 icp_learning_analyzer.py --config data/icp-config.json

Requires:
- DATABASE_URL environment variable (PostgreSQL connection string)
- psycopg2-binary package
- A prospects table with status, source, and company/contact joins

Configuration:
    Create data/icp-config.json with source_type_mapping and min_sample_size.
    See .env.example and data/icp-config.example.json for templates.
"""

import argparse
import json
import logging
import os
import sys
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path

logging.basicConfig(level=logging.INFO, format="%(asctime)s [ICP-Analyzer] %(message)s")
log = logging.getLogger(__name__)

# ─── Configuration ───────────────────────────────────────────────────────────
BASE_DIR = Path(os.environ.get("BASE_DIR", Path(__file__).resolve().parent))
DATA_DIR = BASE_DIR / "data"
OUTPUT_PATH = DATA_DIR / "icp-recommendations.json"

# Database connection string
DATABASE_URL = os.environ.get("DATABASE_URL", "")

# Default ICP config (override with --config flag)
DEFAULT_CONFIG = {
    # Maps your prospect source names to analysis categories
    "source_type_mapping": {
        "cold_outbound": "cold",
        "trigger_prospector": "trigger",
        "website_visitor": "warm",
        "deal_revival": "revival",
        "referral": "warm",
        "inbound": "warm",
    },
    # Minimum approved samples before generating recommendations
    "min_sample_size": 30,
}


def load_config(config_path=None):
    """Load ICP config from file or use defaults.

    Resolution order: explicit --config path, then data/icp-config.json,
    then the built-in DEFAULT_CONFIG.
    """
    if config_path and Path(config_path).exists():
        with open(config_path) as f:
            return json.load(f)
    default_path = DATA_DIR / "icp-config.json"
    if default_path.exists():
        with open(default_path) as f:
            return json.load(f)
    log.info("No config file found, using defaults")
    return DEFAULT_CONFIG


def fetch_prospects():
    """Fetch approved/rejected prospects from database.

    Expected schema:
        prospects: source, status, signal, conviction_score, company_id, contact_id
        companies: id, industry, employees, revenue_range
        contacts: id, title

    Status values: approved, skipped, sent, opened, replied, meeting, won, lost

    Returns a list of row dicts (one per prospect), or [] on any failure
    (missing driver, missing DATABASE_URL, or query error).
    """
    try:
        import psycopg2
    except ImportError:
        log.error("psycopg2 not installed. Run: pip install psycopg2-binary")
        return []

    if not DATABASE_URL:
        log.error("DATABASE_URL not set. Set it in your environment or .env file.")
        return []

    conn = None
    try:
        conn = psycopg2.connect(DATABASE_URL)
        # Cursor context manager guarantees cursor cleanup even if the
        # query raises; the connection itself is closed in `finally` so it
        # cannot leak on an execute/fetch error (the original closed it on
        # the success path only).
        with conn.cursor() as cur:
            cur.execute("""
                SELECT p.source, p.status, p.signal, p.conviction_score,
                       c.industry, c.employees, c.revenue_range,
                       ct.title
                FROM prospects p
                LEFT JOIN companies c ON p.company_id = c.id
                LEFT JOIN contacts ct ON p.contact_id = ct.id
                WHERE p.status IN ('approved', 'skipped', 'sent', 'opened',
                                   'replied', 'meeting', 'won', 'lost')
            """)
            cols = [d[0] for d in cur.description]
            rows = [dict(zip(cols, row)) for row in cur.fetchall()]
        log.info(f"Fetched {len(rows)} prospect records")
        return rows
    except Exception as e:
        log.error(f"Database query failed: {e}")
        return []
    finally:
        if conn is not None:
            conn.close()


def classify_status(status):
    """Map database status to binary approved/rejected for analysis.

    Any status showing positive intent (approved or further along the
    funnel, including 'won') counts as approved; everything else —
    'skipped', 'lost', or unknown — counts as rejected.
    """
    approved_statuses = {"approved", "sent", "opened", "replied", "meeting", "won"}
    return "approved" if status in approved_statuses else "rejected"


def parse_revenue(revenue_range):
    """Parse revenue_range string to midpoint integer.

    Handles formats like: "$10M-$50M", "10M-50M", "$5M - $10M"
    Returns None if unparseable.
    """
    if not revenue_range:
        return None
    cleaned = str(revenue_range).replace("$", "").replace(",", "").strip()
    # Expand magnitude suffixes before splitting the range on '-'.
    parts = (cleaned
             .replace("M", "000000")
             .replace("B", "000000000")
             .replace("K", "000")
             .split("-"))
    try:
        nums = [int(float(p.strip())) for p in parts if p.strip()]
        # Midpoint of the range (or the single value itself).
        return sum(nums) // len(nums) if nums else None
    except (ValueError, ZeroDivisionError):
        return None


def analyze_source_group(prospects, min_sample):
    """Analyze a group of prospects and return filter recommendations.

    Returns recommendations for:
    - industries: which to target, which to exclude
    - employees: min/max employee count range
    - titles: top-performing job titles
    - revenue: min/max revenue range
    - confidence: overall approval rate

    Emits {"status": "insufficient_data", ...} until at least `min_sample`
    approved prospects exist, so early noisy data never drives filters.
    """
    approved = [p for p in prospects if classify_status(p["status"]) == "approved"]
    rejected = [p for p in prospects if classify_status(p["status"]) == "rejected"]

    if len(approved) < min_sample:
        return {
            "status": "insufficient_data",
            "sample_size": len(approved),
            "min_required": min_sample,
            "filters": {},
        }

    total_approved = len(approved)
    # Guard against division by zero when nothing was rejected.
    total_rejected = max(len(rejected), 1)

    # ── Industry Analysis ────────────────────────────────────────────────
    approved_industries = Counter(p["industry"] for p in approved if p.get("industry"))
    rejected_industries = Counter(p["industry"] for p in rejected if p.get("industry"))

    # Industries with >10% of approvals = recommend targeting
    rec_industries = [ind for ind, cnt in approved_industries.most_common(10)
                      if cnt / total_approved >= 0.10]
    # Industries with >30% of rejections and <5% of approvals = recommend excluding
    exclude_industries = [ind for ind, cnt in rejected_industries.most_common()
                          if cnt / total_rejected >= 0.30
                          and approved_industries.get(ind, 0) / total_approved < 0.05]

    # ── Employee Count Analysis ──────────────────────────────────────────
    # 10th–90th percentile of approved companies' headcounts becomes the
    # recommended employee range (trims outliers at both ends).
    approved_emp = sorted([p["employees"] for p in approved if p.get("employees")])
    emp_filters = {}
    if approved_emp:
        p10 = approved_emp[max(0, len(approved_emp) // 10)]
        p90 = approved_emp[min(len(approved_emp) - 1, len(approved_emp) * 9 // 10)]
        emp_filters["min_employees"] = p10
        emp_filters["max_employees"] = p90

    # ── Title Analysis ───────────────────────────────────────────────────
    approved_titles = Counter(p["title"] for p in approved if p.get("title"))
    top_titles = [t for t, _ in approved_titles.most_common(8)]

    # ── Revenue Analysis ─────────────────────────────────────────────────
    # Same 10th–90th percentile trim, over parseable revenue midpoints.
    approved_rev = [parse_revenue(p.get("revenue_range")) for p in approved]
    approved_rev = sorted([r for r in approved_rev if r is not None])
    rev_filters = {}
    if approved_rev:
        rev_filters["revenue_min"] = approved_rev[max(0, len(approved_rev) // 10)]
        rev_filters["revenue_max"] = approved_rev[min(len(approved_rev) - 1,
                                                      len(approved_rev) * 9 // 10)]

    # ── Compile Filters ──────────────────────────────────────────────────
    approval_rate = total_approved / (total_approved + len(rejected))
    filters = {**emp_filters, **rev_filters}
    if rec_industries:
        filters["industries"] = rec_industries
    if exclude_industries:
        filters["exclude_industries"] = exclude_industries
    if top_titles:
        filters["titles"] = top_titles

    return {
        "status": "ready",
        "filters": filters,
        "confidence": round(approval_rate, 3),
        "sample_size": total_approved,
        "rejected_count": len(rejected),
        "approval_rate": round(approval_rate, 3),
    }


# ─── Main ────────────────────────────────────────────────────────────────────

def main():
    parser = argparse.ArgumentParser(description="ICP Learning Analyzer")
    parser.add_argument("--config", help="Path to icp-config.json")
    args = parser.parse_args()

    config = load_config(args.config)
    source_mapping = config.get("source_type_mapping", DEFAULT_CONFIG["source_type_mapping"])
    min_sample = config.get("min_sample_size", DEFAULT_CONFIG["min_sample_size"])

    prospects = fetch_prospects()

    # Group by mapped source type; unmapped sources fall into "other",
    # which is intentionally not analyzed below.
    grouped = defaultdict(list)
    for p in prospects:
        mapped = source_mapping.get(p.get("source", ""), "other")
        grouped[mapped].append(p)

    recommendations = {}
    for source_type in ["cold", "trigger", "warm", "revival"]:
        group = grouped.get(source_type, [])
        log.info(f"[{source_type}] {len(group)} total prospects")
        recommendations[source_type] = analyze_source_group(group, min_sample)

    output = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "status": "complete" if prospects else "no_data",
        "total_prospects_analyzed": len(prospects),
        "recommendations": recommendations,
    }

    DATA_DIR.mkdir(parents=True, exist_ok=True)
    with open(OUTPUT_PATH, "w") as f:
        json.dump(output, f, indent=2)

    log.info(f"Wrote recommendations to {OUTPUT_PATH}")

    # Summary
    print(f"\n📊 ICP Learning Analyzer Results")
    print(f"   Total prospects analyzed: {len(prospects)}")
    print(f"   {'─'*40}")
    for src, rec in recommendations.items():
        status = rec.get("status", "unknown")
        sample = rec.get("sample_size", 0)
        rate = rec.get("approval_rate", 0)
        print(f"   {src:10s}: {status:20s} (n={sample}, approval={rate:.0%})")
        if rec.get("filters"):
            f = rec["filters"]
            if f.get("industries"):
                print(f"      → Target: {', '.join(f['industries'][:5])}")
            if f.get("exclude_industries"):
                print(f"      → Exclude: {', '.join(f['exclude_industries'][:3])}")
            if f.get("min_employees"):
                print(f"      → Employees: {f['min_employees']}-{f.get('max_employees', '?')}")


if __name__ == "__main__":
    main()