# revenue-intelligence / revenue_attribution.py
  1  #!/usr/bin/env python3
  2  """
  3  Revenue Attribution Mapper
  4  
  5  Connects content pieces to pipeline and closed deals. Proves content ROI.
  6  Maps blog posts, videos, podcasts to first-touch and multi-touch attribution
  7  using GA4 + HubSpot deal data.
  8  
  9  Usage:
 10      python revenue_attribution.py --report
 11      python revenue_attribution.py --report --model linear
 12      python revenue_attribution.py --cpa --costs content_costs.json
 13      python revenue_attribution.py --gaps
 14  """
 15  
 16  import argparse
 17  import json
 18  import os
 19  import sys
 20  from collections import defaultdict
 21  from datetime import datetime, timedelta
 22  from pathlib import Path
 23  from typing import Optional
 24  
 25  # ---------------------------------------------------------------------------
 26  # API Configuration
 27  # ---------------------------------------------------------------------------
 28  
 29  # HubSpot: Set HUBSPOT_API_KEY to your private app token
 30  # Required scopes: crm.objects.deals.read, crm.objects.contacts.read
 31  HUBSPOT_API_KEY = os.environ.get("HUBSPOT_API_KEY", "")
 32  HUBSPOT_BASE_URL = "https://api.hubapi.com"
 33  
 34  # GA4: Set GA4_PROPERTY_ID and GA4_CREDENTIALS_JSON
 35  # GA4_CREDENTIALS_JSON should point to a service account JSON file
 36  # Required: Google Analytics Data API (v1beta) enabled
 37  GA4_PROPERTY_ID = os.environ.get("GA4_PROPERTY_ID", "")
 38  GA4_CREDENTIALS_JSON = os.environ.get("GA4_CREDENTIALS_JSON", "")
 39  
 40  OUTPUT_DIR = os.environ.get("OUTPUT_DIR", "./output")
 41  
 42  # ---------------------------------------------------------------------------
 43  # Content type classification
 44  # ---------------------------------------------------------------------------
 45  
 46  CONTENT_TYPE_PATTERNS = {
 47      "blog": ["/blog/", "/posts/", "/article/", "/insights/"],
 48      "video": ["/video/", "/youtube/", "/watch/", "/webinar-recording/"],
 49      "podcast": ["/podcast/", "/episode/", "/listen/"],
 50      "webinar": ["/webinar/", "/live/", "/register/"],
 51      "case_study": ["/case-study/", "/case-studies/", "/success-story/", "/customer-story/"],
 52      "landing_page": ["/lp/", "/landing/", "/offer/", "/download/"],
 53      "tool": ["/tool/", "/calculator/", "/grader/", "/analyzer/"],
 54      "comparison": ["/vs/", "/compare/", "/alternative/", "/versus/"],
 55  }
 56  
 57  # Funnel stage classification
 58  FUNNEL_STAGE_PATTERNS = {
 59      "awareness": ["/blog/", "/posts/", "/article/", "/podcast/", "/video/"],
 60      "consideration": ["/case-study/", "/webinar/", "/guide/", "/comparison/", "/vs/"],
 61      "decision": ["/pricing/", "/demo/", "/contact/", "/trial/", "/start/", "/lp/"],
 62  }
 63  
 64  
 65  def classify_content_type(url: str) -> str:
 66      """Classify a URL into a content type."""
 67      url_lower = url.lower()
 68      for content_type, patterns in CONTENT_TYPE_PATTERNS.items():
 69          if any(p in url_lower for p in patterns):
 70              return content_type
 71      return "other"
 72  
 73  
 74  def classify_funnel_stage(url: str) -> str:
 75      """Classify a URL into a funnel stage."""
 76      url_lower = url.lower()
 77      for stage, patterns in FUNNEL_STAGE_PATTERNS.items():
 78          if any(p in url_lower for p in patterns):
 79              return stage
 80      return "unknown"
 81  
 82  
 83  # ---------------------------------------------------------------------------
 84  # GA4 Data Client
 85  # ---------------------------------------------------------------------------
 86  
 87  def fetch_ga4_page_data(start_date: str, end_date: str) -> list[dict]:
 88      """
 89      Fetch page-level session and conversion data from GA4.
 90  
 91      Returns list of dicts:
 92      [{"page_path": "/blog/foo", "sessions": 1234, "conversions": 5, "users": 900}]
 93  
 94      NOTE: Requires google-analytics-data library.
 95      pip install google-analytics-data
 96  
 97      Setup:
 98      1. Create a service account in Google Cloud Console
 99      2. Enable the Google Analytics Data API
100      3. Add the service account email as a viewer on your GA4 property
101      4. Download the JSON key file and set GA4_CREDENTIALS_JSON env var
102      """
103      if not GA4_PROPERTY_ID or not GA4_CREDENTIALS_JSON:
104          print("WARNING: GA4_PROPERTY_ID or GA4_CREDENTIALS_JSON not set. Using sample data.", file=sys.stderr)
105          return _sample_ga4_data()
106  
107      try:
108          from google.analytics.data_v1beta import BetaAnalyticsDataClient
109          from google.analytics.data_v1beta.types import (
110              DateRange,
111              Dimension,
112              Metric,
113              RunReportRequest,
114          )
115  
116          client = BetaAnalyticsDataClient.from_service_account_json(GA4_CREDENTIALS_JSON)
117  
118          request = RunReportRequest(
119              property=f"properties/{GA4_PROPERTY_ID}",
120              dimensions=[
121                  Dimension(name="pagePath"),
122                  Dimension(name="sessionDefaultChannelGroup"),
123              ],
124              metrics=[
125                  Metric(name="sessions"),
126                  Metric(name="totalUsers"),
127                  Metric(name="conversions"),
128              ],
129              date_ranges=[DateRange(start_date=start_date, end_date=end_date)],
130          )
131  
132          response = client.run_report(request)
133  
134          results = []
135          for row in response.rows:
136              results.append({
137                  "page_path": row.dimension_values[0].value,
138                  "channel": row.dimension_values[1].value,
139                  "sessions": int(row.metric_values[0].value),
140                  "users": int(row.metric_values[1].value),
141                  "conversions": int(row.metric_values[2].value),
142              })
143  
144          return results
145  
146      except ImportError:
147          print("WARNING: google-analytics-data not installed. Using sample data.", file=sys.stderr)
148          return _sample_ga4_data()
149      except Exception as e:
150          print(f"WARNING: GA4 API error: {e}. Using sample data.", file=sys.stderr)
151          return _sample_ga4_data()
152  
153  
def _sample_ga4_data() -> list[dict]:
    """Sample GA4 data for testing/demo purposes."""
    # (page_path, channel, sessions, users, conversions)
    rows = [
        ("/blog/seo-strategy-2025", "Organic Search", 4200, 3800, 12),
        ("/blog/content-marketing-roi", "Organic Search", 3100, 2900, 8),
        ("/blog/ai-marketing-tools", "Organic Search", 5600, 5100, 15),
        ("/case-study/saas-company-3x-pipeline", "Direct", 890, 820, 9),
        ("/case-study/ecommerce-seo-growth", "Organic Search", 1200, 1100, 7),
        ("/podcast/episode-42-growth-loops", "Social", 2300, 2100, 3),
        ("/webinar/ai-ops-for-marketers", "Email", 650, 600, 11),
        ("/video/youtube-seo-masterclass", "Social", 8900, 8200, 6),
        ("/blog/paid-media-benchmarks", "Organic Search", 2700, 2500, 4),
        ("/lp/free-seo-audit", "Paid Search", 1800, 1700, 22),
        ("/pricing", "Direct", 3200, 2900, 18),
        ("/blog/b2b-lead-generation", "Organic Search", 3400, 3100, 5),
        ("/vs/hubspot-alternative", "Organic Search", 1500, 1400, 10),
    ]
    return [
        {"page_path": path, "channel": channel, "sessions": s, "users": u, "conversions": c}
        for path, channel, s, u, c in rows
    ]
