/ video-clip-pipeline / longform_pipeline.py
longform_pipeline.py
  1  #!/usr/bin/env python3
  2  """
  3  longform_pipeline.py — End-to-end long-form clip pipeline
  4  Segments long YouTube videos into 5-15 minute highlight clips for YouTube.
  5  
  6  Usage:
  7    python3 longform_pipeline.py --url URL [--max-clips 3]
  8    python3 longform_pipeline.py --channel my-podcast [--max-clips 3]
  9  """
 10  
 11  import argparse
 12  import json
 13  import os
 14  import re
 15  import subprocess
 16  import sys
 17  import tempfile
 18  import shutil
 19  from datetime import datetime, timedelta
 20  from pathlib import Path
 21  import anthropic
 22  
 23  # ── Paths (configure these for your setup) ─────────────────────────────────────
 24  WORKSPACE = Path(os.environ.get("PIPELINE_WORKSPACE", Path.cwd()))
 25  DATA_DIR = WORKSPACE / "data" / "youtube-clips"
 26  PROCESSED_FILE = DATA_DIR / "processed-longform.json"
 27  KB_BASE = WORKSPACE / "knowledge_base" / "youtube"
 28  
# Prompt template for the Claude segmentation call.  Rendered via str.format
# with {n} (number of segments to find) and {TRANSCRIPT} (timestamped
# transcript text); the doubled braces in the JSON example below are
# format-escapes that reach the model as single braces.
LONGFORM_SEGMENTATION_PROMPT = """You are a YouTube video editor specializing in extracting high-value highlight clips from long-form content.

You will be given a transcript of a long-form YouTube video with timestamps.

Your job: Find {n} self-contained segments of 5-15 minutes each that would work as standalone YouTube videos.

## Rules for Segment Selection

### Structure
- Each segment must have a CLEAR NARRATIVE ARC: setup → development → resolution
- The segment must open with a STRONG HOOK that gives viewers immediate context
- The segment must end NATURALLY — at a conclusion, insight landing, or story resolution
- Never end mid-topic or mid-story

### Content criteria (pick the best)
1. A complete story or case study with a clear result
2. A step-by-step tutorial or walkthrough
3. A debate, discussion, or analysis that reaches a conclusion
4. A "how we did X and got Y result" narrative
5. A contrarian take with supporting evidence and a conclusion

### Length
- Minimum: 5 minutes (300 seconds)
- Maximum: 15 minutes (900 seconds)
- Sweet spot: 7-12 minutes

### What to avoid
- Starting in the middle of a thought
- Ending with a question or cliffhanger (viewers came for answers)
- Topics that require external context from earlier in the video

## Output Format

Return ONLY valid JSON array. No markdown, no commentary.

[
  {{
    "title": "descriptive YouTube-style title (under 70 chars)",
    "start_time": "MM:SS",
    "end_time": "MM:SS",
    "hook_sentence": "exact words from transcript that open the segment — must immediately establish context",
    "payoff_sentence": "exact words from transcript where the key insight/resolution lands",
    "narrative_arc": "1-2 sentences describing setup → development → resolution",
    "why": "2-3 sentences on why this works as a standalone YouTube video"
  }}
]

## Important

- `hook_sentence` and `payoff_sentence` must be verbatim from the transcript
- Each segment must be fully self-contained — someone who hasn't seen the full video should understand and benefit
- Find exactly {n} segments
- Spread across the video — don't cluster at the beginning

## Transcript

{TRANSCRIPT}"""
 86  
 87  
 88  # ── Helpers ─────────────────────────────────────────────────────────────────────
 89  
 90  def log(msg: str):
 91      print(f"[longform] {msg}", flush=True)
 92  
 93  
 94  def run(cmd: list, **kwargs) -> subprocess.CompletedProcess:
 95      log(f"$ {' '.join(str(c) for c in cmd)}")
 96      return subprocess.run(cmd, **kwargs)
 97  
 98  
 99  def get_anthropic_client() -> anthropic.Anthropic:
100      """Get Anthropic client from environment variable."""
101      key = os.environ.get("ANTHROPIC_API_KEY") or os.environ.get("ANTHROPIC_KEY")
102      if not key:
103          raise RuntimeError(
104              "No ANTHROPIC_API_KEY found. Set the environment variable:\n"
105              "  export ANTHROPIC_API_KEY='sk-ant-...'"
106          )
107      return anthropic.Anthropic(api_key=key)
108  
109  
def load_processed() -> set:
    """Return the set of already-processed video URLs.

    A missing, unreadable, or malformed state file yields an empty set so the
    pipeline can always start fresh.
    """
    if not PROCESSED_FILE.exists():
        return set()
    try:
        payload = json.loads(PROCESSED_FILE.read_text())
        return set(payload.get("urls", []))
    except Exception:
        return set()
118  
119  
def save_processed(processed: set) -> None:
    """Persist the processed-URL set as sorted JSON, creating parent dirs as needed."""
    PROCESSED_FILE.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps({"urls": sorted(processed)}, indent=2)
    PROCESSED_FILE.write_text(payload)
123  
124  
def parse_time_to_seconds(t: str) -> float:
    """Convert "MM:SS" or "HH:MM:SS" to seconds (fractional seconds allowed).

    A value with no colons is treated as a plain number of seconds.
    """
    fields = t.strip().split(":")
    if len(fields) == 3:
        hours, minutes, secs = fields
        return int(hours) * 3600 + int(minutes) * 60 + float(secs)
    if len(fields) == 2:
        minutes, secs = fields
        return int(minutes) * 60 + float(secs)
    return float(t)
132  
133  
def seconds_to_mmss(s: float) -> str:
    """Format whole seconds as zero-padded MM:SS (minutes may exceed 59 for long videos)."""
    total = int(s)
    return f"{total // 60:02d}:{total % 60:02d}"
138  
139  
def get_video_duration(video_path: str) -> float:
    """Return the container duration of *video_path* in seconds via ffprobe.

    Raises RuntimeError when ffprobe exits non-zero — previously a failed
    probe produced empty stdout and surfaced as a cryptic JSONDecodeError.
    """
    result = subprocess.run(
        ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", video_path],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        raise RuntimeError(
            f"ffprobe failed for {video_path}: {result.stderr.strip()[-300:]}"
        )
    data = json.loads(result.stdout)
    return float(data["format"]["duration"])
147  
148  
def parse_vtt(vtt_path: str) -> list[dict]:
    """Parse a WebVTT subtitle file into [{"start", "end", "text"}, ...] entries.

    Times are float seconds; inline markup tags (<c>, <i>, timing tags) are
    stripped and whitespace collapsed. Cues that are empty after cleaning are
    dropped.

    Fixes over the previous version: the timestamp helper is defined once
    instead of being recreated on every cue, and the hour field accepts more
    than two digits for very long recordings.
    """
    def vtt_time(stamp: str) -> float:
        # "HH:MM:SS.mmm" -> seconds
        h, mi, rest = stamp.split(":")
        sec, ms = rest.split(".")
        return int(h) * 3600 + int(mi) * 60 + int(sec) + int(ms) / 1000

    with open(vtt_path, encoding="utf-8", errors="replace") as f:
        content = f.read()

    pattern = re.compile(
        r'(\d{2,}:\d{2}:\d{2}\.\d{3})\s*-->\s*(\d{2,}:\d{2}:\d{2}\.\d{3})[^\n]*\n(.*?)(?=\n\n|\Z)',
        re.DOTALL
    )
    entries = []
    for m in pattern.finditer(content):
        text = re.sub(r'<[^>]+>', '', m.group(3)).strip()
        text = re.sub(r'\s+', ' ', text)
        if not text:
            continue
        entries.append({
            "start": vtt_time(m.group(1)),
            "end": vtt_time(m.group(2)),
            "text": text,
        })
    return entries
