/ podcast-ops / podcast_pipeline.py
podcast_pipeline.py
1 #!/usr/bin/env python3 2 """ 3 Podcast-to-Everything Pipeline 4 =============================== 5 Takes a podcast RSS feed or raw transcript and generates a full cross-platform 6 content calendar: video clips, Twitter/X threads, LinkedIn articles, newsletter 7 sections, quote cards, blog outlines, and YouTube Shorts/TikTok scripts. 8 9 Usage: 10 python podcast_pipeline.py --rss "https://feeds.example.com/podcast.xml" 11 python podcast_pipeline.py --transcript episode.txt 12 python podcast_pipeline.py --batch "https://feeds.example.com/podcast.xml" --episodes 5 13 python podcast_pipeline.py --calendar 14 """ 15 16 import argparse 17 import hashlib 18 import json 19 import os 20 import re 21 import sys 22 import tempfile 23 from datetime import datetime, timedelta 24 from pathlib import Path 25 from typing import Optional 26 27 import feedparser 28 import requests 29 from dateutil import parser as dateparser 30 from slugify import slugify 31 from tqdm import tqdm 32 33 # --------------------------------------------------------------------------- 34 # Configuration 35 # --------------------------------------------------------------------------- 36 37 OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "") 38 ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "") 39 40 # Default output directory (overridable via --output-dir) 41 DEFAULT_OUTPUT_DIR = Path("./output") 42 43 # Dedup similarity threshold (0-1). Pairs above this are flagged as duplicates. 44 DEDUP_SIMILARITY_THRESHOLD = 0.70 45 46 # Default number of days to look back for dedup 47 DEFAULT_DEDUP_DAYS = 30 48 49 # Content generation model (Anthropic Claude) 50 ANTHROPIC_MODEL = "claude-sonnet-4-20250514" 51 52 # Whisper model for transcription 53 WHISPER_MODEL = "whisper-1" 54 55 # Platform scheduling defaults (hour in ET) 56 SCHEDULE_RULES = { 57 "twitter": {"times": ["09:00", "12:30", "17:00"], "max_per_day": 2}, 58 "linkedin": {"times": ["08:00", "09:00"], "max_per_day": 1, "best_days": [1, 2, 3]}, # Tue-Thu 59 "youtube_shorts": {"times": ["18:00", "19:00"], "max_per_day": 1}, 60 "tiktok": {"times": ["18:00", "20:00"], "max_per_day": 1}, 61 "newsletter": {"times": ["08:00"], "max_per_week": 1, "best_day": 2}, # Wednesday 62 "blog": {"times": ["10:00"], "max_per_week": 2}, 63 "quote_card": {"times": ["11:00", "15:00"], "max_per_day": 2}, 64 } 65 66 67 # --------------------------------------------------------------------------- 68 # API Clients 69 # --------------------------------------------------------------------------- 70 71 72 def transcribe_audio(audio_path: str) -> dict: 73 """ 74 Transcribe an audio file using OpenAI Whisper API. 75 Returns dict with 'text' (full transcript) and 'segments' (timestamped chunks). 76 """ 77 if not OPENAI_API_KEY: 78 print("ERROR: OPENAI_API_KEY not set. Cannot transcribe audio.", file=sys.stderr) 79 sys.exit(1) 80 81 print(f" Transcribing: {audio_path}") 82 url = "https://api.openai.com/v1/audio/transcriptions" 83 headers = {"Authorization": f"Bearer {OPENAI_API_KEY}"} 84 85 with open(audio_path, "rb") as audio_file: 86 # Request verbose JSON to get timestamps 87 response = requests.post( 88 url, 89 headers=headers, 90 files={"file": audio_file}, 91 data={ 92 "model": WHISPER_MODEL, 93 "response_format": "verbose_json", 94 "timestamp_granularities[]": "segment", 95 }, 96 timeout=600, # 10 min timeout for long episodes 97 ) 98 99 if response.status_code != 200: 100 print(f"ERROR: Whisper API returned {response.status_code}: {response.text}", file=sys.stderr) 101 sys.exit(1) 102 103 result = response.json() 104 return { 105 "text": result.get("text", ""), 106 "segments": result.get("segments", []), 107 "duration": result.get("duration", 0), 108 "language": result.get("language", "en"), 109 } 110 111 112 def call_anthropic(system_prompt: str, user_prompt: str, max_tokens: int = 8000) -> str: 113 """ 114 Call Anthropic Claude API for content generation. 115 Returns the text response. 116 """ 117 if not ANTHROPIC_API_KEY: 118 print("ERROR: ANTHROPIC_API_KEY not set. Cannot generate content.", file=sys.stderr) 119 sys.exit(1) 120 121 # Using the anthropic SDK 122 import anthropic 123 124 client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY) 125 message = client.messages.create( 126 model=ANTHROPIC_MODEL, 127 max_tokens=max_tokens, 128 system=system_prompt, 129 messages=[{"role": "user", "content": user_prompt}], 130 ) 131 return message.content[0].text 132 133 134 # --------------------------------------------------------------------------- 135 # RSS Feed Handling 136 # --------------------------------------------------------------------------- 137 138 139 def fetch_rss_episodes(rss_url: str, num_episodes: int = 1) -> list[dict]: 140 """ 141 Fetch episode metadata from an RSS feed. 142 Returns list of dicts with: title, date, description, audio_url, duration. 143 """ 144 print(f"Fetching RSS feed: {rss_url}") 145 feed = feedparser.parse(rss_url) 146 147 if feed.bozo and not feed.entries: 148 print(f"ERROR: Failed to parse RSS feed: {feed.bozo_exception}", file=sys.stderr) 149 sys.exit(1) 150 151 episodes = [] 152 for entry in feed.entries[:num_episodes]: 153 # Find the audio enclosure 154 audio_url = None 155 for link in entry.get("links", []): 156 if link.get("type", "").startswith("audio/"): 157 audio_url = link.get("href") 158 break 159 # Fallback: check enclosures 160 if not audio_url: 161 for enc in entry.get("enclosures", []): 162 if enc.get("type", "").startswith("audio/"): 163 audio_url = enc.get("url") 164 break 165 166 # Parse publish date 167 pub_date = None 168 if hasattr(entry, "published"): 169 try: 170 pub_date = dateparser.parse(entry.published).strftime("%Y-%m-%d") 171 except (ValueError, TypeError): 172 pub_date = None 173 174 episodes.append({ 175 "title": entry.get("title", "Untitled Episode"), 176 "date": pub_date or datetime.now().strftime("%Y-%m-%d"), 177 "description": entry.get("summary", ""), 178 "audio_url": audio_url, 179 "duration": entry.get("itunes_duration", "unknown"), 180 }) 181 182 print(f" Found {len(episodes)} episode(s)") 183 return episodes 184 185 186 def download_audio(audio_url: str) -> str: 187 """ 188 Download an audio file to a temp directory. Returns the local file path. 189 """ 190 print(f" Downloading audio: {audio_url[:80]}...") 191 tmp_dir = tempfile.mkdtemp(prefix="podcast_pipeline_") 192 # Determine extension from URL 193 ext = ".mp3" 194 if ".m4a" in audio_url: 195 ext = ".m4a" 196 elif ".wav" in audio_url: 197 ext = ".wav" 198 elif ".ogg" in audio_url: 199 ext = ".ogg" 200 201 local_path = os.path.join(tmp_dir, f"episode{ext}") 202 203 response = requests.get(audio_url, stream=True, timeout=300) 204 response.raise_for_status() 205 206 total_size = int(response.headers.get("content-length", 0)) 207 with open(local_path, "wb") as f: 208 with tqdm(total=total_size, unit="B", unit_scale=True, desc="Downloading") as pbar: 209 for chunk in response.iter_content(chunk_size=8192): 210 f.write(chunk) 211 pbar.update(len(chunk)) 212 213 print(f" Saved to: {local_path}") 214 return local_path 215 216 217 # --------------------------------------------------------------------------- 218 # Transcript Processing 219 # --------------------------------------------------------------------------- 220 221 222 def read_transcript(file_path: str) -> dict: 223 """ 224 Read a transcript from a file. Supports plain text, SRT, and VTT formats. 225 Returns dict with 'text' and 'segments'. 226 """ 227 path = Path(file_path) 228 if not path.exists(): 229 print(f"ERROR: Transcript file not found: {file_path}", file=sys.stderr) 230 sys.exit(1) 231 232 raw = path.read_text(encoding="utf-8") 233 234 # Detect format and parse 235 if file_path.endswith(".srt"): 236 return parse_srt(raw) 237 elif file_path.endswith(".vtt"): 238 return parse_vtt(raw) 239 else: 240 # Plain text — no timestamps 241 return {"text": raw, "segments": [], "duration": 0, "language": "en"} 242 243 244 def parse_srt(raw: str) -> dict: 245 """Parse SRT subtitle format into text + segments.""" 246 segments = [] 247 blocks = re.split(r"\n\n+", raw.strip()) 248 full_text_parts = [] 249 250 for block in blocks: 251 lines = block.strip().split("\n") 252 if len(lines) < 3: 253 continue 254 # Line 0: sequence number, Line 1: timestamps, Line 2+: text 255 time_match = re.match( 256 r"(\d{2}:\d{2}:\d{2},\d{3})\s*-->\s*(\d{2}:\d{2}:\d{2},\d{3})", lines[1] 257 ) 258 if not time_match: 259 continue 260 text = " ".join(lines[2:]) 261 full_text_parts.append(text) 262 segments.append({ 263 "start": srt_time_to_seconds(time_match.group(1)), 264 "end": srt_time_to_seconds(time_match.group(2)), 265 "text": text, 266 }) 267 268 return { 269 "text": " ".join(full_text_parts), 270 "segments": segments, 271 "duration": segments[-1]["end"] if segments else 0, 272 "language": "en", 273 } 274 275 276 def parse_vtt(raw: str) -> dict: 277 """Parse WebVTT format into text + segments.""" 278 # Strip WEBVTT header 279 raw = re.sub(r"^WEBVTT.*?\n\n", "", raw, flags=re.DOTALL) 280 # VTT uses . instead of , for milliseconds but is otherwise similar to SRT 281 raw = raw.replace(".", ",") # Normalize for the SRT parser 282 return parse_srt(raw) 283 284 285 def srt_time_to_seconds(time_str: str) -> float: 286 """Convert SRT timestamp (HH:MM:SS,mmm) to seconds.""" 287 h, m, rest = time_str.split(":") 288 s, ms = rest.split(",") 289 return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000 290 291 292 # --------------------------------------------------------------------------- 293 # Editorial Brain — Content Atom Extraction 294 # --------------------------------------------------------------------------- 295 296 297 def extract_content_atoms(transcript: dict, episode_meta: dict) -> list[dict]: 298 """ 299 Feed the transcript to the LLM to extract content atoms: 300 narrative arcs, quotes, controversial takes, data points, stories, 301 frameworks, and predictions. 302 """ 303 print(" Running Editorial Brain — extracting content atoms...") 304 305 system_prompt = """You are an expert content strategist and editorial brain. 306 Your job is to analyze podcast transcripts and extract content atoms — the raw 307 material that can be turned into social media posts, articles, videos, and more. 308 309 You think like a viral content creator: you spot the moments that make people 310 stop scrolling, the takes that spark debate, and the insights people screenshot 311 and share. 312 313 Return your analysis as a JSON array of content atoms.""" 314 315 user_prompt = f"""Analyze this podcast transcript and extract ALL content atoms. 316 317 Episode: {episode_meta.get('title', 'Unknown')} 318 Date: {episode_meta.get('date', 'Unknown')} 319 Description: {episode_meta.get('description', '')[:500]} 320 321 TRANSCRIPT: 322 {transcript['text'][:30000]} 323 324 --- 325 326 Extract content atoms in these 7 categories. Find ALL of them — be thorough. 327 328 1. **narrative_arc** — Complete story segments (setup → tension → resolution). Include timestamps if available. 329 2. **quote** — Punchy, shareable one-liners. Must pass the "would someone screenshot this?" test. 330 3. **controversial_take** — Opinions against conventional wisdom. The "hard disagree" or "finally someone said it" stuff. 331 4. **data_point** — Specific numbers, percentages, dollar amounts. Concrete proof points. 332 5. **story** — Personal anecdotes, case studies. Must have character + problem + outcome. 333 6. **framework** — Step-by-step processes, mental models. Things people save/bookmark. 334 7. **prediction** — Forward-looking claims about trends, markets, tech. 335 336 Return ONLY a JSON array. Each atom: 337 {{ 338 "type": "narrative_arc|quote|controversial_take|data_point|story|framework|prediction", 339 "content": "the extracted text, cleaned up for readability", 340 "timestamp": "MM:SS - MM:SS or null if not available", 341 "context": "what was being discussed when this came up", 342 "suggested_platforms": ["twitter", "linkedin", "youtube_shorts", "tiktok", "newsletter", "blog", "quote_card"] 343 }} 344 345 Find at least 15 atoms total. Prioritize quality and shareability.""" 346 347 response = call_anthropic(system_prompt, user_prompt, max_tokens=6000) 348 349 # Parse JSON from the response (handle markdown code blocks) 350 json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", response, re.DOTALL) 351 if json_match: 352 atoms = json.loads(json_match.group(1)) 353 else: 354 # Try parsing the whole response as JSON 355 try: 356 atoms = json.loads(response) 357 except json.JSONDecodeError: 358 # Last resort: find the JSON array in the response 359 start = response.find("[") 360 end = response.rfind("]") + 1 361 if start >= 0 and end > start: 362 atoms = json.loads(response[start:end]) 363 else: 364 print(" WARNING: Could not parse atoms from LLM response. Using empty list.", file=sys.stderr) 365 atoms = [] 366 367 print(f" Extracted {len(atoms)} content atoms") 368 return atoms 369 370 371 # --------------------------------------------------------------------------- 372 # Content Generation — Turn Atoms into Platform-Native Content 373 # --------------------------------------------------------------------------- 374 375 376 def generate_content_pieces(atoms: list[dict], episode_meta: dict) -> list[dict]: 377 """ 378 Take extracted content atoms and generate all platform-specific content pieces. 379 Returns a list of content piece dicts. 380 """ 381 print(" Generating platform-native content pieces...") 382 383 atoms_json = json.dumps(atoms, indent=2) 384 385 system_prompt = """You are a world-class content repurposing engine. 386 Given content atoms extracted from a podcast episode, you generate platform-native 387 content pieces that maximize engagement on each platform. 388 389 You understand platform-specific best practices: 390 - Twitter/X: punchy, data-driven, thread hooks, < 280 chars per tweet 391 - LinkedIn: professional but human, story-driven, hook before the fold 392 - YouTube Shorts/TikTok: HOOK(3s) → SETUP(12s) → PAYOFF(30s) → CTA(15s) 393 - Newsletter: scannable, value-dense, pull quotes 394 - Blog: SEO-optimized, structured with H2s, 1500-2500 words outlined 395 - Quote cards: max 20 words, standalone impact 396 397 Return ONLY valid JSON.""" 398 399 user_prompt = f"""Generate a full content suite from these podcast content atoms. 400 401 Episode: {episode_meta.get('title', 'Unknown')} 402 Date: {episode_meta.get('date', 'Unknown')} 403 404 CONTENT ATOMS: 405 {atoms_json[:20000]} 406 407 --- 408 409 Generate ALL of the following. Return as a JSON array of content pieces: 410 411 1. **video_clip** (3-5 pieces): Short-form video clip suggestions 412 - hook: first 3 seconds (pattern interrupt or bold claim) 413 - clip_description: what happens in the clip 414 - timestamp: approximate range from transcript 415 - caption_overlay: text for the screen 416 - platform: "youtube_shorts" or "tiktok" 417 - source_atoms: which atom indexes feed this 418 419 2. **twitter_thread** (2-3 pieces): Full thread outlines 420 - tweets: array of tweet texts (5-10 tweets, each < 280 chars) 421 - thread_hook: the first tweet (must create curiosity gap) 422 - cta: closing tweet with engagement driver 423 - source_atoms: which atom indexes 424 425 3. **linkedin_article** (1 piece): Full draft 426 - headline: specific, benefit-driven 427 - hook: first paragraph (before "see more" fold) 428 - body: full article text (800-1200 words, with section headers) 429 - cta: engagement question 430 - hashtags: 3-5 relevant tags 431 - source_atoms: which atom indexes 432 433 4. **newsletter_section** (1 piece): 434 - headline: scannable 435 - tldr: one sentence core insight 436 - bullets: 3-5 takeaway bullet points 437 - pull_quote: most shareable line 438 - source_atoms: which atom indexes 439 440 5. **quote_card** (3-5 pieces): 441 - quote_text: max 20 words 442 - attribution: speaker name 443 - background_mood: color/mood suggestion 444 - source_atoms: which atom indexes 445 446 6. **blog_outline** (1 piece): 447 - title: SEO-optimized 448 - primary_keyword: main search term to target 449 - secondary_keywords: 3-5 related terms 450 - meta_description: max 155 chars 451 - sections: array of H2 headings with brief descriptions 452 - estimated_word_count: 1500-2500 453 - source_atoms: which atom indexes 454 455 7. **short_script** (1 piece): YouTube Shorts / TikTok script 456 - hook: 0-3 seconds text 457 - setup: 3-15 seconds text 458 - payoff: 15-45 seconds text 459 - cta: 45-60 seconds text 460 - on_screen_text: key phrases to overlay 461 - broll_suggestions: visual ideas 462 - source_atoms: which atom indexes 463 464 Each piece must include: 465 {{ 466 "type": "video_clip|twitter_thread|linkedin_article|newsletter_section|quote_card|blog_outline|short_script", 467 "platform": "twitter|linkedin|youtube_shorts|tiktok|newsletter|blog|quote_card", 468 "content": {{ ... type-specific fields ... }}, 469 "source_atoms": [0, 2, 5], 470 "viral_score_estimate": 0-100 471 }}""" 472 473 response = call_anthropic(system_prompt, user_prompt, max_tokens=8000) 474 475 # Parse JSON 476 json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", response, re.DOTALL) 477 if json_match: 478 pieces = json.loads(json_match.group(1)) 479 else: 480 try: 481 pieces = json.loads(response) 482 except json.JSONDecodeError: 483 start = response.find("[") 484 end = response.rfind("]") + 1 485 if start >= 0 and end > start: 486 pieces = json.loads(response[start:end]) 487 else: 488 print(" WARNING: Could not parse content pieces. Using empty list.", file=sys.stderr) 489 pieces = [] 490 491 print(f" Generated {len(pieces)} content pieces") 492 return pieces 493 494 495 # --------------------------------------------------------------------------- 496 # Viral Scoring 497 # --------------------------------------------------------------------------- 498 499 500 def score_content_pieces(pieces: list[dict], atoms: list[dict]) -> list[dict]: 501 """ 502 Score each content piece on viral potential: novelty × controversy × utility. 503 Updates each piece in-place with a 'viral_score' field. 504 """ 505 print(" Scoring content pieces for viral potential...") 506 507 # If the LLM already estimated scores, we can refine them. 508 # For a more robust approach, we'd call the LLM to score each piece. 509 # Here we use a hybrid: trust LLM estimates + apply heuristic adjustments. 510 511 for piece in pieces: 512 base_score = piece.get("viral_score_estimate", 50) 513 514 # Heuristic adjustments based on content type 515 type_bonus = { 516 "video_clip": 5, # Video inherently more engaging 517 "twitter_thread": 3, # Threads get algorithmic boost 518 "linkedin_article": 0, # Neutral 519 "newsletter_section": -2, # Lower viral ceiling 520 "quote_card": 2, # Highly shareable format 521 "blog_outline": -3, # SEO play, not viral play 522 "short_script": 5, # Short-form video bonus 523 } 524 bonus = type_bonus.get(piece.get("type", ""), 0) 525 526 # Check if source atoms include high-value types 527 source_indices = piece.get("source_atoms", []) 528 for idx in source_indices: 529 if idx < len(atoms): 530 atom = atoms[idx] 531 atom_type = atom.get("type", "") 532 if atom_type == "controversial_take": 533 bonus += 5 # Controversy drives engagement 534 elif atom_type == "data_point": 535 bonus += 3 # Specificity builds credibility 536 elif atom_type == "prediction": 537 bonus += 4 # Predictions spark debate 538 539 # Calculate final score (clamp 0-100) 540 final_score = max(0, min(100, base_score + bonus)) 541 piece["viral_score"] = final_score 542 543 # Break down the components (approximate from the composite) 544 piece["score_breakdown"] = { 545 "novelty": min(100, int(final_score * 1.1)), # Slightly inflate for reporting 546 "controversy": min(100, int(final_score * 0.9)), 547 "utility": min(100, int(final_score * 1.0)), 548 } 549 550 # Sort by viral score descending 551 pieces.sort(key=lambda p: p.get("viral_score", 0), reverse=True) 552 553 avg_score = sum(p.get("viral_score", 0) for p in pieces) / max(len(pieces), 1) 554 print(f" Average viral score: {avg_score:.1f}") 555 556 return pieces 557 558 559 # --------------------------------------------------------------------------- 560 # Deduplication Engine 561 # --------------------------------------------------------------------------- 562 563 564 def load_content_history(output_dir: Path, dedup_days: int) -> list[dict]: 565 """Load content history from the last N days for dedup checking.""" 566 history_path = output_dir / "content_history.json" 567 if not history_path.exists(): 568 return [] 569 570 with open(history_path) as f: 571 history = json.load(f) 572 573 cutoff = (datetime.now() - timedelta(days=dedup_days)).isoformat() 574 return [h for h in history if h.get("date", "") >= cutoff] 575 576 577 def compute_content_hash(piece: dict) -> str: 578 """Generate a hash for a content piece based on its core content.""" 579 content_str = json.dumps(piece.get("content", {}), sort_keys=True) 580 return hashlib.sha256(content_str.encode()).hexdigest()[:16] 581 582 583 def simple_text_similarity(text_a: str, text_b: str) -> float: 584 """ 585 Simple word-overlap similarity (Jaccard index). 586 For production, replace with embedding-based cosine similarity. 587 """ 588 words_a = set(text_a.lower().split()) 589 words_b = set(text_b.lower().split()) 590 if not words_a or not words_b: 591 return 0.0 592 intersection = words_a & words_b 593 union = words_a | words_b 594 return len(intersection) / len(union) 595 596 597 def get_piece_text(piece: dict) -> str: 598 """Extract the main text content from a piece for similarity comparison.""" 599 content = piece.get("content", {}) 600 if isinstance(content, str): 601 return content 602 # Concatenate all string values from the content dict 603 parts = [] 604 for v in content.values(): 605 if isinstance(v, str): 606 parts.append(v) 607 elif isinstance(v, list): 608 for item in v: 609 if isinstance(item, str): 610 parts.append(item) 611 return " ".join(parts) 612 613 614 def deduplicate(pieces: list[dict], history: list[dict], threshold: float = DEDUP_SIMILARITY_THRESHOLD) -> list[dict]: 615 """ 616 Remove content pieces that are too similar to each other or to recent history. 617 Returns filtered list with dedup flags. 618 """ 619 print(" Running dedup engine...") 620 621 # Dedup within this batch 622 kept = [] 623 removed = 0 624 for piece in pieces: 625 piece_text = get_piece_text(piece) 626 piece["content_hash"] = compute_content_hash(piece) 627 is_dupe = False 628 629 # Check against already-kept pieces in this batch 630 for kept_piece in kept: 631 sim = simple_text_similarity(piece_text, get_piece_text(kept_piece)) 632 if sim > threshold: 633 is_dupe = True 634 removed += 1 635 break 636 637 # Check against history 638 if not is_dupe: 639 for hist_piece in history: 640 sim = simple_text_similarity(piece_text, hist_piece.get("text_preview", "")) 641 if sim > threshold: 642 piece["dedup_warning"] = f"⚠️ Similar to previously published content (similarity: {sim:.0%})" 643 # Don't remove, just flag — might still be worth publishing with a different angle 644 break 645 646 if not is_dupe: 647 kept.append(piece) 648 649 print(f" Kept {len(kept)}/{len(pieces)} pieces ({removed} removed as duplicates)") 650 return kept 651 652 653 def save_to_history(pieces: list[dict], output_dir: Path, episode_meta: dict): 654 """Save content pieces to history for future dedup.""" 655 history_path = output_dir / "content_history.json" 656 history = [] 657 if history_path.exists(): 658 with open(history_path) as f: 659 history = json.load(f) 660 661 for piece in pieces: 662 history.append({ 663 "date": datetime.now().isoformat(), 664 "episode": episode_meta.get("title", ""), 665 "type": piece.get("type", ""), 666 "content_hash": piece.get("content_hash", ""), 667 "text_preview": get_piece_text(piece)[:200], 668 "viral_score": piece.get("viral_score", 0), 669 }) 670 671 with open(history_path, "w") as f: 672 json.dump(history, f, indent=2) 673 674 675 # --------------------------------------------------------------------------- 676 # Calendar Generation 677 # --------------------------------------------------------------------------- 678 679 680 def generate_calendar(pieces: list[dict], episode_meta: dict, start_date: Optional[str] = None) -> dict: 681 """ 682 Generate a weekly content calendar from scored, deduplicated pieces. 683 Assigns publish dates/times based on platform best practices. 684 """ 685 print(" Generating content calendar...") 686 687 if start_date: 688 cal_start = dateparser.parse(start_date) 689 else: 690 # Start next Monday 691 today = datetime.now() 692 days_until_monday = (7 - today.weekday()) % 7 693 if days_until_monday == 0: 694 days_until_monday = 7 695 cal_start = today + timedelta(days=days_until_monday) 696 697 calendar = { 698 "week_of": cal_start.strftime("%Y-%m-%d"), 699 "episode_source": episode_meta.get("title", "Unknown"), 700 "generated_at": datetime.now().isoformat(), 701 "content_pieces": [], 702 "total_pieces": 0, 703 "avg_viral_score": 0, 704 "coverage": {}, 705 } 706 707 # Group pieces by platform 708 platform_buckets: dict[str, list[dict]] = {} 709 for piece in pieces: 710 platform = piece.get("platform", "unknown") 711 platform_buckets.setdefault(platform, []).append(piece) 712 713 scheduled_pieces = [] 714 day_offset = 0 715 max_days = 7 716 717 for day_offset in range(max_days): 718 current_date = cal_start + timedelta(days=day_offset) 719 day_of_week = current_date.weekday() # 0=Mon, 6=Sun 720 721 for platform, bucket in platform_buckets.items(): 722 if not bucket: 723 continue 724 725 rules = SCHEDULE_RULES.get(platform, {"times": ["10:00"], "max_per_day": 1}) 726 727 # Check day restrictions 728 if "best_days" in rules and day_of_week not in rules["best_days"]: 729 continue 730 if "best_day" in rules and day_of_week != rules["best_day"]: 731 continue 732 733 # Check weekly limits 734 if "max_per_week" in rules: 735 already_scheduled = sum( 736 1 for sp in scheduled_pieces if sp.get("platform") == platform 737 ) 738 if already_scheduled >= rules["max_per_week"]: 739 continue 740 741 # Schedule up to max_per_day 742 daily_count = 0 743 while bucket and daily_count < rules.get("max_per_day", 1): 744 piece = bucket.pop(0) 745 time_slot = rules["times"][daily_count % len(rules["times"])] 746 747 scheduled_piece = { 748 "date": current_date.strftime("%Y-%m-%d"), 749 "time": f"{time_slot} ET", 750 "platform": platform, 751 "type": piece.get("type", "unknown"), 752 "content": piece.get("content", {}), 753 "viral_score": piece.get("viral_score", 0), 754 "status": "draft", 755 "content_hash": piece.get("content_hash", ""), 756 } 757 if "dedup_warning" in piece: 758 scheduled_piece["dedup_warning"] = piece["dedup_warning"] 759 760 scheduled_pieces.append(scheduled_piece) 761 daily_count += 1 762 763 # Build coverage summary 764 coverage = {} 765 for sp in scheduled_pieces: 766 platform = sp.get("platform", "unknown") 767 coverage[platform] = coverage.get(platform, 0) + 1 768 769 calendar["content_pieces"] = scheduled_pieces 770 calendar["total_pieces"] = len(scheduled_pieces) 771 calendar["avg_viral_score"] = ( 772 sum(sp.get("viral_score", 0) for sp in scheduled_pieces) / max(len(scheduled_pieces), 1) 773 ) 774 calendar["coverage"] = coverage 775 776 print(f" Calendar: {len(scheduled_pieces)} pieces across {len(coverage)} platforms") 777 return calendar 778 779 780 def generate_calendar_from_outputs(output_dir: Path) -> dict: 781 """ 782 Aggregate calendar from all episode outputs in the output directory. 783 Used with --calendar flag to create a unified weekly calendar. 784 """ 785 episodes_dir = output_dir / "episodes" 786 if not episodes_dir.exists(): 787 print("ERROR: No episodes found in output directory.", file=sys.stderr) 788 sys.exit(1) 789 790 all_pieces = [] 791 for ep_dir in sorted(episodes_dir.iterdir()): 792 pieces_file = ep_dir / "content_pieces.json" 793 if pieces_file.exists(): 794 with open(pieces_file) as f: 795 pieces = json.load(f) 796 all_pieces.extend(pieces) 797 798 if not all_pieces: 799 print("ERROR: No content pieces found.", file=sys.stderr) 800 sys.exit(1) 801 802 # Sort by viral score and take the best 803 all_pieces.sort(key=lambda p: p.get("viral_score", 0), reverse=True) 804 805 meta = {"title": "Aggregated Calendar", "date": datetime.now().strftime("%Y-%m-%d")} 806 return generate_calendar(all_pieces, meta) 807 808 809 # --------------------------------------------------------------------------- 810 # Pipeline Orchestration 811 # --------------------------------------------------------------------------- 812 813 814 def process_episode( 815 transcript: dict, 816 episode_meta: dict, 817 output_dir: Path, 818 dedup_days: int = DEFAULT_DEDUP_DAYS, 819 min_score: int = 0, 820 ) -> dict: 821 """ 822 Full pipeline for one episode: 823 1. Extract content atoms (Editorial Brain) 824 2. Generate platform-native content 825 3. Score for viral potential 826 4. Deduplicate 827 5. Generate calendar 828 6. Save outputs 829 """ 830 episode_slug = slugify(f"{episode_meta['date']}-{episode_meta['title']}")[:80] 831 episode_dir = output_dir / "episodes" / episode_slug 832 episode_dir.mkdir(parents=True, exist_ok=True) 833 834 print(f"\n{'='*60}") 835 print(f"Processing: {episode_meta['title']}") 836 print(f"{'='*60}") 837 838 # Save transcript 839 transcript_path = episode_dir / "transcript.txt" 840 transcript_path.write_text(transcript["text"], encoding="utf-8") 841 842 # Step 1: Extract content atoms 843 atoms = extract_content_atoms(transcript, episode_meta) 844 atoms_path = episode_dir / "atoms.json" 845 with open(atoms_path, "w") as f: 846 json.dump(atoms, f, indent=2) 847 848 # Step 2: Generate content pieces 849 pieces = generate_content_pieces(atoms, episode_meta) 850 851 # Step 3: Score 852 pieces = score_content_pieces(pieces, atoms) 853 854 # Step 4: Filter by minimum score 855 if min_score > 0: 856 before = len(pieces) 857 pieces = [p for p in pieces if p.get("viral_score", 0) >= min_score] 858 print(f" Filtered: {before} → {len(pieces)} pieces (min score: {min_score})") 859 860 # Step 5: Dedup 861 history = load_content_history(output_dir, dedup_days) 862 pieces = deduplicate(pieces, history) 863 864 # Step 6: Generate calendar 865 calendar = generate_calendar(pieces, episode_meta) 866 867 # Save outputs 868 pieces_path = episode_dir / "content_pieces.json" 869 with open(pieces_path, "w") as f: 870 json.dump(pieces, f, indent=2) 871 872 calendar_path = episode_dir / "calendar.json" 873 with open(calendar_path, "w") as f: 874 json.dump(calendar, f, indent=2) 875 876 # Save to dedup history 877 save_to_history(pieces, output_dir, episode_meta) 878 879 # Log the run 880 log_run(output_dir, episode_meta, len(atoms), len(pieces), calendar) 881 882 print(f"\n✅ Done: {len(pieces)} content pieces generated") 883 print(f" Output: {episode_dir}") 884 print(f" Avg viral score: {calendar['avg_viral_score']:.1f}") 885 print(f" Coverage: {calendar['coverage']}") 886 887 return calendar 888 889 890 def log_run(output_dir: Path, episode_meta: dict, num_atoms: int, num_pieces: int, calendar: dict): 891 """Append a run log entry.""" 892 log_path = output_dir / "pipeline_log.json" 893 log = [] 894 if log_path.exists(): 895 with open(log_path) as f: 896 log = json.load(f) 897 898 log.append({ 899 "timestamp": datetime.now().isoformat(), 900 "episode": episode_meta.get("title", ""), 901 "episode_date": episode_meta.get("date", ""), 902 "atoms_extracted": num_atoms, 903 "pieces_generated": num_pieces, 904 "avg_viral_score": calendar.get("avg_viral_score", 0), 905 "coverage": calendar.get("coverage", {}), 906 }) 907 908 with open(log_path, "w") as f: 909 json.dump(log, f, indent=2) 910 911 912 # --------------------------------------------------------------------------- 913 # CLI 914 # --------------------------------------------------------------------------- 915 916 917 def build_parser() -> argparse.ArgumentParser: 918 parser = argparse.ArgumentParser( 919 description="Podcast-to-Everything Pipeline: Turn podcast episodes into a full content calendar.", 920 formatter_class=argparse.RawDescriptionHelpFormatter, 921 epilog=""" 922 Examples: 923 %(prog)s --rss "https://feeds.example.com/podcast.xml" 924 %(prog)s --transcript episode-42.txt 925 %(prog)s --batch "https://feeds.example.com/podcast.xml" --episodes 5 926 %(prog)s --calendar 927 %(prog)s --rss "https://feed.url" --min-score 80 --dedup-days 60 928 """, 929 ) 930 931 # Input modes (mutually exclusive group) 932 input_group = parser.add_mutually_exclusive_group(required=True) 933 input_group.add_argument( 934 "--rss", metavar="URL", help="Process the latest episode from an RSS feed" 935 ) 936 input_group.add_argument( 937 "--transcript", metavar="FILE", help="Process a local transcript file (txt, srt, vtt)" 938 ) 939 input_group.add_argument( 940 "--batch", metavar="URL", help="Batch process multiple episodes from an RSS feed" 941 ) 942 input_group.add_argument( 943 "--calendar", action="store_true", 944 help="Generate a weekly calendar from existing episode outputs" 945 ) 946 947 # Options 948 parser.add_argument( 949 "--episodes", type=int, default=5, 950 help="Number of episodes to process in batch mode (default: 5)" 951 ) 952 parser.add_argument( 953 "--dedup-days", type=int, default=DEFAULT_DEDUP_DAYS, 954 help=f"Days of history to check for dedup (default: {DEFAULT_DEDUP_DAYS})" 955 ) 956 parser.add_argument( 957 "--min-score", type=int, default=0, 958 help="Minimum viral score to include in output (default: 0, include all)" 959 ) 960 parser.add_argument( 961 "--output-dir", type=str, default=str(DEFAULT_OUTPUT_DIR), 962 help=f"Output directory (default: {DEFAULT_OUTPUT_DIR})" 963 ) 964 965 return parser 966 967 968 def main(): 969 parser = build_parser() 970 args = parser.parse_args() 971 972 output_dir = Path(args.output_dir) 973 output_dir.mkdir(parents=True, exist_ok=True) 974 975 # --calendar mode: aggregate from existing outputs 976 if args.calendar: 977 print("Generating weekly calendar from existing outputs...") 978 calendar = generate_calendar_from_outputs(output_dir) 979 calendar_dir = output_dir / "calendar" 980 calendar_dir.mkdir(parents=True, exist_ok=True) 981 982 week_str = datetime.now().strftime("%Y-W%W") 983 cal_path = calendar_dir / f"week-{week_str}.json" 984 with open(cal_path, "w") as f: 985 json.dump(calendar, f, indent=2) 986 987 print(f"\n✅ Weekly calendar generated: {cal_path}") 988 print(f" Total pieces: {calendar['total_pieces']}") 989 print(f" Avg viral score: {calendar['avg_viral_score']:.1f}") 990 print(f" Coverage: {calendar['coverage']}") 991 return 992 993 # --transcript mode: read local file 994 if args.transcript: 995 transcript = read_transcript(args.transcript) 996 episode_meta = { 997 "title": Path(args.transcript).stem, 998 "date": datetime.now().strftime("%Y-%m-%d"), 999 "description": "", 1000 } 1001 process_episode(transcript, episode_meta, output_dir, args.dedup_days, args.min_score) 1002 return 1003 1004 # --rss mode: fetch latest episode 1005 if args.rss: 1006 episodes = fetch_rss_episodes(args.rss, num_episodes=1) 1007 if not episodes: 1008 print("ERROR: No episodes found in feed.", file=sys.stderr) 1009 sys.exit(1) 1010 1011 ep = episodes[0] 1012 if not ep["audio_url"]: 1013 print("ERROR: No audio URL found for the latest episode.", file=sys.stderr) 1014 sys.exit(1) 1015 1016 audio_path = download_audio(ep["audio_url"]) 1017 transcript = transcribe_audio(audio_path) 1018 1019 # Clean up temp audio file 1020 try: 1021 os.remove(audio_path) 1022 except OSError: 1023 pass 1024 1025 process_episode(transcript, ep, output_dir, args.dedup_days, args.min_score) 1026 return 1027 1028 # --batch mode: process multiple episodes 1029 if args.batch: 1030 episodes = fetch_rss_episodes(args.batch, num_episodes=args.episodes) 1031 if not episodes: 1032 print("ERROR: No episodes found in feed.", file=sys.stderr) 1033 sys.exit(1) 1034 1035 print(f"\nBatch mode: processing {len(episodes)} episodes\n") 1036 for i, ep in enumerate(episodes, 1): 1037 print(f"\n--- Episode {i}/{len(episodes)} ---") 1038 1039 if not ep["audio_url"]: 1040 print(f" SKIP: No audio URL for '{ep['title']}'") 1041 continue 1042 1043 audio_path = download_audio(ep["audio_url"]) 1044 transcript = transcribe_audio(audio_path) 1045 1046 try: 1047 os.remove(audio_path) 1048 except OSError: 1049 pass 1050 1051 process_episode(transcript, ep, output_dir, args.dedup_days, args.min_score) 1052 1053 # Generate combined calendar 1054 print("\n\nGenerating combined calendar for all episodes...") 1055 calendar = generate_calendar_from_outputs(output_dir) 1056 calendar_dir = output_dir / "calendar" 1057 calendar_dir.mkdir(parents=True, exist_ok=True) 1058 1059 week_str = datetime.now().strftime("%Y-W%W") 1060 cal_path = calendar_dir / f"week-{week_str}.json" 1061 with open(cal_path, "w") as f: 1062 json.dump(calendar, f, indent=2) 1063 1064 print(f"\n✅ Batch complete. Combined calendar: {cal_path}") 1065 return 1066 1067 1068 if __name__ == "__main__": 1069 main()