171  
172  
173  # ---------------------------------------------------------------------------
174  # HubSpot Deal Data
175  # ---------------------------------------------------------------------------
176  
def fetch_hubspot_deals(start_date: str, end_date: str) -> list[dict]:
    """
    Fetch closed-won deals from HubSpot with touchpoint history.

    Returns list of dicts:
    [{
        "deal_id": "123",
        "deal_name": "Acme Corp",
        "amount": 50000,
        "close_date": "2025-03-15",
        "touchpoints": [
            {"url": "/blog/seo-strategy", "timestamp": "2025-01-10", "type": "first_touch"},
            {"url": "/case-study/saas", "timestamp": "2025-02-20", "type": "page_view"},
            {"url": "/pricing", "timestamp": "2025-03-01", "type": "page_view"},
        ]
    }]

    NOTE: Requires requests library.
    Touchpoints come from HubSpot's contact timeline / page views.
    You need a private app with crm.objects.deals.read + crm.objects.contacts.read scopes.
    Falls back to sample data when the API key is missing, requests is not
    installed, or any API call fails.
    """
    if not HUBSPOT_API_KEY:
        print("WARNING: HUBSPOT_API_KEY not set. Using sample data.", file=sys.stderr)
        return _sample_hubspot_deals()

    # requests has NO default timeout: without an explicit one, a stalled
    # connection would hang this script forever.
    timeout = 30

    try:
        import requests

        headers = {"Authorization": f"Bearer {HUBSPOT_API_KEY}"}

        # Fetch closed-won deals in date range using the search API for
        # better filtering.
        # NOTE: only the first page (limit=100) is fetched; follow the
        # response's paging.next.after cursor if you close >100 deals/period.
        search_body = {
            "filterGroups": [{
                "filters": [
                    {"propertyName": "dealstage", "operator": "EQ", "value": "closedwon"},
                    {"propertyName": "closedate", "operator": "GTE", "value": f"{start_date}T00:00:00Z"},
                    {"propertyName": "closedate", "operator": "LTE", "value": f"{end_date}T23:59:59Z"},
                ]
            }],
            "properties": ["dealname", "amount", "closedate", "dealstage"],
            "limit": 100,
        }

        resp = requests.post(
            f"{HUBSPOT_BASE_URL}/crm/v3/objects/deals/search",
            headers=headers,
            json=search_body,
            timeout=timeout,
        )
        resp.raise_for_status()
        deals_data = resp.json().get("results", [])

        deals = []
        for deal in deals_data:
            props = deal.get("properties", {})
            deal_id = deal["id"]

            # Get associated contacts (best effort: empty list on failure)
            assoc_resp = requests.get(
                f"{HUBSPOT_BASE_URL}/crm/v3/objects/deals/{deal_id}/associations/contacts",
                headers=headers,
                timeout=timeout,
            )
            contact_ids = [r["id"] for r in assoc_resp.json().get("results", [])] if assoc_resp.ok else []

            # Get page views for each contact (from engagement timeline)
            touchpoints = []
            for cid in contact_ids[:5]:  # Limit to avoid rate limits
                # Fetch contact's page views from the timeline API
                timeline_resp = requests.get(
                    f"{HUBSPOT_BASE_URL}/crm/v3/objects/contacts/{cid}/engagements",
                    headers=headers,
                    params={"limit": 50},
                    timeout=timeout,
                )
                if timeline_resp.ok:
                    for eng in timeline_resp.json().get("results", []):
                        # Extract page view URLs from engagement metadata
                        metadata = eng.get("properties", {})
                        if metadata.get("hs_page_url"):
                            touchpoints.append({
                                "url": metadata["hs_page_url"],
                                "timestamp": metadata.get("hs_timestamp", ""),
                                "type": "page_view",
                            })

            # Mark first and last touch after sorting chronologically
            if touchpoints:
                touchpoints.sort(key=lambda t: t["timestamp"])
                touchpoints[0]["type"] = "first_touch"
                touchpoints[-1]["type"] = "last_touch"

            deals.append({
                "deal_id": deal_id,
                "deal_name": props.get("dealname", "Unknown"),
                # amount may be missing or empty-string; coerce to float safely
                "amount": float(props.get("amount", 0) or 0),
                "close_date": props.get("closedate", "")[:10],
                "touchpoints": touchpoints,
            })

        return deals

    except ImportError:
        print("WARNING: requests not installed. Using sample data.", file=sys.stderr)
        return _sample_hubspot_deals()
    except Exception as e:
        print(f"WARNING: HubSpot API error: {e}. Using sample data.", file=sys.stderr)
        return _sample_hubspot_deals()