173  
174  
def transcript_to_text(entries: list[dict]) -> str:
    """Render transcript entries as newline-joined "[MM:SS] text" lines.

    Collapses only *consecutive* duplicate cues — the rolling-caption artifact
    of auto-generated subtitles. The previous global dedup silently dropped
    any phrase legitimately repeated later in the video, losing its timestamp
    and skewing the segmentation prompt.
    """
    lines = []
    prev = None
    for e in entries:
        t = e["text"]
        if t == prev:
            continue
        prev = t
        ts = seconds_to_mmss(e["start"])
        lines.append(f"[{ts}] {t}")
    return "\n".join(lines)
186  
187  
def get_transcript_window(entries: list[dict], center_seconds: float, window: float = 10.0) -> str:
    """Return "[MM:SS] text" lines for cues overlapping center_seconds ± window.

    Duplicate cue texts within the window are emitted only once.
    """
    lo = center_seconds - window
    hi = center_seconds + window
    seen = set()
    lines = []
    for entry in entries:
        overlaps = entry["end"] >= lo and entry["start"] <= hi
        if not overlaps:
            continue
        text = entry["text"]
        if text in seen:
            continue
        seen.add(text)
        lines.append(f"[{seconds_to_mmss(entry['start'])}] {text}")
    return "\n".join(lines)
202  
203  
def download_video(url: str, out_dir: str) -> tuple[str, str]:
    """Download *url* with yt-dlp (<=1080p MP4 plus English auto-subs as VTT).

    Returns (video_path, subtitle_path). File selection is deterministic —
    glob order is filesystem-dependent, so we pick the largest MP4 (the merged
    output) and prefer a ".en." subtitle file when several exist.

    Raises RuntimeError if yt-dlp fails or the expected outputs are missing.
    """
    log(f"Downloading: {url}")
    cmd = [
        "yt-dlp",
        "--write-auto-sub", "--sub-lang", "en", "--convert-subs", "vtt",
        "-f", "bestvideo[height<=1080]+bestaudio/best[height<=1080]",
        "--merge-output-format", "mp4",
        "-o", f"{out_dir}/%(title)s.%(ext)s",
        url,
    ]
    result = run(cmd, capture_output=False)
    if result.returncode != 0:
        raise RuntimeError(f"yt-dlp failed for {url}")
    mp4_files = sorted(
        Path(out_dir).glob("*.mp4"),
        key=lambda p: p.stat().st_size,
        reverse=True,
    )
    if not mp4_files:
        raise RuntimeError("No MP4 found after download")
    vtt_files = sorted(
        Path(out_dir).glob("*.vtt"),
        key=lambda p: ".en." in p.name.lower(),
        reverse=True,
    )
    if not vtt_files:
        raise RuntimeError("No VTT found after download")
    return str(mp4_files[0]), str(vtt_files[0])
224  
225  
def call_claude_segmentation(client: anthropic.Anthropic, transcript: str, n: int = 3) -> list[dict]:
    """Ask Claude to propose *n* long-form segments for the given transcript.

    Extracts the first JSON array (or, failing that, a single JSON object)
    from the response and returns at most *n* clip dicts.

    Raises ValueError when the response contains no JSON at all.
    """
    prompt = LONGFORM_SEGMENTATION_PROMPT.format(n=n, TRANSCRIPT=transcript)
    log(f"Calling Claude for long-form segmentation ({n} clips)...")
    message = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=3000,
        messages=[{"role": "user", "content": prompt}]
    )
    raw = message.content[0].text.strip()

    array_match = re.search(r'(\[[\s\S]+\])', raw)
    if array_match:
        parsed = json.loads(array_match.group(1))
    else:
        # Fall back to a lone object and wrap it in a list.
        object_match = re.search(r'(\{[\s\S]+\})', raw)
        if object_match is None:
            raise ValueError(f"No JSON found in Claude response:\n{raw[:500]}")
        parsed = [json.loads(object_match.group(1))]

    if isinstance(parsed, dict):
        parsed = [parsed]
    return parsed[:n]
