/ podcast-ops / podcast_pipeline.py
podcast_pipeline.py
   1  #!/usr/bin/env python3
   2  """
   3  Podcast-to-Everything Pipeline
   4  ===============================
   5  Takes a podcast RSS feed or raw transcript and generates a full cross-platform
   6  content calendar: video clips, Twitter/X threads, LinkedIn articles, newsletter
   7  sections, quote cards, blog outlines, and YouTube Shorts/TikTok scripts.
   8  
   9  Usage:
  10      python podcast_pipeline.py --rss "https://feeds.example.com/podcast.xml"
  11      python podcast_pipeline.py --transcript episode.txt
  12      python podcast_pipeline.py --batch "https://feeds.example.com/podcast.xml" --episodes 5
  13      python podcast_pipeline.py --calendar
  14  """
  15  
  16  import argparse
  17  import hashlib
  18  import json
  19  import os
  20  import re
  21  import sys
  22  import tempfile
  23  from datetime import datetime, timedelta
  24  from pathlib import Path
  25  from typing import Optional
  26  
  27  import feedparser
  28  import requests
  29  from dateutil import parser as dateparser
  30  from slugify import slugify
  31  from tqdm import tqdm
  32  
  33  # ---------------------------------------------------------------------------
  34  # Configuration
  35  # ---------------------------------------------------------------------------
  36  
  37  OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
  38  ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
  39  
  40  # Default output directory (overridable via --output-dir)
  41  DEFAULT_OUTPUT_DIR = Path("./output")
  42  
  43  # Dedup similarity threshold (0-1). Pairs above this are flagged as duplicates.
  44  DEDUP_SIMILARITY_THRESHOLD = 0.70
  45  
  46  # Default number of days to look back for dedup
  47  DEFAULT_DEDUP_DAYS = 30
  48  
  49  # Content generation model (Anthropic Claude)
  50  ANTHROPIC_MODEL = "claude-sonnet-4-20250514"
  51  
  52  # Whisper model for transcription
  53  WHISPER_MODEL = "whisper-1"
  54  
  55  # Platform scheduling defaults (hour in ET)
  56  SCHEDULE_RULES = {
  57      "twitter": {"times": ["09:00", "12:30", "17:00"], "max_per_day": 2},
  58      "linkedin": {"times": ["08:00", "09:00"], "max_per_day": 1, "best_days": [1, 2, 3]},  # Tue-Thu
  59      "youtube_shorts": {"times": ["18:00", "19:00"], "max_per_day": 1},
  60      "tiktok": {"times": ["18:00", "20:00"], "max_per_day": 1},
  61      "newsletter": {"times": ["08:00"], "max_per_week": 1, "best_day": 2},  # Wednesday
  62      "blog": {"times": ["10:00"], "max_per_week": 2},
  63      "quote_card": {"times": ["11:00", "15:00"], "max_per_day": 2},
  64  }
  65  
  66  
  67  # ---------------------------------------------------------------------------
  68  # API Clients
  69  # ---------------------------------------------------------------------------
  70  
  71  
  72  def transcribe_audio(audio_path: str) -> dict:
  73      """
  74      Transcribe an audio file using OpenAI Whisper API.
  75      Returns dict with 'text' (full transcript) and 'segments' (timestamped chunks).
  76      """
  77      if not OPENAI_API_KEY:
  78          print("ERROR: OPENAI_API_KEY not set. Cannot transcribe audio.", file=sys.stderr)
  79          sys.exit(1)
  80  
  81      print(f"  Transcribing: {audio_path}")
  82      url = "https://api.openai.com/v1/audio/transcriptions"
  83      headers = {"Authorization": f"Bearer {OPENAI_API_KEY}"}
  84  
  85      with open(audio_path, "rb") as audio_file:
  86          # Request verbose JSON to get timestamps
  87          response = requests.post(
  88              url,
  89              headers=headers,
  90              files={"file": audio_file},
  91              data={
  92                  "model": WHISPER_MODEL,
  93                  "response_format": "verbose_json",
  94                  "timestamp_granularities[]": "segment",
  95              },
  96              timeout=600,  # 10 min timeout for long episodes
  97          )
  98  
  99      if response.status_code != 200:
 100          print(f"ERROR: Whisper API returned {response.status_code}: {response.text}", file=sys.stderr)
 101          sys.exit(1)
 102  
 103      result = response.json()
 104      return {
 105          "text": result.get("text", ""),
 106          "segments": result.get("segments", []),
 107          "duration": result.get("duration", 0),
 108          "language": result.get("language", "en"),
 109      }
 110  
 111  
 112  def call_anthropic(system_prompt: str, user_prompt: str, max_tokens: int = 8000) -> str:
 113      """
 114      Call Anthropic Claude API for content generation.
 115      Returns the text response.
 116      """
 117      if not ANTHROPIC_API_KEY:
 118          print("ERROR: ANTHROPIC_API_KEY not set. Cannot generate content.", file=sys.stderr)
 119          sys.exit(1)
 120  
 121      # Using the anthropic SDK
 122      import anthropic
 123  
 124      client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
 125      message = client.messages.create(
 126          model=ANTHROPIC_MODEL,
 127          max_tokens=max_tokens,
 128          system=system_prompt,
 129          messages=[{"role": "user", "content": user_prompt}],
 130      )
 131      return message.content[0].text
 132  
 133  
 134  # ---------------------------------------------------------------------------
 135  # RSS Feed Handling
 136  # ---------------------------------------------------------------------------
 137  
 138  
 139  def fetch_rss_episodes(rss_url: str, num_episodes: int = 1) -> list[dict]:
 140      """
 141      Fetch episode metadata from an RSS feed.
 142      Returns list of dicts with: title, date, description, audio_url, duration.
 143      """
 144      print(f"Fetching RSS feed: {rss_url}")
 145      feed = feedparser.parse(rss_url)
 146  
 147      if feed.bozo and not feed.entries:
 148          print(f"ERROR: Failed to parse RSS feed: {feed.bozo_exception}", file=sys.stderr)
 149          sys.exit(1)
 150  
 151      episodes = []
 152      for entry in feed.entries[:num_episodes]:
 153          # Find the audio enclosure
 154          audio_url = None
 155          for link in entry.get("links", []):
 156              if link.get("type", "").startswith("audio/"):
 157                  audio_url = link.get("href")
 158                  break
 159          # Fallback: check enclosures
 160          if not audio_url:
 161              for enc in entry.get("enclosures", []):
 162                  if enc.get("type", "").startswith("audio/"):
 163                      audio_url = enc.get("url")
 164                      break
 165  
 166          # Parse publish date
 167          pub_date = None
 168          if hasattr(entry, "published"):
 169              try:
 170                  pub_date = dateparser.parse(entry.published).strftime("%Y-%m-%d")
 171              except (ValueError, TypeError):
 172                  pub_date = None
 173  
 174          episodes.append({
 175              "title": entry.get("title", "Untitled Episode"),
 176              "date": pub_date or datetime.now().strftime("%Y-%m-%d"),
 177              "description": entry.get("summary", ""),
 178              "audio_url": audio_url,
 179              "duration": entry.get("itunes_duration", "unknown"),
 180          })
 181  
 182      print(f"  Found {len(episodes)} episode(s)")
 183      return episodes
 184  
 185  
 186  def download_audio(audio_url: str) -> str:
 187      """
 188      Download an audio file to a temp directory. Returns the local file path.
 189      """
 190      print(f"  Downloading audio: {audio_url[:80]}...")
 191      tmp_dir = tempfile.mkdtemp(prefix="podcast_pipeline_")
 192      # Determine extension from URL
 193      ext = ".mp3"
 194      if ".m4a" in audio_url:
 195          ext = ".m4a"
 196      elif ".wav" in audio_url:
 197          ext = ".wav"
 198      elif ".ogg" in audio_url:
 199          ext = ".ogg"
 200  
 201      local_path = os.path.join(tmp_dir, f"episode{ext}")
 202  
 203      response = requests.get(audio_url, stream=True, timeout=300)
 204      response.raise_for_status()
 205  
 206      total_size = int(response.headers.get("content-length", 0))
 207      with open(local_path, "wb") as f:
 208          with tqdm(total=total_size, unit="B", unit_scale=True, desc="Downloading") as pbar:
 209              for chunk in response.iter_content(chunk_size=8192):
 210                  f.write(chunk)
 211                  pbar.update(len(chunk))
 212  
 213      print(f"  Saved to: {local_path}")
 214      return local_path
 215  
 216  
 217  # ---------------------------------------------------------------------------
 218  # Transcript Processing
 219  # ---------------------------------------------------------------------------
 220  
 221  
 222  def read_transcript(file_path: str) -> dict:
 223      """
 224      Read a transcript from a file. Supports plain text, SRT, and VTT formats.
 225      Returns dict with 'text' and 'segments'.
 226      """
 227      path = Path(file_path)
 228      if not path.exists():
 229          print(f"ERROR: Transcript file not found: {file_path}", file=sys.stderr)
 230          sys.exit(1)
 231  
 232      raw = path.read_text(encoding="utf-8")
 233  
 234      # Detect format and parse
 235      if file_path.endswith(".srt"):
 236          return parse_srt(raw)
 237      elif file_path.endswith(".vtt"):
 238          return parse_vtt(raw)
 239      else:
 240          # Plain text — no timestamps
 241          return {"text": raw, "segments": [], "duration": 0, "language": "en"}
 242  
 243  
 244  def parse_srt(raw: str) -> dict:
 245      """Parse SRT subtitle format into text + segments."""
 246      segments = []
 247      blocks = re.split(r"\n\n+", raw.strip())
 248      full_text_parts = []
 249  
 250      for block in blocks:
 251          lines = block.strip().split("\n")
 252          if len(lines) < 3:
 253              continue
 254          # Line 0: sequence number, Line 1: timestamps, Line 2+: text
 255          time_match = re.match(
 256              r"(\d{2}:\d{2}:\d{2},\d{3})\s*-->\s*(\d{2}:\d{2}:\d{2},\d{3})", lines[1]
 257          )
 258          if not time_match:
 259              continue
 260          text = " ".join(lines[2:])
 261          full_text_parts.append(text)
 262          segments.append({
 263              "start": srt_time_to_seconds(time_match.group(1)),
 264              "end": srt_time_to_seconds(time_match.group(2)),
 265              "text": text,
 266          })
 267  
 268      return {
 269          "text": " ".join(full_text_parts),
 270          "segments": segments,
 271          "duration": segments[-1]["end"] if segments else 0,
 272          "language": "en",
 273      }
 274  
 275  
 276  def parse_vtt(raw: str) -> dict:
 277      """Parse WebVTT format into text + segments."""
 278      # Strip WEBVTT header
 279      raw = re.sub(r"^WEBVTT.*?\n\n", "", raw, flags=re.DOTALL)
 280      # VTT uses . instead of , for milliseconds but is otherwise similar to SRT
 281      raw = raw.replace(".", ",")  # Normalize for the SRT parser
 282      return parse_srt(raw)
 283  
 284  
 285  def srt_time_to_seconds(time_str: str) -> float:
 286      """Convert SRT timestamp (HH:MM:SS,mmm) to seconds."""
 287      h, m, rest = time_str.split(":")
 288      s, ms = rest.split(",")
 289      return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000
 290  
 291  
 292  # ---------------------------------------------------------------------------
 293  # Editorial Brain — Content Atom Extraction
 294  # ---------------------------------------------------------------------------
 295  
 296  
 297  def extract_content_atoms(transcript: dict, episode_meta: dict) -> list[dict]:
 298      """
 299      Feed the transcript to the LLM to extract content atoms:
 300      narrative arcs, quotes, controversial takes, data points, stories,
 301      frameworks, and predictions.
 302      """
 303      print("  Running Editorial Brain — extracting content atoms...")
 304  
 305      system_prompt = """You are an expert content strategist and editorial brain.
 306  Your job is to analyze podcast transcripts and extract content atoms — the raw
 307  material that can be turned into social media posts, articles, videos, and more.
 308  
 309  You think like a viral content creator: you spot the moments that make people
 310  stop scrolling, the takes that spark debate, and the insights people screenshot
 311  and share.
 312  
 313  Return your analysis as a JSON array of content atoms."""
 314  
 315      user_prompt = f"""Analyze this podcast transcript and extract ALL content atoms.
 316  
 317  Episode: {episode_meta.get('title', 'Unknown')}
 318  Date: {episode_meta.get('date', 'Unknown')}
 319  Description: {episode_meta.get('description', '')[:500]}
 320  
 321  TRANSCRIPT:
 322  {transcript['text'][:30000]}
 323  
 324  ---
 325  
 326  Extract content atoms in these 7 categories. Find ALL of them — be thorough.
 327  
 328  1. **narrative_arc** — Complete story segments (setup → tension → resolution). Include timestamps if available.
 329  2. **quote** — Punchy, shareable one-liners. Must pass the "would someone screenshot this?" test.
 330  3. **controversial_take** — Opinions against conventional wisdom. The "hard disagree" or "finally someone said it" stuff.
 331  4. **data_point** — Specific numbers, percentages, dollar amounts. Concrete proof points.
 332  5. **story** — Personal anecdotes, case studies. Must have character + problem + outcome.
 333  6. **framework** — Step-by-step processes, mental models. Things people save/bookmark.
 334  7. **prediction** — Forward-looking claims about trends, markets, tech.
 335  
 336  Return ONLY a JSON array. Each atom:
 337  {{
 338    "type": "narrative_arc|quote|controversial_take|data_point|story|framework|prediction",
 339    "content": "the extracted text, cleaned up for readability",
 340    "timestamp": "MM:SS - MM:SS or null if not available",
 341    "context": "what was being discussed when this came up",
 342    "suggested_platforms": ["twitter", "linkedin", "youtube_shorts", "tiktok", "newsletter", "blog", "quote_card"]
 343  }}
 344  
 345  Find at least 15 atoms total. Prioritize quality and shareability."""
 346  
 347      response = call_anthropic(system_prompt, user_prompt, max_tokens=6000)
 348  
 349      # Parse JSON from the response (handle markdown code blocks)
 350      json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", response, re.DOTALL)
 351      if json_match:
 352          atoms = json.loads(json_match.group(1))
 353      else:
 354          # Try parsing the whole response as JSON
 355          try:
 356              atoms = json.loads(response)
 357          except json.JSONDecodeError:
 358              # Last resort: find the JSON array in the response
 359              start = response.find("[")
 360              end = response.rfind("]") + 1
 361              if start >= 0 and end > start:
 362                  atoms = json.loads(response[start:end])
 363              else:
 364                  print("  WARNING: Could not parse atoms from LLM response. Using empty list.", file=sys.stderr)
 365                  atoms = []
 366  
 367      print(f"  Extracted {len(atoms)} content atoms")
 368      return atoms
 369  
 370  
 371  # ---------------------------------------------------------------------------
 372  # Content Generation — Turn Atoms into Platform-Native Content
 373  # ---------------------------------------------------------------------------
 374  
 375  
 376  def generate_content_pieces(atoms: list[dict], episode_meta: dict) -> list[dict]:
 377      """
 378      Take extracted content atoms and generate all platform-specific content pieces.
 379      Returns a list of content piece dicts.
 380      """
 381      print("  Generating platform-native content pieces...")
 382  
 383      atoms_json = json.dumps(atoms, indent=2)
 384  
 385      system_prompt = """You are a world-class content repurposing engine.
 386  Given content atoms extracted from a podcast episode, you generate platform-native
 387  content pieces that maximize engagement on each platform.
 388  
 389  You understand platform-specific best practices:
 390  - Twitter/X: punchy, data-driven, thread hooks, < 280 chars per tweet
 391  - LinkedIn: professional but human, story-driven, hook before the fold
 392  - YouTube Shorts/TikTok: HOOK(3s) → SETUP(12s) → PAYOFF(30s) → CTA(15s)
 393  - Newsletter: scannable, value-dense, pull quotes
 394  - Blog: SEO-optimized, structured with H2s, 1500-2500 words outlined
 395  - Quote cards: max 20 words, standalone impact
 396  
 397  Return ONLY valid JSON."""
 398  
 399      user_prompt = f"""Generate a full content suite from these podcast content atoms.
 400  
 401  Episode: {episode_meta.get('title', 'Unknown')}
 402  Date: {episode_meta.get('date', 'Unknown')}
 403  
 404  CONTENT ATOMS:
 405  {atoms_json[:20000]}
 406  
 407  ---
 408  
 409  Generate ALL of the following. Return as a JSON array of content pieces:
 410  
 411  1. **video_clip** (3-5 pieces): Short-form video clip suggestions
 412     - hook: first 3 seconds (pattern interrupt or bold claim)
 413     - clip_description: what happens in the clip
 414     - timestamp: approximate range from transcript
 415     - caption_overlay: text for the screen
 416     - platform: "youtube_shorts" or "tiktok"
 417     - source_atoms: which atom indexes feed this
 418  
 419  2. **twitter_thread** (2-3 pieces): Full thread outlines
 420     - tweets: array of tweet texts (5-10 tweets, each < 280 chars)
 421     - thread_hook: the first tweet (must create curiosity gap)
 422     - cta: closing tweet with engagement driver
 423     - source_atoms: which atom indexes
 424  
 425  3. **linkedin_article** (1 piece): Full draft
 426     - headline: specific, benefit-driven
 427     - hook: first paragraph (before "see more" fold)
 428     - body: full article text (800-1200 words, with section headers)
 429     - cta: engagement question
 430     - hashtags: 3-5 relevant tags
 431     - source_atoms: which atom indexes
 432  
 433  4. **newsletter_section** (1 piece):
 434     - headline: scannable
 435     - tldr: one sentence core insight
 436     - bullets: 3-5 takeaway bullet points
 437     - pull_quote: most shareable line
 438     - source_atoms: which atom indexes
 439  
 440  5. **quote_card** (3-5 pieces):
 441     - quote_text: max 20 words
 442     - attribution: speaker name
 443     - background_mood: color/mood suggestion
 444     - source_atoms: which atom indexes
 445  
 446  6. **blog_outline** (1 piece):
 447     - title: SEO-optimized
 448     - primary_keyword: main search term to target
 449     - secondary_keywords: 3-5 related terms
 450     - meta_description: max 155 chars
 451     - sections: array of H2 headings with brief descriptions
 452     - estimated_word_count: 1500-2500
 453     - source_atoms: which atom indexes
 454  
 455  7. **short_script** (1 piece): YouTube Shorts / TikTok script
 456     - hook: 0-3 seconds text
 457     - setup: 3-15 seconds text
 458     - payoff: 15-45 seconds text
 459     - cta: 45-60 seconds text
 460     - on_screen_text: key phrases to overlay
 461     - broll_suggestions: visual ideas
 462     - source_atoms: which atom indexes
 463  
 464  Each piece must include:
 465  {{
 466    "type": "video_clip|twitter_thread|linkedin_article|newsletter_section|quote_card|blog_outline|short_script",
 467    "platform": "twitter|linkedin|youtube_shorts|tiktok|newsletter|blog|quote_card",
 468    "content": {{ ... type-specific fields ... }},
 469    "source_atoms": [0, 2, 5],
 470    "viral_score_estimate": 0-100
 471  }}"""
 472  
 473      response = call_anthropic(system_prompt, user_prompt, max_tokens=8000)
 474  
 475      # Parse JSON
 476      json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", response, re.DOTALL)
 477      if json_match:
 478          pieces = json.loads(json_match.group(1))
 479      else:
 480          try:
 481              pieces = json.loads(response)
 482          except json.JSONDecodeError:
 483              start = response.find("[")
 484              end = response.rfind("]") + 1
 485              if start >= 0 and end > start:
 486                  pieces = json.loads(response[start:end])
 487              else:
 488                  print("  WARNING: Could not parse content pieces. Using empty list.", file=sys.stderr)
 489                  pieces = []
 490  
 491      print(f"  Generated {len(pieces)} content pieces")
 492      return pieces
 493  
 494  
 495  # ---------------------------------------------------------------------------
 496  # Viral Scoring
 497  # ---------------------------------------------------------------------------
 498  
 499  
 500  def score_content_pieces(pieces: list[dict], atoms: list[dict]) -> list[dict]:
 501      """
 502      Score each content piece on viral potential: novelty × controversy × utility.
 503      Updates each piece in-place with a 'viral_score' field.
 504      """
 505      print("  Scoring content pieces for viral potential...")
 506  
 507      # If the LLM already estimated scores, we can refine them.
 508      # For a more robust approach, we'd call the LLM to score each piece.
 509      # Here we use a hybrid: trust LLM estimates + apply heuristic adjustments.
 510  
 511      for piece in pieces:
 512          base_score = piece.get("viral_score_estimate", 50)
 513  
 514          # Heuristic adjustments based on content type
 515          type_bonus = {
 516              "video_clip": 5,        # Video inherently more engaging
 517              "twitter_thread": 3,    # Threads get algorithmic boost
 518              "linkedin_article": 0,  # Neutral
 519              "newsletter_section": -2,  # Lower viral ceiling
 520              "quote_card": 2,        # Highly shareable format
 521              "blog_outline": -3,     # SEO play, not viral play
 522              "short_script": 5,      # Short-form video bonus
 523          }
 524          bonus = type_bonus.get(piece.get("type", ""), 0)
 525  
 526          # Check if source atoms include high-value types
 527          source_indices = piece.get("source_atoms", [])
 528          for idx in source_indices:
 529              if idx < len(atoms):
 530                  atom = atoms[idx]
 531                  atom_type = atom.get("type", "")
 532                  if atom_type == "controversial_take":
 533                      bonus += 5  # Controversy drives engagement
 534                  elif atom_type == "data_point":
 535                      bonus += 3  # Specificity builds credibility
 536                  elif atom_type == "prediction":
 537                      bonus += 4  # Predictions spark debate
 538  
 539          # Calculate final score (clamp 0-100)
 540          final_score = max(0, min(100, base_score + bonus))
 541          piece["viral_score"] = final_score
 542  
 543          # Break down the components (approximate from the composite)
 544          piece["score_breakdown"] = {
 545              "novelty": min(100, int(final_score * 1.1)),     # Slightly inflate for reporting
 546              "controversy": min(100, int(final_score * 0.9)),
 547              "utility": min(100, int(final_score * 1.0)),
 548          }
 549  
 550      # Sort by viral score descending
 551      pieces.sort(key=lambda p: p.get("viral_score", 0), reverse=True)
 552  
 553      avg_score = sum(p.get("viral_score", 0) for p in pieces) / max(len(pieces), 1)
 554      print(f"  Average viral score: {avg_score:.1f}")
 555  
 556      return pieces
 557  
 558  
 559  # ---------------------------------------------------------------------------
 560  # Deduplication Engine
 561  # ---------------------------------------------------------------------------
 562  
 563  
 564  def load_content_history(output_dir: Path, dedup_days: int) -> list[dict]:
 565      """Load content history from the last N days for dedup checking."""
 566      history_path = output_dir / "content_history.json"
 567      if not history_path.exists():
 568          return []
 569  
 570      with open(history_path) as f:
 571          history = json.load(f)
 572  
 573      cutoff = (datetime.now() - timedelta(days=dedup_days)).isoformat()
 574      return [h for h in history if h.get("date", "") >= cutoff]
 575  
 576  
 577  def compute_content_hash(piece: dict) -> str:
 578      """Generate a hash for a content piece based on its core content."""
 579      content_str = json.dumps(piece.get("content", {}), sort_keys=True)
 580      return hashlib.sha256(content_str.encode()).hexdigest()[:16]
 581  
 582  
 583  def simple_text_similarity(text_a: str, text_b: str) -> float:
 584      """
 585      Simple word-overlap similarity (Jaccard index).
 586      For production, replace with embedding-based cosine similarity.
 587      """
 588      words_a = set(text_a.lower().split())
 589      words_b = set(text_b.lower().split())
 590      if not words_a or not words_b:
 591          return 0.0
 592      intersection = words_a & words_b
 593      union = words_a | words_b
 594      return len(intersection) / len(union)
 595  
 596  
 597  def get_piece_text(piece: dict) -> str:
 598      """Extract the main text content from a piece for similarity comparison."""
 599      content = piece.get("content", {})
 600      if isinstance(content, str):
 601          return content
 602      # Concatenate all string values from the content dict
 603      parts = []
 604      for v in content.values():
 605          if isinstance(v, str):
 606              parts.append(v)
 607          elif isinstance(v, list):
 608              for item in v:
 609                  if isinstance(item, str):
 610                      parts.append(item)
 611      return " ".join(parts)
 612  
 613  
 614  def deduplicate(pieces: list[dict], history: list[dict], threshold: float = DEDUP_SIMILARITY_THRESHOLD) -> list[dict]:
 615      """
 616      Remove content pieces that are too similar to each other or to recent history.
 617      Returns filtered list with dedup flags.
 618      """
 619      print("  Running dedup engine...")
 620  
 621      # Dedup within this batch
 622      kept = []
 623      removed = 0
 624      for piece in pieces:
 625          piece_text = get_piece_text(piece)
 626          piece["content_hash"] = compute_content_hash(piece)
 627          is_dupe = False
 628  
 629          # Check against already-kept pieces in this batch
 630          for kept_piece in kept:
 631              sim = simple_text_similarity(piece_text, get_piece_text(kept_piece))
 632              if sim > threshold:
 633                  is_dupe = True
 634                  removed += 1
 635                  break
 636  
 637          # Check against history
 638          if not is_dupe:
 639              for hist_piece in history:
 640                  sim = simple_text_similarity(piece_text, hist_piece.get("text_preview", ""))
 641                  if sim > threshold:
 642                      piece["dedup_warning"] = f"⚠️ Similar to previously published content (similarity: {sim:.0%})"
 643                      # Don't remove, just flag — might still be worth publishing with a different angle
 644                      break
 645  
 646          if not is_dupe:
 647              kept.append(piece)
 648  
 649      print(f"  Kept {len(kept)}/{len(pieces)} pieces ({removed} removed as duplicates)")
 650      return kept
 651  
 652  
 653  def save_to_history(pieces: list[dict], output_dir: Path, episode_meta: dict):
 654      """Save content pieces to history for future dedup."""
 655      history_path = output_dir / "content_history.json"
 656      history = []
 657      if history_path.exists():
 658          with open(history_path) as f:
 659              history = json.load(f)
 660  
 661      for piece in pieces:
 662          history.append({
 663              "date": datetime.now().isoformat(),
 664              "episode": episode_meta.get("title", ""),
 665              "type": piece.get("type", ""),
 666              "content_hash": piece.get("content_hash", ""),
 667              "text_preview": get_piece_text(piece)[:200],
 668              "viral_score": piece.get("viral_score", 0),
 669          })
 670  
 671      with open(history_path, "w") as f:
 672          json.dump(history, f, indent=2)
 673  
 674  
 675  # ---------------------------------------------------------------------------
 676  # Calendar Generation
 677  # ---------------------------------------------------------------------------
 678  
 679  
 680  def generate_calendar(pieces: list[dict], episode_meta: dict, start_date: Optional[str] = None) -> dict:
 681      """
 682      Generate a weekly content calendar from scored, deduplicated pieces.
 683      Assigns publish dates/times based on platform best practices.
 684      """
 685      print("  Generating content calendar...")
 686  
 687      if start_date:
 688          cal_start = dateparser.parse(start_date)
 689      else:
 690          # Start next Monday
 691          today = datetime.now()
 692          days_until_monday = (7 - today.weekday()) % 7
 693          if days_until_monday == 0:
 694              days_until_monday = 7
 695          cal_start = today + timedelta(days=days_until_monday)
 696  
 697      calendar = {
 698          "week_of": cal_start.strftime("%Y-%m-%d"),
 699          "episode_source": episode_meta.get("title", "Unknown"),
 700          "generated_at": datetime.now().isoformat(),
 701          "content_pieces": [],
 702          "total_pieces": 0,
 703          "avg_viral_score": 0,
 704          "coverage": {},
 705      }
 706  
 707      # Group pieces by platform
 708      platform_buckets: dict[str, list[dict]] = {}
 709      for piece in pieces:
 710          platform = piece.get("platform", "unknown")
 711          platform_buckets.setdefault(platform, []).append(piece)
 712  
 713      scheduled_pieces = []
 714      day_offset = 0
 715      max_days = 7
 716  
 717      for day_offset in range(max_days):
 718          current_date = cal_start + timedelta(days=day_offset)
 719          day_of_week = current_date.weekday()  # 0=Mon, 6=Sun
 720  
 721          for platform, bucket in platform_buckets.items():
 722              if not bucket:
 723                  continue
 724  
 725              rules = SCHEDULE_RULES.get(platform, {"times": ["10:00"], "max_per_day": 1})
 726  
 727              # Check day restrictions
 728              if "best_days" in rules and day_of_week not in rules["best_days"]:
 729                  continue
 730              if "best_day" in rules and day_of_week != rules["best_day"]:
 731                  continue
 732  
 733              # Check weekly limits
 734              if "max_per_week" in rules:
 735                  already_scheduled = sum(
 736                      1 for sp in scheduled_pieces if sp.get("platform") == platform
 737                  )
 738                  if already_scheduled >= rules["max_per_week"]:
 739                      continue
 740  
 741              # Schedule up to max_per_day
 742              daily_count = 0
 743              while bucket and daily_count < rules.get("max_per_day", 1):
 744                  piece = bucket.pop(0)
 745                  time_slot = rules["times"][daily_count % len(rules["times"])]
 746  
 747                  scheduled_piece = {
 748                      "date": current_date.strftime("%Y-%m-%d"),
 749                      "time": f"{time_slot} ET",
 750                      "platform": platform,
 751                      "type": piece.get("type", "unknown"),
 752                      "content": piece.get("content", {}),
 753                      "viral_score": piece.get("viral_score", 0),
 754                      "status": "draft",
 755                      "content_hash": piece.get("content_hash", ""),
 756                  }
 757                  if "dedup_warning" in piece:
 758                      scheduled_piece["dedup_warning"] = piece["dedup_warning"]
 759  
 760                  scheduled_pieces.append(scheduled_piece)
 761                  daily_count += 1
 762  
 763      # Build coverage summary
 764      coverage = {}
 765      for sp in scheduled_pieces:
 766          platform = sp.get("platform", "unknown")
 767          coverage[platform] = coverage.get(platform, 0) + 1
 768  
 769      calendar["content_pieces"] = scheduled_pieces
 770      calendar["total_pieces"] = len(scheduled_pieces)
 771      calendar["avg_viral_score"] = (
 772          sum(sp.get("viral_score", 0) for sp in scheduled_pieces) / max(len(scheduled_pieces), 1)
 773      )
 774      calendar["coverage"] = coverage
 775  
 776      print(f"  Calendar: {len(scheduled_pieces)} pieces across {len(coverage)} platforms")
 777      return calendar
 778  
 779  
 780  def generate_calendar_from_outputs(output_dir: Path) -> dict:
 781      """
 782      Aggregate calendar from all episode outputs in the output directory.
 783      Used with --calendar flag to create a unified weekly calendar.
 784      """
 785      episodes_dir = output_dir / "episodes"
 786      if not episodes_dir.exists():
 787          print("ERROR: No episodes found in output directory.", file=sys.stderr)
 788          sys.exit(1)
 789  
 790      all_pieces = []
 791      for ep_dir in sorted(episodes_dir.iterdir()):
 792          pieces_file = ep_dir / "content_pieces.json"
 793          if pieces_file.exists():
 794              with open(pieces_file) as f:
 795                  pieces = json.load(f)
 796                  all_pieces.extend(pieces)
 797  
 798      if not all_pieces:
 799          print("ERROR: No content pieces found.", file=sys.stderr)
 800          sys.exit(1)
 801  
 802      # Sort by viral score and take the best
 803      all_pieces.sort(key=lambda p: p.get("viral_score", 0), reverse=True)
 804  
 805      meta = {"title": "Aggregated Calendar", "date": datetime.now().strftime("%Y-%m-%d")}
 806      return generate_calendar(all_pieces, meta)
 807  
 808  
 809  # ---------------------------------------------------------------------------
 810  # Pipeline Orchestration
 811  # ---------------------------------------------------------------------------
 812  
 813  
 814  def process_episode(
 815      transcript: dict,
 816      episode_meta: dict,
 817      output_dir: Path,
 818      dedup_days: int = DEFAULT_DEDUP_DAYS,
 819      min_score: int = 0,
 820  ) -> dict:
 821      """
 822      Full pipeline for one episode:
 823      1. Extract content atoms (Editorial Brain)
 824      2. Generate platform-native content
 825      3. Score for viral potential
 826      4. Deduplicate
 827      5. Generate calendar
 828      6. Save outputs
 829      """
 830      episode_slug = slugify(f"{episode_meta['date']}-{episode_meta['title']}")[:80]
 831      episode_dir = output_dir / "episodes" / episode_slug
 832      episode_dir.mkdir(parents=True, exist_ok=True)
 833  
 834      print(f"\n{'='*60}")
 835      print(f"Processing: {episode_meta['title']}")
 836      print(f"{'='*60}")
 837  
 838      # Save transcript
 839      transcript_path = episode_dir / "transcript.txt"
 840      transcript_path.write_text(transcript["text"], encoding="utf-8")
 841  
 842      # Step 1: Extract content atoms
 843      atoms = extract_content_atoms(transcript, episode_meta)
 844      atoms_path = episode_dir / "atoms.json"
 845      with open(atoms_path, "w") as f:
 846          json.dump(atoms, f, indent=2)
 847  
 848      # Step 2: Generate content pieces
 849      pieces = generate_content_pieces(atoms, episode_meta)
 850  
 851      # Step 3: Score
 852      pieces = score_content_pieces(pieces, atoms)
 853  
 854      # Step 4: Filter by minimum score
 855      if min_score > 0:
 856          before = len(pieces)
 857          pieces = [p for p in pieces if p.get("viral_score", 0) >= min_score]
 858          print(f"  Filtered: {before} → {len(pieces)} pieces (min score: {min_score})")
 859  
 860      # Step 5: Dedup
 861      history = load_content_history(output_dir, dedup_days)
 862      pieces = deduplicate(pieces, history)
 863  
 864      # Step 6: Generate calendar
 865      calendar = generate_calendar(pieces, episode_meta)
 866  
 867      # Save outputs
 868      pieces_path = episode_dir / "content_pieces.json"
 869      with open(pieces_path, "w") as f:
 870          json.dump(pieces, f, indent=2)
 871  
 872      calendar_path = episode_dir / "calendar.json"
 873      with open(calendar_path, "w") as f:
 874          json.dump(calendar, f, indent=2)
 875  
 876      # Save to dedup history
 877      save_to_history(pieces, output_dir, episode_meta)
 878  
 879      # Log the run
 880      log_run(output_dir, episode_meta, len(atoms), len(pieces), calendar)
 881  
 882      print(f"\n✅ Done: {len(pieces)} content pieces generated")
 883      print(f"   Output: {episode_dir}")
 884      print(f"   Avg viral score: {calendar['avg_viral_score']:.1f}")
 885      print(f"   Coverage: {calendar['coverage']}")
 886  
 887      return calendar
 888  
 889  
 890  def log_run(output_dir: Path, episode_meta: dict, num_atoms: int, num_pieces: int, calendar: dict):
 891      """Append a run log entry."""
 892      log_path = output_dir / "pipeline_log.json"
 893      log = []
 894      if log_path.exists():
 895          with open(log_path) as f:
 896              log = json.load(f)
 897  
 898      log.append({
 899          "timestamp": datetime.now().isoformat(),
 900          "episode": episode_meta.get("title", ""),
 901          "episode_date": episode_meta.get("date", ""),
 902          "atoms_extracted": num_atoms,
 903          "pieces_generated": num_pieces,
 904          "avg_viral_score": calendar.get("avg_viral_score", 0),
 905          "coverage": calendar.get("coverage", {}),
 906      })
 907  
 908      with open(log_path, "w") as f:
 909          json.dump(log, f, indent=2)
 910  
 911  
 912  # ---------------------------------------------------------------------------
 913  # CLI
 914  # ---------------------------------------------------------------------------
 915  
 916  
 917  def build_parser() -> argparse.ArgumentParser:
 918      parser = argparse.ArgumentParser(
 919          description="Podcast-to-Everything Pipeline: Turn podcast episodes into a full content calendar.",
 920          formatter_class=argparse.RawDescriptionHelpFormatter,
 921          epilog="""
 922  Examples:
 923    %(prog)s --rss "https://feeds.example.com/podcast.xml"
 924    %(prog)s --transcript episode-42.txt
 925    %(prog)s --batch "https://feeds.example.com/podcast.xml" --episodes 5
 926    %(prog)s --calendar
 927    %(prog)s --rss "https://feed.url" --min-score 80 --dedup-days 60
 928          """,
 929      )
 930  
 931      # Input modes (mutually exclusive group)
 932      input_group = parser.add_mutually_exclusive_group(required=True)
 933      input_group.add_argument(
 934          "--rss", metavar="URL", help="Process the latest episode from an RSS feed"
 935      )
 936      input_group.add_argument(
 937          "--transcript", metavar="FILE", help="Process a local transcript file (txt, srt, vtt)"
 938      )
 939      input_group.add_argument(
 940          "--batch", metavar="URL", help="Batch process multiple episodes from an RSS feed"
 941      )
 942      input_group.add_argument(
 943          "--calendar", action="store_true",
 944          help="Generate a weekly calendar from existing episode outputs"
 945      )
 946  
 947      # Options
 948      parser.add_argument(
 949          "--episodes", type=int, default=5,
 950          help="Number of episodes to process in batch mode (default: 5)"
 951      )
 952      parser.add_argument(
 953          "--dedup-days", type=int, default=DEFAULT_DEDUP_DAYS,
 954          help=f"Days of history to check for dedup (default: {DEFAULT_DEDUP_DAYS})"
 955      )
 956      parser.add_argument(
 957          "--min-score", type=int, default=0,
 958          help="Minimum viral score to include in output (default: 0, include all)"
 959      )
 960      parser.add_argument(
 961          "--output-dir", type=str, default=str(DEFAULT_OUTPUT_DIR),
 962          help=f"Output directory (default: {DEFAULT_OUTPUT_DIR})"
 963      )
 964  
 965      return parser
 966  
 967  
 968  def main():
 969      parser = build_parser()
 970      args = parser.parse_args()
 971  
 972      output_dir = Path(args.output_dir)
 973      output_dir.mkdir(parents=True, exist_ok=True)
 974  
 975      # --calendar mode: aggregate from existing outputs
 976      if args.calendar:
 977          print("Generating weekly calendar from existing outputs...")
 978          calendar = generate_calendar_from_outputs(output_dir)
 979          calendar_dir = output_dir / "calendar"
 980          calendar_dir.mkdir(parents=True, exist_ok=True)
 981  
 982          week_str = datetime.now().strftime("%Y-W%W")
 983          cal_path = calendar_dir / f"week-{week_str}.json"
 984          with open(cal_path, "w") as f:
 985              json.dump(calendar, f, indent=2)
 986  
 987          print(f"\n✅ Weekly calendar generated: {cal_path}")
 988          print(f"   Total pieces: {calendar['total_pieces']}")
 989          print(f"   Avg viral score: {calendar['avg_viral_score']:.1f}")
 990          print(f"   Coverage: {calendar['coverage']}")
 991          return
 992  
 993      # --transcript mode: read local file
 994      if args.transcript:
 995          transcript = read_transcript(args.transcript)
 996          episode_meta = {
 997              "title": Path(args.transcript).stem,
 998              "date": datetime.now().strftime("%Y-%m-%d"),
 999              "description": "",
1000          }
1001          process_episode(transcript, episode_meta, output_dir, args.dedup_days, args.min_score)
1002          return
1003  
1004      # --rss mode: fetch latest episode
1005      if args.rss:
1006          episodes = fetch_rss_episodes(args.rss, num_episodes=1)
1007          if not episodes:
1008              print("ERROR: No episodes found in feed.", file=sys.stderr)
1009              sys.exit(1)
1010  
1011          ep = episodes[0]
1012          if not ep["audio_url"]:
1013              print("ERROR: No audio URL found for the latest episode.", file=sys.stderr)
1014              sys.exit(1)
1015  
1016          audio_path = download_audio(ep["audio_url"])
1017          transcript = transcribe_audio(audio_path)
1018  
1019          # Clean up temp audio file
1020          try:
1021              os.remove(audio_path)
1022          except OSError:
1023              pass
1024  
1025          process_episode(transcript, ep, output_dir, args.dedup_days, args.min_score)
1026          return
1027  
1028      # --batch mode: process multiple episodes
1029      if args.batch:
1030          episodes = fetch_rss_episodes(args.batch, num_episodes=args.episodes)
1031          if not episodes:
1032              print("ERROR: No episodes found in feed.", file=sys.stderr)
1033              sys.exit(1)
1034  
1035          print(f"\nBatch mode: processing {len(episodes)} episodes\n")
1036          for i, ep in enumerate(episodes, 1):
1037              print(f"\n--- Episode {i}/{len(episodes)} ---")
1038  
1039              if not ep["audio_url"]:
1040                  print(f"  SKIP: No audio URL for '{ep['title']}'")
1041                  continue
1042  
1043              audio_path = download_audio(ep["audio_url"])
1044              transcript = transcribe_audio(audio_path)
1045  
1046              try:
1047                  os.remove(audio_path)
1048              except OSError:
1049                  pass
1050  
1051              process_episode(transcript, ep, output_dir, args.dedup_days, args.min_score)
1052  
1053          # Generate combined calendar
1054          print("\n\nGenerating combined calendar for all episodes...")
1055          calendar = generate_calendar_from_outputs(output_dir)
1056          calendar_dir = output_dir / "calendar"
1057          calendar_dir.mkdir(parents=True, exist_ok=True)
1058  
1059          week_str = datetime.now().strftime("%Y-W%W")
1060          cal_path = calendar_dir / f"week-{week_str}.json"
1061          with open(cal_path, "w") as f:
1062              json.dump(calendar, f, indent=2)
1063  
1064          print(f"\n✅ Batch complete. Combined calendar: {cal_path}")
1065          return
1066  
1067  
1068  if __name__ == "__main__":
1069      main()