283  
284  
def _sample_hubspot_deals() -> list[dict]:
    """Sample HubSpot deal data for testing/demo.

    Five closed-won deals whose touchpoint URLs overlap with the paths
    returned by _sample_ga4_data(), so the demo attribution report and
    gap analysis produce non-trivial output. Each deal carries a
    time-ordered touchpoint list with "first_touch"/"last_touch" markers,
    matching the shape fetch_hubspot_deals() builds from the live API.
    """
    return [
        {
            "deal_id": "deal_001",
            "deal_name": "Acme Corp - SEO Retainer",
            "amount": 120000,
            "close_date": "2025-03-15",
            "touchpoints": [
                {"url": "/blog/seo-strategy-2025", "timestamp": "2025-01-05", "type": "first_touch"},
                {"url": "/blog/content-marketing-roi", "timestamp": "2025-01-22", "type": "page_view"},
                {"url": "/case-study/saas-company-3x-pipeline", "timestamp": "2025-02-10", "type": "page_view"},
                {"url": "/pricing", "timestamp": "2025-02-28", "type": "page_view"},
                {"url": "/lp/free-seo-audit", "timestamp": "2025-03-05", "type": "last_touch"},
            ],
        },
        {
            "deal_id": "deal_002",
            "deal_name": "TechStart Inc - Full Service",
            "amount": 240000,
            "close_date": "2025-02-20",
            "touchpoints": [
                {"url": "/blog/ai-marketing-tools", "timestamp": "2024-12-01", "type": "first_touch"},
                {"url": "/podcast/episode-42-growth-loops", "timestamp": "2024-12-15", "type": "page_view"},
                {"url": "/webinar/ai-ops-for-marketers", "timestamp": "2025-01-10", "type": "page_view"},
                {"url": "/vs/hubspot-alternative", "timestamp": "2025-01-25", "type": "page_view"},
                {"url": "/pricing", "timestamp": "2025-02-10", "type": "last_touch"},
            ],
        },
        {
            # Shortest journey in the fixture: three touches over ~5 weeks.
            "deal_id": "deal_003",
            "deal_name": "GrowthCo - Content Marketing",
            "amount": 84000,
            "close_date": "2025-03-01",
            "touchpoints": [
                {"url": "/blog/content-marketing-roi", "timestamp": "2025-01-15", "type": "first_touch"},
                {"url": "/case-study/ecommerce-seo-growth", "timestamp": "2025-02-01", "type": "page_view"},
                {"url": "/pricing", "timestamp": "2025-02-20", "type": "last_touch"},
            ],
        },
        {
            "deal_id": "deal_004",
            "deal_name": "SaaS Corp - Paid Media",
            "amount": 180000,
            "close_date": "2025-01-30",
            "touchpoints": [
                {"url": "/video/youtube-seo-masterclass", "timestamp": "2024-11-15", "type": "first_touch"},
                {"url": "/blog/paid-media-benchmarks", "timestamp": "2024-12-10", "type": "page_view"},
                {"url": "/blog/b2b-lead-generation", "timestamp": "2025-01-05", "type": "page_view"},
                {"url": "/lp/free-seo-audit", "timestamp": "2025-01-20", "type": "last_touch"},
            ],
        },
        {
            # Largest deal: six touches spanning awareness → decision stages.
            "deal_id": "deal_005",
            "deal_name": "Enterprise Ltd - SEO + Content",
            "amount": 360000,
            "close_date": "2025-03-20",
            "touchpoints": [
                {"url": "/blog/seo-strategy-2025", "timestamp": "2024-12-20", "type": "first_touch"},
                {"url": "/blog/ai-marketing-tools", "timestamp": "2025-01-08", "type": "page_view"},
                {"url": "/case-study/saas-company-3x-pipeline", "timestamp": "2025-01-25", "type": "page_view"},
                {"url": "/webinar/ai-ops-for-marketers", "timestamp": "2025-02-05", "type": "page_view"},
                {"url": "/pricing", "timestamp": "2025-03-01", "type": "page_view"},
                {"url": "/lp/free-seo-audit", "timestamp": "2025-03-10", "type": "last_touch"},
            ],
        },
    ]