246  
247  
def verify_cut(client: anthropic.Anthropic, clip: dict, entries: list[dict]) -> dict:
    """Ask Claude whether the clip's proposed end lands on a complete thought.

    Sends a ±10-second transcript window around the clip's end_time; if the
    model reports the cut is not clean and supplies a correction, rewrites
    clip["end_time"] with the corrected value.

    NOTE: mutates *clip* in place — callers that discard the return value
    (as process_video's verification loop does) still observe the correction.

    Returns the same clip dict, possibly with "end_time" updated.
    """
    end_sec = parse_time_to_seconds(clip["end_time"])
    window_text = get_transcript_window(entries, end_sec, window=10.0)

    prompt = f"""You are verifying whether a long-form YouTube clip ends at a clean, complete point.

Proposed end time: {clip['end_time']}
Expected payoff: "{clip.get('payoff_sentence', '')}"

Transcript around the proposed end time (±10 seconds):
{window_text}

Does the thought/narrative complete at {clip['end_time']}, or does it continue?
If it continues, provide the corrected end_time where the thought actually resolves.
The clip must end on a complete thought, insight, or story beat — never mid-sentence or mid-idea.

Return ONLY valid JSON:
{{
  "end_is_clean": true/false,
  "corrected_end_time": "MM:SS or same as proposed if clean",
  "reason": "one-sentence explanation"
}}"""

    log(f"Verifying cut at {clip['end_time']}...")
    message = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=400,
        messages=[{"role": "user", "content": prompt}]
    )
    raw = message.content[0].text.strip()
    # Best-effort JSON extraction: if the model returned no JSON, keep the
    # original cut rather than failing the whole clip.
    json_match = re.search(r'(\{[\s\S]+\})', raw)
    if not json_match:
        log("Warning: no JSON in verification, keeping original")
        return clip
    verification = json.loads(json_match.group(1))
    # Only apply a correction when the model both flags the cut as unclean
    # AND provides a replacement time.
    if not verification.get("end_is_clean") and verification.get("corrected_end_time"):
        old_end = clip["end_time"]
        clip["end_time"] = verification["corrected_end_time"]
        log(f"  ✂️  Cut corrected: {old_end} → {clip['end_time']} ({verification.get('reason', '')})")
    else:
        log(f"  ✅ Cut is clean at {clip['end_time']}")
    return clip
290  
291  
def cut_clip_landscape(video_path: str, start: str, end: str, output_path: str):
    """Cut [start, end] from *video_path* into *output_path*, keeping 16:9 landscape.

    Re-encodes with libx264/aac; no crop or caption burn.

    Raises ValueError for a non-positive duration and RuntimeError when
    ffmpeg exits non-zero.
    """
    start_sec = parse_time_to_seconds(start)
    end_sec = parse_time_to_seconds(end)
    duration = end_sec - start_sec
    if duration <= 0:
        # A "corrected" end time can land at or before the start; fail loudly
        # instead of handing ffmpeg a non-positive -t.
        raise ValueError(f"Non-positive clip duration: {start} -> {end}")
    cmd = [
        "ffmpeg", "-y",
        "-ss", str(start_sec),  # input-side seek (fast)
        "-i", video_path,
        "-t", str(duration),
        "-c:v", "libx264", "-c:a", "aac",
        "-avoid_negative_ts", "make_zero",
        output_path,
    ]
    result = run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        log(f"FFmpeg error: {result.stderr[-500:]}")
        raise RuntimeError("FFmpeg cut failed")
