# video-clip-pipeline / longform_pipeline.py
1 #!/usr/bin/env python3 2 """ 3 longform_pipeline.py — End-to-end long-form clip pipeline 4 Segments long YouTube videos into 5-15 minute highlight clips for YouTube. 5 6 Usage: 7 python3 longform_pipeline.py --url URL [--max-clips 3] 8 python3 longform_pipeline.py --channel my-podcast [--max-clips 3] 9 """ 10 11 import argparse 12 import json 13 import os 14 import re 15 import subprocess 16 import sys 17 import tempfile 18 import shutil 19 from datetime import datetime, timedelta 20 from pathlib import Path 21 import anthropic 22 23 # ── Paths (configure these for your setup) ───────────────────────────────────── 24 WORKSPACE = Path(os.environ.get("PIPELINE_WORKSPACE", Path.cwd())) 25 DATA_DIR = WORKSPACE / "data" / "youtube-clips" 26 PROCESSED_FILE = DATA_DIR / "processed-longform.json" 27 KB_BASE = WORKSPACE / "knowledge_base" / "youtube" 28 29 LONGFORM_SEGMENTATION_PROMPT = """You are a YouTube video editor specializing in extracting high-value highlight clips from long-form content. 30 31 You will be given a transcript of a long-form YouTube video with timestamps. 32 33 Your job: Find {n} self-contained segments of 5-15 minutes each that would work as standalone YouTube videos. 34 35 ## Rules for Segment Selection 36 37 ### Structure 38 - Each segment must have a CLEAR NARRATIVE ARC: setup → development → resolution 39 - The segment must open with a STRONG HOOK that gives viewers immediate context 40 - The segment must end NATURALLY — at a conclusion, insight landing, or story resolution 41 - Never end mid-topic or mid-story 42 43 ### Content criteria (pick the best) 44 1. A complete story or case study with a clear result 45 2. A step-by-step tutorial or walkthrough 46 3. A debate, discussion, or analysis that reaches a conclusion 47 4. A "how we did X and got Y result" narrative 48 5. 
A contrarian take with supporting evidence and a conclusion 49 50 ### Length 51 - Minimum: 5 minutes (300 seconds) 52 - Maximum: 15 minutes (900 seconds) 53 - Sweet spot: 7-12 minutes 54 55 ### What to avoid 56 - Starting in the middle of a thought 57 - Ending with a question or cliffhanger (viewers came for answers) 58 - Topics that require external context from earlier in the video 59 60 ## Output Format 61 62 Return ONLY valid JSON array. No markdown, no commentary. 63 64 [ 65 {{ 66 "title": "descriptive YouTube-style title (under 70 chars)", 67 "start_time": "MM:SS", 68 "end_time": "MM:SS", 69 "hook_sentence": "exact words from transcript that open the segment — must immediately establish context", 70 "payoff_sentence": "exact words from transcript where the key insight/resolution lands", 71 "narrative_arc": "1-2 sentences describing setup → development → resolution", 72 "why": "2-3 sentences on why this works as a standalone YouTube video" 73 }} 74 ] 75 76 ## Important 77 78 - `hook_sentence` and `payoff_sentence` must be verbatim from the transcript 79 - Each segment must be fully self-contained — someone who hasn't seen the full video should understand and benefit 80 - Find exactly {n} segments 81 - Spread across the video — don't cluster at the beginning 82 83 ## Transcript 84 85 {TRANSCRIPT}""" 86 87 88 # ── Helpers ───────────────────────────────────────────────────────────────────── 89 90 def log(msg: str): 91 print(f"[longform] {msg}", flush=True) 92 93 94 def run(cmd: list, **kwargs) -> subprocess.CompletedProcess: 95 log(f"$ {' '.join(str(c) for c in cmd)}") 96 return subprocess.run(cmd, **kwargs) 97 98 99 def get_anthropic_client() -> anthropic.Anthropic: 100 """Get Anthropic client from environment variable.""" 101 key = os.environ.get("ANTHROPIC_API_KEY") or os.environ.get("ANTHROPIC_KEY") 102 if not key: 103 raise RuntimeError( 104 "No ANTHROPIC_API_KEY found. 
Set the environment variable:\n" 105 " export ANTHROPIC_API_KEY='sk-ant-...'" 106 ) 107 return anthropic.Anthropic(api_key=key) 108 109 110 def load_processed() -> set: 111 if PROCESSED_FILE.exists(): 112 try: 113 data = json.loads(PROCESSED_FILE.read_text()) 114 return set(data.get("urls", [])) 115 except Exception: 116 pass 117 return set() 118 119 120 def save_processed(processed: set): 121 PROCESSED_FILE.parent.mkdir(parents=True, exist_ok=True) 122 PROCESSED_FILE.write_text(json.dumps({"urls": sorted(processed)}, indent=2)) 123 124 125 def parse_time_to_seconds(t: str) -> float: 126 parts = t.strip().split(":") 127 if len(parts) == 2: 128 return int(parts[0]) * 60 + float(parts[1]) 129 elif len(parts) == 3: 130 return int(parts[0]) * 3600 + int(parts[1]) * 60 + float(parts[2]) 131 return float(t) 132 133 134 def seconds_to_mmss(s: float) -> str: 135 m = int(s) // 60 136 sec = int(s) % 60 137 return f"{m:02d}:{sec:02d}" 138 139 140 def get_video_duration(video_path: str) -> float: 141 result = subprocess.run( 142 ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", video_path], 143 capture_output=True, text=True 144 ) 145 data = json.loads(result.stdout) 146 return float(data["format"]["duration"]) 147 148 149 def parse_vtt(vtt_path: str) -> list[dict]: 150 entries = [] 151 with open(vtt_path, encoding="utf-8", errors="replace") as f: 152 content = f.read() 153 pattern = re.compile( 154 r'(\d{2}:\d{2}:\d{2}\.\d{3})\s*-->\s*(\d{2}:\d{2}:\d{2}\.\d{3})[^\n]*\n(.*?)(?=\n\n|\Z)', 155 re.DOTALL 156 ) 157 for m in pattern.finditer(content): 158 start_str, end_str, text = m.group(1), m.group(2), m.group(3) 159 text = re.sub(r'<[^>]+>', '', text).strip() 160 text = re.sub(r'\s+', ' ', text) 161 if not text: 162 continue 163 def vtt_time(s): 164 h, mi, rest = s.split(":") 165 sec, ms = rest.split(".") 166 return int(h)*3600 + int(mi)*60 + int(sec) + int(ms)/1000 167 entries.append({ 168 "start": vtt_time(start_str), 169 "end": vtt_time(end_str), 170 
"text": text, 171 }) 172 return entries 173 174 175 def transcript_to_text(entries: list[dict]) -> str: 176 lines = [] 177 seen = set() 178 for e in entries: 179 t = e["text"] 180 if t in seen: 181 continue 182 seen.add(t) 183 ts = seconds_to_mmss(e["start"]) 184 lines.append(f"[{ts}] {t}") 185 return "\n".join(lines) 186 187 188 def get_transcript_window(entries: list[dict], center_seconds: float, window: float = 10.0) -> str: 189 lo, hi = center_seconds - window, center_seconds + window 190 lines = [] 191 seen = set() 192 for e in entries: 193 if e["end"] < lo or e["start"] > hi: 194 continue 195 t = e["text"] 196 if t in seen: 197 continue 198 seen.add(t) 199 ts = seconds_to_mmss(e["start"]) 200 lines.append(f"[{ts}] {t}") 201 return "\n".join(lines) 202 203 204 def download_video(url: str, out_dir: str) -> tuple[str, str]: 205 log(f"Downloading: {url}") 206 cmd = [ 207 "yt-dlp", 208 "--write-auto-sub", "--sub-lang", "en", "--convert-subs", "vtt", 209 "-f", "bestvideo[height<=1080]+bestaudio/best[height<=1080]", 210 "--merge-output-format", "mp4", 211 "-o", f"{out_dir}/%(title)s.%(ext)s", 212 url, 213 ] 214 result = run(cmd, capture_output=False) 215 if result.returncode != 0: 216 raise RuntimeError(f"yt-dlp failed for {url}") 217 mp4_files = list(Path(out_dir).glob("*.mp4")) 218 vtt_files = list(Path(out_dir).glob("*.vtt")) 219 if not mp4_files: 220 raise RuntimeError("No MP4 found after download") 221 if not vtt_files: 222 raise RuntimeError("No VTT found after download") 223 return str(mp4_files[0]), str(vtt_files[0]) 224 225 226 def call_claude_segmentation(client: anthropic.Anthropic, transcript: str, n: int = 3) -> list[dict]: 227 prompt = LONGFORM_SEGMENTATION_PROMPT.format(n=n, TRANSCRIPT=transcript) 228 log(f"Calling Claude for long-form segmentation ({n} clips)...") 229 message = client.messages.create( 230 model="claude-sonnet-4-6", 231 max_tokens=3000, 232 messages=[{"role": "user", "content": prompt}] 233 ) 234 raw = message.content[0].text.strip() 
235 json_match = re.search(r'(\[[\s\S]+\])', raw) 236 if not json_match: 237 json_match = re.search(r'(\{[\s\S]+\})', raw) 238 if not json_match: 239 raise ValueError(f"No JSON found in Claude response:\n{raw[:500]}") 240 parsed = [json.loads(json_match.group(1))] 241 else: 242 parsed = json.loads(json_match.group(1)) 243 if isinstance(parsed, dict): 244 parsed = [parsed] 245 return parsed[:n] 246 247 248 def verify_cut(client: anthropic.Anthropic, clip: dict, entries: list[dict]) -> dict: 249 end_sec = parse_time_to_seconds(clip["end_time"]) 250 window_text = get_transcript_window(entries, end_sec, window=10.0) 251 252 prompt = f"""You are verifying whether a long-form YouTube clip ends at a clean, complete point. 253 254 Proposed end time: {clip['end_time']} 255 Expected payoff: "{clip.get('payoff_sentence', '')}" 256 257 Transcript around the proposed end time (±10 seconds): 258 {window_text} 259 260 Does the thought/narrative complete at {clip['end_time']}, or does it continue? 261 If it continues, provide the corrected end_time where the thought actually resolves. 262 The clip must end on a complete thought, insight, or story beat — never mid-sentence or mid-idea. 
263 264 Return ONLY valid JSON: 265 {{ 266 "end_is_clean": true/false, 267 "corrected_end_time": "MM:SS or same as proposed if clean", 268 "reason": "one-sentence explanation" 269 }}""" 270 271 log(f"Verifying cut at {clip['end_time']}...") 272 message = client.messages.create( 273 model="claude-sonnet-4-6", 274 max_tokens=400, 275 messages=[{"role": "user", "content": prompt}] 276 ) 277 raw = message.content[0].text.strip() 278 json_match = re.search(r'(\{[\s\S]+\})', raw) 279 if not json_match: 280 log("Warning: no JSON in verification, keeping original") 281 return clip 282 verification = json.loads(json_match.group(1)) 283 if not verification.get("end_is_clean") and verification.get("corrected_end_time"): 284 old_end = clip["end_time"] 285 clip["end_time"] = verification["corrected_end_time"] 286 log(f" ✂️ Cut corrected: {old_end} → {clip['end_time']} ({verification.get('reason', '')})") 287 else: 288 log(f" ✅ Cut is clean at {clip['end_time']}") 289 return clip 290 291 292 def cut_clip_landscape(video_path: str, start: str, end: str, output_path: str): 293 """Cut a clip, keeping 16:9 landscape. 
No crop or caption burn.""" 294 start_sec = parse_time_to_seconds(start) 295 end_sec = parse_time_to_seconds(end) 296 duration = end_sec - start_sec 297 cmd = [ 298 "ffmpeg", "-y", 299 "-ss", str(start_sec), 300 "-i", video_path, 301 "-t", str(duration), 302 "-c:v", "libx264", "-c:a", "aac", 303 "-avoid_negative_ts", "make_zero", 304 output_path, 305 ] 306 result = run(cmd, capture_output=True, text=True) 307 if result.returncode != 0: 308 log(f"FFmpeg error: {result.stderr[-500:]}") 309 raise RuntimeError("FFmpeg cut failed") 310 311 312 def scan_channel(channel: str, processed: set) -> list[dict]: 313 """Scan a channel knowledge base directory for new videos to process.""" 314 kb_dir = KB_BASE / channel 315 if not kb_dir.exists(): 316 log(f"Channel KB dir not found: {kb_dir}") 317 return [] 318 319 today = datetime.now() 320 week_start = (today - timedelta(days=today.weekday())).replace(hour=0, minute=0, second=0) 321 week_end = (week_start + timedelta(days=6)).replace(hour=23, minute=59, second=59) 322 videos = [] 323 324 for md_file in sorted(kb_dir.glob("*.md")): 325 name = md_file.stem 326 date_match = re.match(r'(\d{4}-\d{2}-\d{2})', name) 327 if not date_match: 328 continue 329 try: 330 file_date = datetime.strptime(date_match.group(1), "%Y-%m-%d") 331 except ValueError: 332 continue 333 if not (week_start <= file_date <= week_end): 334 continue 335 if "summary" in name.lower(): 336 continue 337 338 content = md_file.read_text(encoding="utf-8", errors="replace") 339 url_match = re.search(r'^url:\s*(https://www\.youtube\.com/watch\?v=\S+)', content, re.MULTILINE) 340 if not url_match: 341 url_match = re.search(r'(https://www\.youtube\.com/watch\?v=[\w-]+)', content) 342 if not url_match: 343 continue 344 345 url = url_match.group(1).strip() 346 if url in processed: 347 log(f"Skipping already processed: {url}") 348 continue 349 350 title_match = re.search(r'^title:\s*"?(.+?)"?\s*$', content, re.MULTILINE) 351 title = title_match.group(1) if title_match else 
name 352 353 videos.append({"url": url, "title": title, "date": file_date}) 354 log(f"Found new video: {title} ({url})") 355 356 return videos 357 358 359 def process_video(url: str, client: anthropic.Anthropic, args, work_dir: str) -> list[dict]: 360 results = [] 361 video_dir = tempfile.mkdtemp(dir=work_dir, prefix="video_") 362 363 try: 364 video_path, vtt_path = download_video(url, video_dir) 365 366 duration = get_video_duration(video_path) 367 if duration < 600: 368 log(f"Video too short ({duration:.0f}s < 600s). Skipping.") 369 return [] 370 log(f"Video duration: {duration:.0f}s ({duration/60:.1f} min)") 371 372 entries = parse_vtt(vtt_path) 373 if not entries: 374 raise RuntimeError("No transcript entries found in VTT") 375 376 transcript = transcript_to_text(entries) 377 log(f"Transcript: {len(transcript)} chars, {len(entries)} entries") 378 379 clips = call_claude_segmentation(client, transcript, n=args.max_clips) 380 log(f"Got {len(clips)} clip suggestions") 381 382 for clip in clips: 383 clip = verify_cut(client, clip, entries) 384 385 video_stem = Path(video_path).stem[:40].replace(" ", "_").replace("/", "-") 386 output_dir = args.output_dir or os.path.join(work_dir, "output") 387 os.makedirs(output_dir, exist_ok=True) 388 389 for i, clip in enumerate(clips, 1): 390 safe_title = re.sub(r'[^\w\s-]', '', clip.get('title', f'clip{i}'))[:50].replace(' ', '_') 391 clip_name = f"{safe_title}_clip{i}" 392 393 log(f"\n--- Clip {i}: {clip.get('title', 'Untitled')} ---") 394 log(f" Start: {clip['start_time']} | End: {clip['end_time']}") 395 396 start_sec = parse_time_to_seconds(clip["start_time"]) 397 end_sec = parse_time_to_seconds(clip["end_time"]) 398 seg_dur = end_sec - start_sec 399 if seg_dur < 60: 400 log(f" ⚠️ Clip too short ({seg_dur:.0f}s), skipping") 401 continue 402 if seg_dur > 1200: 403 log(f" ⚠️ Clip very long ({seg_dur:.0f}s), but proceeding") 404 405 final_path = os.path.join(output_dir, f"{clip_name}_landscape.mp4") 406 407 try: 408 
cut_clip_landscape(video_path, clip["start_time"], clip["end_time"], final_path) 409 410 results.append({ 411 "title": clip.get("title", clip_name), 412 "start_time": clip["start_time"], 413 "end_time": clip["end_time"], 414 "duration_seconds": seg_dur, 415 "hook_sentence": clip.get("hook_sentence", ""), 416 "payoff_sentence": clip.get("payoff_sentence", ""), 417 "narrative_arc": clip.get("narrative_arc", ""), 418 "why": clip.get("why", ""), 419 "local_path": final_path, 420 "source_url": url, 421 }) 422 log(f" ✅ Clip {i} done: {final_path}") 423 424 except Exception as e: 425 log(f" ❌ Clip {i} failed: {e}") 426 427 except Exception as e: 428 log(f"Video processing failed: {e}") 429 import traceback 430 traceback.print_exc() 431 432 return results 433 434 435 def main(): 436 parser = argparse.ArgumentParser(description="YouTube long-form clip pipeline") 437 parser.add_argument("--url", help="Single YouTube URL to process") 438 parser.add_argument("--channel", help="Channel name in knowledge base directory") 439 parser.add_argument("--max-clips", type=int, default=3, help="Max clips per episode (default: 3)") 440 parser.add_argument("--output-dir", help="Output directory for clips") 441 args = parser.parse_args() 442 443 if not args.url and not args.channel: 444 parser.error("Provide --url or --channel") 445 446 client = get_anthropic_client() 447 processed = load_processed() 448 work_dir = tempfile.mkdtemp(prefix="/tmp/longform_") 449 all_results = [] 450 451 try: 452 if args.url: 453 urls = [{"url": args.url, "title": args.url}] 454 else: 455 urls = scan_channel(args.channel, processed) 456 if not urls: 457 log("No new videos found for this week.") 458 return 459 460 for video_info in urls: 461 url = video_info["url"] 462 log(f"\n{'='*60}") 463 log(f"Processing: {video_info.get('title', url)}") 464 results = process_video(url, client, args, work_dir) 465 all_results.extend(results) 466 if results: 467 processed.add(url) 468 save_processed(processed) 469 470 
print("\n" + "="*60) 471 print(f"DONE — {len(all_results)} long-form clip(s) produced") 472 print("="*60) 473 for r in all_results: 474 dur = r["duration_seconds"] 475 print(f"\n🎬 {r['title']}") 476 print(f" Duration: {dur:.0f}s ({dur/60:.1f}m)") 477 print(f" Arc: {r.get('narrative_arc', '')[:150]}") 478 print(f" Hook: {r['hook_sentence'][:100]}") 479 print(f" Path: {r['local_path']}") 480 481 results_path = DATA_DIR / "last-run-longform.json" 482 results_path.parent.mkdir(parents=True, exist_ok=True) 483 results_path.write_text(json.dumps(all_results, indent=2, default=str)) 484 log(f"Results written to {results_path}") 485 486 finally: 487 log(f"Work dir: {work_dir}") 488 489 490 if __name__ == "__main__": 491 main()