352  
353  
354  # ---------------------------------------------------------------------------
355  # Attribution Models
356  # ---------------------------------------------------------------------------
357  
def first_touch_attribution(deals: list[dict]) -> dict[str, float]:
    """Assign 100% of each deal's amount to its first touchpoint URL."""
    totals: dict[str, float] = {}
    for deal in deals:
        touches = deal.get("touchpoints") or []
        if not touches:
            continue
        url = touches[0]["url"]
        totals[url] = totals.get(url, 0.0) + deal["amount"]
    return totals
367  
368  
def last_touch_attribution(deals: list[dict]) -> dict[str, float]:
    """Assign 100% of each deal's amount to its final touchpoint URL."""
    totals: dict[str, float] = {}
    for deal in deals:
        touches = deal.get("touchpoints") or []
        if not touches:
            continue
        url = touches[-1]["url"]
        totals[url] = totals.get(url, 0.0) + deal["amount"]
    return totals
378  
379  
def linear_attribution(deals: list[dict]) -> dict[str, float]:
    """Split each deal's amount evenly across all of its touchpoints."""
    totals: dict[str, float] = {}
    for deal in deals:
        touches = deal.get("touchpoints") or []
        if not touches:
            continue
        share = deal["amount"] / len(touches)
        for touch in touches:
            url = touch["url"]
            totals[url] = totals.get(url, 0.0) + share
    return totals