310  
311  
def scan_channel(channel: str, processed: set) -> list[dict]:
    """Scan a channel knowledge base directory for new videos to process.

    Considers markdown files whose stem starts with a YYYY-MM-DD date that
    falls in the *current* Monday-Sunday week, skipping summary files and
    URLs already in *processed*.

    Returns a list of {"url", "title", "date"} dicts.
    """
    kb_dir = KB_BASE / channel
    if not kb_dir.exists():
        log(f"Channel KB dir not found: {kb_dir}")
        return []

    today = datetime.now()
    # Zero out microseconds as well: strptime() dates are exactly midnight,
    # and a week_start that kept now()'s microseconds would make
    # `week_start <= file_date` fail for files dated exactly Monday,
    # silently skipping them.
    week_start = (today - timedelta(days=today.weekday())).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    week_end = (week_start + timedelta(days=6)).replace(
        hour=23, minute=59, second=59, microsecond=999999
    )
    videos = []

    for md_file in sorted(kb_dir.glob("*.md")):
        name = md_file.stem
        date_match = re.match(r'(\d{4}-\d{2}-\d{2})', name)
        if not date_match:
            continue
        try:
            file_date = datetime.strptime(date_match.group(1), "%Y-%m-%d")
        except ValueError:
            continue
        if not (week_start <= file_date <= week_end):
            continue
        if "summary" in name.lower():
            continue

        content = md_file.read_text(encoding="utf-8", errors="replace")
        # Prefer an explicit `url:` front-matter line; fall back to the first
        # watch URL found anywhere in the body.
        url_match = re.search(r'^url:\s*(https://www\.youtube\.com/watch\?v=\S+)', content, re.MULTILINE)
        if not url_match:
            url_match = re.search(r'(https://www\.youtube\.com/watch\?v=[\w-]+)', content)
        if not url_match:
            continue

        url = url_match.group(1).strip()
        if url in processed:
            log(f"Skipping already processed: {url}")
            continue

        title_match = re.search(r'^title:\s*"?(.+?)"?\s*$', content, re.MULTILINE)
        title = title_match.group(1) if title_match else name

        videos.append({"url": url, "title": title, "date": file_date})
        log(f"Found new video: {title} ({url})")

    return videos
357  
358  
def process_video(url: str, client: anthropic.Anthropic, args, work_dir: str) -> list[dict]:
    """Download one video, segment it with Claude, verify cuts, and cut clips.

    Returns a list of result dicts for successfully-cut clips. Errors are
    logged rather than raised (returning an empty list) so the remaining
    videos in a batch still run.
    """
    results = []
    video_dir = tempfile.mkdtemp(dir=work_dir, prefix="video_")

    try:
        video_path, vtt_path = download_video(url, video_dir)

        duration = get_video_duration(video_path)
        if duration < 600:
            # Need at least 10 minutes of source to yield a 5+ minute clip.
            log(f"Video too short ({duration:.0f}s < 600s). Skipping.")
            return []
        log(f"Video duration: {duration:.0f}s ({duration/60:.1f} min)")

        entries = parse_vtt(vtt_path)
        if not entries:
            raise RuntimeError("No transcript entries found in VTT")

        transcript = transcript_to_text(entries)
        log(f"Transcript: {len(transcript)} chars, {len(entries)} entries")

        clips = call_claude_segmentation(client, transcript, n=args.max_clips)
        log(f"Got {len(clips)} clip suggestions")

        # Rebuild the list from verify_cut's return values. The previous
        # `for clip in clips: clip = verify_cut(...)` reassigned a discarded
        # loop variable and only worked because verify_cut mutates in place;
        # this makes the data flow explicit.
        clips = [verify_cut(client, clip, entries) for clip in clips]

        output_dir = args.output_dir or os.path.join(work_dir, "output")
        os.makedirs(output_dir, exist_ok=True)

        for i, clip in enumerate(clips, 1):
            safe_title = re.sub(r'[^\w\s-]', '', clip.get('title', f'clip{i}'))[:50].replace(' ', '_')
            clip_name = f"{safe_title}_clip{i}"

            log(f"\n--- Clip {i}: {clip.get('title', 'Untitled')} ---")
            log(f"  Start: {clip['start_time']} | End: {clip['end_time']}")

            start_sec = parse_time_to_seconds(clip["start_time"])
            end_sec = parse_time_to_seconds(clip["end_time"])
            seg_dur = end_sec - start_sec
            if seg_dur < 60:
                log(f"  ⚠️  Clip too short ({seg_dur:.0f}s), skipping")
                continue
            if seg_dur > 1200:
                log(f"  ⚠️  Clip very long ({seg_dur:.0f}s), but proceeding")

            final_path = os.path.join(output_dir, f"{clip_name}_landscape.mp4")

            try:
                cut_clip_landscape(video_path, clip["start_time"], clip["end_time"], final_path)

                results.append({
                    "title": clip.get("title", clip_name),
                    "start_time": clip["start_time"],
                    "end_time": clip["end_time"],
                    "duration_seconds": seg_dur,
                    "hook_sentence": clip.get("hook_sentence", ""),
                    "payoff_sentence": clip.get("payoff_sentence", ""),
                    "narrative_arc": clip.get("narrative_arc", ""),
                    "why": clip.get("why", ""),
                    "local_path": final_path,
                    "source_url": url,
                })
                log(f"  ✅ Clip {i} done: {final_path}")

            except Exception as e:
                # One bad clip shouldn't sink the rest of the video.
                log(f"  ❌ Clip {i} failed: {e}")

    except Exception as e:
        log(f"Video processing failed: {e}")
        import traceback
        traceback.print_exc()

    return results