390  
391  
def time_decay_attribution(deals: list[dict], half_life_days: int = 7) -> dict[str, float]:
    """
    Weight touchpoints by recency relative to the deal's close date.

    Each touchpoint gets weight 0.5 ** (days_before_close / half_life_days);
    a deal's amount is split in proportion to those weights. Touchpoints
    with missing/unparseable timestamps get a small fixed weight (0.1).
    Deals with no touchpoints or an unparseable close date are skipped.
    """
    import math

    totals: defaultdict = defaultdict(float)
    for deal in deals:
        touches = deal.get("touchpoints", [])
        raw_close = deal.get("close_date", "")
        if not touches or not raw_close:
            continue

        try:
            closed_on = datetime.strptime(raw_close, "%Y-%m-%d")
        except ValueError:
            continue

        def decay_weight(touch: dict) -> float:
            # Exponential decay; bad/missing timestamps get a token weight.
            try:
                seen_on = datetime.strptime(touch["timestamp"][:10], "%Y-%m-%d")
            except (ValueError, KeyError):
                return 0.1
            return math.pow(0.5, (closed_on - seen_on).days / half_life_days)

        weights = [decay_weight(t) for t in touches]
        denom = sum(weights) or 1
        for touch, w in zip(touches, weights):
            totals[touch["url"]] += deal["amount"] * (w / denom)

    return dict(totals)
427  
428  
# Registry mapping CLI --model names to attribution functions. Each
# function takes a list of deal dicts and returns {url: attributed_revenue}.
# generate_attribution_report() falls back to linear_attribution for
# unknown model names.
ATTRIBUTION_MODELS = {
    "first-touch": first_touch_attribution,
    "last-touch": last_touch_attribution,
    "linear": linear_attribution,
    "time-decay": time_decay_attribution,
}
435  
436  
437  # ---------------------------------------------------------------------------
438  # Report Generation
439  # ---------------------------------------------------------------------------
440  
def generate_attribution_report(
    deals: list[dict],
    ga4_data: list[dict],
    model: str = "linear",
) -> dict:
    """Generate a full attribution report.

    Runs the chosen attribution model over *deals* (unknown model names
    fall back to linear), enriches each attributed URL with GA4 traffic
    metrics, and rolls results up by content type.
    """
    attribute = ATTRIBUTION_MODELS.get(model, linear_attribution)
    revenue_by_url = attribute(deals)

    # Aggregate GA4 rows per page path (a path appears once per channel).
    traffic = defaultdict(lambda: {"sessions": 0, "users": 0, "conversions": 0})
    for row in ga4_data:
        bucket = traffic[row["page_path"]]
        bucket["sessions"] += row["sessions"]
        bucket["users"] += row["users"]
        bucket["conversions"] += row["conversions"]

    # Per-URL performance, highest attributed revenue first.
    content_performance = []
    for url, revenue in sorted(revenue_by_url.items(), key=lambda kv: -kv[1]):
        stats = traffic.get(url, {"sessions": 0, "users": 0, "conversions": 0})
        deals_touched = sum(
            1 for deal in deals
            if any(tp["url"] == url for tp in deal.get("touchpoints", []))
        )
        content_performance.append({
            "url": url,
            "content_type": classify_content_type(url),
            "funnel_stage": classify_funnel_stage(url),
            "attributed_revenue": round(revenue, 2),
            "sessions": stats["sessions"],
            "users": stats["users"],
            "conversions": stats["conversions"],
            "revenue_per_session": round(revenue / stats["sessions"], 2) if stats["sessions"] else 0,
            "deals_touched": deals_touched,
        })

    # Roll up by content type.
    per_type = defaultdict(lambda: {"revenue": 0, "sessions": 0, "conversions": 0, "pieces": 0})
    for entry in content_performance:
        agg = per_type[entry["content_type"]]
        agg["revenue"] += entry["attributed_revenue"]
        agg["sessions"] += entry["sessions"]
        agg["conversions"] += entry["conversions"]
        agg["pieces"] += 1

    type_summary = [
        {
            "content_type": ctype,
            "total_revenue": round(agg["revenue"], 2),
            "total_sessions": agg["sessions"],
            "total_conversions": agg["conversions"],
            "piece_count": agg["pieces"],
            "avg_revenue_per_piece": round(agg["revenue"] / agg["pieces"], 2) if agg["pieces"] else 0,
        }
        for ctype, agg in sorted(per_type.items(), key=lambda kv: -kv[1]["revenue"])
    ]

    total_revenue = sum(deal["amount"] for deal in deals)
    deal_count = len(deals)
    touch_counts = [len(deal.get("touchpoints", [])) for deal in deals]

    return {
        "generated_at": datetime.utcnow().isoformat() + "Z",
        "attribution_model": model,
        "summary": {
            "total_revenue": total_revenue,
            "total_deals": deal_count,
            "avg_deal_size": round(total_revenue / deal_count, 2) if deal_count else 0,
            "content_pieces_with_attribution": len(content_performance),
            "avg_touchpoints_per_deal": round(sum(touch_counts) / deal_count, 1) if deal_count else 0,
        },
        "top_content": content_performance[:20],
        "by_content_type": type_summary,
    }
523  
524  
def calculate_cpa(report: dict, costs: dict) -> dict:
    """
    Calculate cost-per-acquisition by content type.

    costs should be: {"blog": 15000, "video": 8000, "podcast": 3000, ...}
    representing total spend on each content type in the period.
    Types with zero conversions get cpa=None; types with zero recorded
    cost get roi=None and roi_multiple="N/A".
    """
    rows = []
    for entry in report["by_content_type"]:
        ctype = entry["content_type"]
        spend = costs.get(ctype, 0)
        revenue = entry["total_revenue"]
        conversions = entry["total_conversions"]
        rows.append({
            "content_type": ctype,
            "total_cost": spend,
            "total_revenue": revenue,
            "conversions": conversions,
            "cpa": round(spend / conversions, 2) if conversions else None,
            "roi": round((revenue - spend) / spend, 2) if spend else None,
            "roi_multiple": f"{round(revenue / spend, 1)}x" if spend else "N/A",
        })
    # Best ROI first; None (no recorded cost) sorts as 0.
    rows.sort(key=lambda row: row["roi"] or 0, reverse=True)
    return {"cpa_by_content_type": rows}
551  
552  
def find_content_gaps(deals: list[dict]) -> dict:
    """
    Identify funnel stages with no or low content attribution.

    A stage is flagged as a gap when fewer than 30% of deals touched it
    ("critical" below 10%, otherwise "moderate"). Each deal's revenue is
    split evenly across the stages it touched.
    """
    def _empty() -> dict:
        return {"urls": set(), "deals": 0, "revenue": 0}

    coverage = defaultdict(_empty)

    for deal in deals:
        touched_stages = set()
        for tp in deal.get("touchpoints", []):
            stage = classify_funnel_stage(tp["url"])
            coverage[stage]["urls"].add(tp["url"])
            touched_stages.add(stage)
        for stage in touched_stages:
            coverage[stage]["deals"] += 1
            coverage[stage]["revenue"] += deal["amount"] / len(touched_stages)

    expected_stages = ["awareness", "consideration", "decision"]
    total_deals = len(deals)

    gaps = []
    for stage in expected_stages:
        data = coverage.get(stage, _empty())
        pct = round(data["deals"] / total_deals * 100, 1) if total_deals else 0
        if pct >= 30:
            continue
        gaps.append({
            "stage": stage,
            "coverage_percent": pct,
            "deals_with_stage": data["deals"],
            "content_pieces": len(data["urls"]),
            "severity": "critical" if pct < 10 else "moderate",
            "recommendation": _gap_recommendation(stage, pct),
        })

    stage_summary = []
    for stage in expected_stages:
        data = coverage.get(stage, _empty())
        stage_summary.append({
            "stage": stage,
            "content_pieces": len(data["urls"]),
            "deals_touched": data["deals"],
            "attributed_revenue": round(data["revenue"], 2),
            "top_urls": list(data["urls"])[:5],
        })

    return {
        "gaps": gaps,
        "stage_summary": stage_summary,
        "total_deals_analyzed": len(deals),
    }