433  
434  
def main():
    """CLI entry point: process a single --url, or scan a --channel KB for new videos."""
    parser = argparse.ArgumentParser(description="YouTube long-form clip pipeline")
    parser.add_argument("--url", help="Single YouTube URL to process")
    parser.add_argument("--channel", help="Channel name in knowledge base directory")
    parser.add_argument("--max-clips", type=int, default=3, help="Max clips per episode (default: 3)")
    parser.add_argument("--output-dir", help="Output directory for clips")
    args = parser.parse_args()

    if not args.url and not args.channel:
        parser.error("Provide --url or --channel")

    client = get_anthropic_client()
    processed = load_processed()
    # The prefix must be a bare name: mkdtemp joins it onto the system temp
    # dir, so the old absolute prefix ("/tmp/longform_") only worked by
    # accident on POSIX and ignored TMPDIR entirely.
    work_dir = tempfile.mkdtemp(prefix="longform_")
    all_results = []

    try:
        if args.url:
            urls = [{"url": args.url, "title": args.url}]
        else:
            urls = scan_channel(args.channel, processed)
            if not urls:
                log("No new videos found for this week.")
                return

        for video_info in urls:
            url = video_info["url"]
            log(f"\n{'='*60}")
            log(f"Processing: {video_info.get('title', url)}")
            results = process_video(url, client, args, work_dir)
            all_results.extend(results)
            if results:
                # Persist immediately, and only after at least one clip
                # succeeded, so failed videos are retried on the next run.
                processed.add(url)
                save_processed(processed)

        print("\n" + "="*60)
        print(f"DONE — {len(all_results)} long-form clip(s) produced")
        print("="*60)
        for r in all_results:
            dur = r["duration_seconds"]
            print(f"\n🎬 {r['title']}")
            print(f"   Duration: {dur:.0f}s ({dur/60:.1f}m)")
            print(f"   Arc: {r.get('narrative_arc', '')[:150]}")
            print(f"   Hook: {r['hook_sentence'][:100]}")
            print(f"   Path: {r['local_path']}")

        results_path = DATA_DIR / "last-run-longform.json"
        results_path.parent.mkdir(parents=True, exist_ok=True)
        results_path.write_text(json.dumps(all_results, indent=2, default=str))
        log(f"Results written to {results_path}")

    finally:
        # The work dir is intentionally left on disk so cut clips survive
        # the run; we only report its location.
        log(f"Work dir: {work_dir}")
488  
489  
# Script entry point.
if __name__ == "__main__":
    main()