605  
606  
def _gap_recommendation(stage: str, coverage_pct: float) -> str:
    """Generate a recommendation for a content gap."""
    if stage == "awareness":
        return "Create more top-of-funnel content (blog posts, videos, podcasts) targeting high-volume keywords. Focus on educational content that introduces the problem your product solves."
    if stage == "consideration":
        return "Build comparison pages, case studies, and webinars that help prospects evaluate solutions. This is where you prove credibility and differentiation."
    if stage == "decision":
        return "Add pricing pages, ROI calculators, free trials, and demo CTAs. Make it easy for ready-to-buy prospects to take action."
    # Unknown stage: generic fallback mentioning current coverage.
    return f"Create content for the {stage} stage to improve coverage from {coverage_pct}%."
615  
616  
617  # ---------------------------------------------------------------------------
618  # Output Formatting
619  # ---------------------------------------------------------------------------
620  
def print_report(report: dict) -> None:
    """Print attribution report in human-readable format."""
    rule = "=" * 70
    summary = report["summary"]

    print(f"\n{rule}")
    print("  CONTENT REVENUE ATTRIBUTION REPORT")
    print(f"  Model: {report['attribution_model']}")
    print(f"  Generated: {report['generated_at']}")
    print(rule)

    print("\n  📊 Summary")
    print(f"     Total Revenue:          ${summary['total_revenue']:,.0f}")
    print(f"     Total Deals:            {summary['total_deals']}")
    print(f"     Avg Deal Size:          ${summary['avg_deal_size']:,.0f}")
    print(f"     Content w/ Attribution: {summary['content_pieces_with_attribution']}")
    print(f"     Avg Touchpoints/Deal:   {summary['avg_touchpoints_per_deal']}")

    print("\n  📈 Revenue by Content Type")
    print(f"  {'Type':<16} {'Revenue':>12} {'Sessions':>10} {'Pieces':>8} {'Avg/Piece':>12}")
    print("  " + "-" * 58)
    for row in report["by_content_type"]:
        print(
            f"  {row['content_type']:<16} "
            f"${row['total_revenue']:>10,.0f} "
            f"{row['total_sessions']:>10,} "
            f"{row['piece_count']:>8} "
            f"${row['avg_revenue_per_piece']:>10,.0f}"
        )

    print("\n  🏆 Top Content by Revenue")
    print(f"  {'URL':<45} {'Revenue':>12} {'Sessions':>10} {'Type':<12}")
    print("  " + "-" * 79)
    for row in report["top_content"][:10]:
        url = row["url"]
        shown = url[:43] + ".." if len(url) > 45 else url
        print(
            f"  {shown:<45} "
            f"${row['attributed_revenue']:>10,.0f} "
            f"{row['sessions']:>10,} "
            f"{row['content_type']:<12}"
        )

    print()
662  
663  
def print_gaps(gaps_report: dict) -> None:
    """Print content gap analysis."""
    rule = "=" * 70
    print(f"\n{rule}")
    print("  CONTENT GAP ANALYSIS")
    print(rule)

    print(f"\n  📊 Funnel Stage Coverage ({gaps_report['total_deals_analyzed']} deals)")
    for summary in gaps_report["stage_summary"]:
        print(f"\n  {summary['stage'].upper()}")
        print(f"     Content Pieces: {summary['content_pieces']}")
        print(f"     Deals Touched:  {summary['deals_touched']}")
        print(f"     Revenue:        ${summary['attributed_revenue']:,.0f}")

    gaps = gaps_report["gaps"]
    if not gaps:
        print("\n  ✅ No significant gaps found")
    else:
        print("\n  ⚠️  Gaps Identified")
        for gap in gaps:
            print(f"\n  [{gap['severity'].upper()}] {gap['stage'].upper()} — {gap['coverage_percent']}% coverage")
            print(f"  → {gap['recommendation']}")

    print()
686  
687  
688  # ---------------------------------------------------------------------------
689  # Main
690  # ---------------------------------------------------------------------------
691  
def main():
    """CLI entry point: parse arguments, fetch data, and run requested analyses.

    At least one of --report, --gaps, or --cpa must be given. Results are
    printed as formatted text by default, dumped as raw JSON with --json,
    and optionally written to a file with --output.
    """
    parser = argparse.ArgumentParser(
        description="Map content to revenue with multi-touch attribution.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --report
  %(prog)s --report --model time-decay
  %(prog)s --cpa --costs content_costs.json
  %(prog)s --gaps
  %(prog)s --report --start 2025-01-01 --end 2025-03-31 --json
        """,
    )

    parser.add_argument("--report", action="store_true", help="Generate attribution report")
    parser.add_argument("--gaps", action="store_true", help="Identify content gaps in buyer journey")
    parser.add_argument("--cpa", action="store_true", help="Calculate cost-per-acquisition by content type")

    parser.add_argument("--model", choices=["first-touch", "last-touch", "linear", "time-decay"],
                        default="linear", help="Attribution model (default: linear)")
    parser.add_argument("--start", help="Start date YYYY-MM-DD (default: 90 days ago)")
    parser.add_argument("--end", help="End date YYYY-MM-DD (default: today)")
    parser.add_argument("--costs", help="JSON file with content costs by type (for --cpa)")

    parser.add_argument("--json", action="store_true", help="Output raw JSON")
    parser.add_argument("--output", "-o", help="Write output to file")

    args = parser.parse_args()

    if not (args.report or args.gaps or args.cpa):
        parser.error("At least one of --report, --gaps, or --cpa is required")

    # Default date range: last 90 days. Capture "now" once so the start and
    # end defaults cannot disagree if the clock ticks over a day boundary
    # between two separate utcnow() calls.
    now = datetime.utcnow()
    end_date = args.end or now.strftime("%Y-%m-%d")
    start_date = args.start or (now - timedelta(days=90)).strftime("%Y-%m-%d")

    print(f"Fetching data for {start_date} to {end_date}...", file=sys.stderr)

    # Fetch data once; both --report and --gaps analyses reuse it.
    ga4_data = fetch_ga4_page_data(start_date, end_date)
    deals = fetch_hubspot_deals(start_date, end_date)

    output = {
        "date_range": {"start": start_date, "end": end_date},
        "generated_at": datetime.utcnow().isoformat() + "Z",
    }

    if args.report:
        report = generate_attribution_report(deals, ga4_data, model=args.model)
        output["attribution_report"] = report
        if not args.json:
            print_report(report)

    if args.cpa:
        # CPA needs an attribution report; build one if --report didn't already.
        if not args.report:
            report = generate_attribution_report(deals, ga4_data, model=args.model)
            output["attribution_report"] = report

        costs = {}
        if args.costs:
            costs_path = Path(args.costs)
            if costs_path.exists():
                try:
                    costs = json.loads(costs_path.read_text())
                except json.JSONDecodeError as e:
                    # Degrade gracefully on malformed JSON, mirroring the
                    # missing-file behavior below instead of crashing.
                    print(f"WARNING: Could not parse costs file {args.costs}: {e}. Using empty costs.", file=sys.stderr)
            else:
                print(f"WARNING: Costs file not found: {args.costs}. Using empty costs.", file=sys.stderr)

        cpa_data = calculate_cpa(output["attribution_report"], costs)
        output["cpa"] = cpa_data

        if not args.json:
            print(f"\n{'='*70}")
            print("  COST PER ACQUISITION BY CONTENT TYPE")
            print(f"{'='*70}")
            print(f"  {'Type':<16} {'Cost':>10} {'Revenue':>12} {'CPA':>10} {'ROI':>8}")
            print(f"  {'-'*56}")
            for row in cpa_data["cpa_by_content_type"]:
                # CPA is None when no acquisitions are attributed to the type.
                cpa_str = f"${row['cpa']:,.0f}" if row["cpa"] is not None else "N/A"
                roi_str = row["roi_multiple"]
                print(
                    f"  {row['content_type']:<16} "
                    f"${row['total_cost']:>8,} "
                    f"${row['total_revenue']:>10,.0f} "
                    f"{cpa_str:>10} "
                    f"{roi_str:>8}"
                )
            print()

    if args.gaps:
        gaps_data = find_content_gaps(deals)
        output["gaps"] = gaps_data
        if not args.json:
            print_gaps(gaps_data)

    if args.json:
        # default=str keeps datetimes and other non-JSON types serializable.
        print(json.dumps(output, indent=2, default=str))

    if args.output:
        out_path = Path(args.output)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(json.dumps(output, indent=2, default=str))
        if not args.json:
            print(f"✅ Output written to {args.output}")
795  
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()