/ video-clip-pipeline / clip_cutter.py
clip_cutter.py
  1  #!/usr/bin/env python3
  2  """
  3  Clip Cutter - Cut video clips from segments identified by the AI segmenter.
  4  Uses FFmpeg stream copy for speed (under 5 seconds per clip, zero quality loss).
  5  
  6  Usage:
  7      python3 clip_cutter.py --source video.mp4 --segments segments.json --output-dir clips/
  8      python3 clip_cutter.py --source-dir downloads/ --segments-dir segments/ --output-dir clips/
  9  """
 10  
 11  import argparse
 12  import json
 13  import os
 14  import subprocess
 15  import re
 16  from pathlib import Path
 17  from datetime import datetime
 18  
 19  
 20  def slugify(text):
 21      """Convert text to URL-friendly slug"""
 22      slug = re.sub(r'[^\w\s-]', '', text.lower())
 23      slug = re.sub(r'[-\s]+', '-', slug)
 24      return slug.strip('-')
 25  
 26  
 27  def seconds_to_timestamp(seconds):
 28      """Convert seconds to HH:MM:SS.mmm format for FFmpeg"""
 29      hours = int(seconds // 3600)
 30      minutes = int((seconds % 3600) // 60)
 31      secs = seconds % 60
 32      return f"{hours:02d}:{minutes:02d}:{secs:06.3f}"
 33  
 34  
 35  def cut_clip(source_video, start_time, end_time, output_path, reencode=False):
 36      """Use FFmpeg to cut a clip from source video.
 37  
 38      Args:
 39          source_video: Path to source MP4
 40          start_time: Start time in seconds
 41          end_time: End time in seconds
 42          output_path: Path for output clip
 43          reencode: If True, re-encode for frame-accurate cuts (slower but precise)
 44      """
 45      start_ts = seconds_to_timestamp(start_time)
 46      end_ts = seconds_to_timestamp(end_time)
 47  
 48      if reencode:
 49          # Frame-accurate but slower
 50          cmd = [
 51              'ffmpeg', '-y',
 52              '-ss', start_ts,
 53              '-i', str(source_video),
 54              '-to', seconds_to_timestamp(end_time - start_time),
 55              '-c:v', 'libx264', '-c:a', 'aac',
 56              '-avoid_negative_ts', 'make_zero',
 57              str(output_path)
 58          ]
 59      else:
 60          # Stream copy — instant, zero quality loss, but keyframe-aligned (±1-2 sec)
 61          cmd = [
 62              'ffmpeg', '-y',
 63              '-ss', start_ts,
 64              '-to', end_ts,
 65              '-i', str(source_video),
 66              '-c', 'copy',
 67              '-avoid_negative_ts', 'make_zero',
 68              str(output_path)
 69          ]
 70  
 71      try:
 72          result = subprocess.run(cmd, capture_output=True, text=True, check=True)
 73          return True, ""
 74      except subprocess.CalledProcessError as e:
 75          return False, f"FFmpeg error: {e.stderr}"
 76  
 77  
 78  def get_episode_title_slug(filename):
 79      """Extract a clean title slug from the video filename"""
 80      name = filename.replace('.mp4', '')
 81      parts = name.split('_', 1)
 82      if len(parts) > 1:
 83          title = parts[1]
 84      else:
 85          title = name
 86      return slugify(title)
 87  
 88  
 89  def main():
 90      parser = argparse.ArgumentParser(description="Cut video clips from segment metadata")
 91      parser.add_argument("--source", help="Path to source video MP4")
 92      parser.add_argument("--source-dir", help="Directory containing source MP4s (batch mode)")
 93      parser.add_argument("--segments", help="Path to segment metadata JSON")
 94      parser.add_argument("--segments-dir", help="Directory of segment JSONs (batch mode)")
 95      parser.add_argument("--output-dir", default="clips", help="Output directory for clips (default: clips)")
 96      parser.add_argument("--buffer-start", type=float, default=0, help="Seconds to add before clip start (default: 0)")
 97      parser.add_argument("--buffer-end", type=float, default=0, help="Seconds to add after clip end (default: 0)")
 98      parser.add_argument("--naming-prefix", default="", help="Prefix for output filenames")
 99      parser.add_argument("--reencode", action="store_true", help="Re-encode for frame-accurate cuts (slower)")
100      args = parser.parse_args()
101  
102      if not args.source and not args.source_dir:
103          parser.error("Provide --source or --source-dir")
104  
105      clips_dir = Path(args.output_dir)
106      clips_dir.mkdir(parents=True, exist_ok=True)
107  
108      # Load segments
109      if args.segments:
110          with open(args.segments, 'r') as f:
111              all_segments = json.load(f)
112          # Ensure each segment has a video_file reference
113          if args.source:
114              for seg in all_segments:
115                  if 'video_file' not in seg:
116                      seg['video_file'] = Path(args.source).name
117      elif args.segments_dir:
118          segments_dir = Path(args.segments_dir)
119          combined_file = segments_dir / "all_segments.json"
120          if combined_file.exists():
121              with open(combined_file, 'r') as f:
122                  all_segments = json.load(f)
123          else:
124              all_segments = []
125              for seg_file in segments_dir.glob("*_segments.json"):
126                  with open(seg_file, 'r') as f:
127                      all_segments.extend(json.load(f))
128      else:
129          parser.error("Provide --segments or --segments-dir")
130  
131      print(f"Found {len(all_segments)} segments to cut")
132  
133      successful_clips = []
134      failed_clips = []
135      total_duration = 0
136  
137      for i, segment in enumerate(all_segments, 1):
138          # Find source video
139          if args.source:
140              source_path = Path(args.source)
141          else:
142              video_file = segment.get('video_file', '')
143              source_path = Path(args.source_dir) / video_file
144  
145          if not source_path.exists():
146              print(f"āŒ {i}/{len(all_segments)}: Source video not found: {source_path}")
147              failed_clips.append(segment)
148              continue
149  
150          # Apply buffers
151          start_time = max(0, segment['start_time'] - args.buffer_start)
152          end_time = segment['end_time'] + args.buffer_end
153  
154          # Generate output filename
155          episode_slug = get_episode_title_slug(source_path.name)
156          title_slug = slugify(segment.get('suggested_title', f'clip{i}'))
157          prefix = f"{args.naming_prefix}_" if args.naming_prefix else ""
158          output_filename = f"{prefix}{episode_slug}-clip-{i}-{title_slug}.mp4"
159          output_path = clips_dir / output_filename
160  
161          # Cut the clip
162          duration = end_time - start_time
163          print(f"šŸŽ¬ {i}/{len(all_segments)}: Cutting '{segment.get('suggested_title', 'Untitled')}'")
164          print(f"   Duration: {duration:.0f}s | Output: {output_filename}")
165  
166          success, error = cut_clip(source_path, start_time, end_time, output_path, reencode=args.reencode)
167  
168          if success:
169              file_size = output_path.stat().st_size / (1024 * 1024)
170              total_duration += duration
171  
172              clip_info = {
173                  **segment,
174                  'output_file': output_filename,
175                  'file_size_mb': round(file_size, 1),
176                  'duration_seconds': duration,
177                  'start_time_adjusted': start_time,
178                  'end_time_adjusted': end_time,
179              }
180              successful_clips.append(clip_info)
181              print(f"   āœ… Success! ({file_size:.1f} MB)")
182          else:
183              print(f"   āŒ Failed: {error}")
184              failed_clips.append(segment)
185  
186      # Save clip metadata
187      clips_metadata_file = clips_dir / "clips_metadata.json"
188      with open(clips_metadata_file, 'w') as f:
189          json.dump(successful_clips, f, indent=2)
190  
191      # Summary
192      print(f"\nšŸŽ‰ Clip cutting complete!")
193      print(f"āœ… Successfully cut: {len(successful_clips)} clips")
194      print(f"āŒ Failed: {len(failed_clips)} clips")
195      print(f"šŸ“ Total clips duration: {total_duration/60:.1f} minutes")
196      print(f"šŸ“„ Metadata saved to: {clips_metadata_file}")
197  
198      if successful_clips:
199          print(f"\nšŸ”„ Top clips by hook strength:")
200          top_clips = sorted(successful_clips, key=lambda x: x.get('hook_strength', 0), reverse=True)[:3]
201          for i, clip in enumerate(top_clips, 1):
202              print(f"{i}. {clip.get('suggested_title', 'Untitled')} (hook: {clip.get('hook_strength', '?')}/10)")
203              print(f"   File: {clip['output_file']} ({clip['duration_seconds']:.0f}s)")
204  
205      print(f"\nšŸ“‚ All clips saved to: {clips_dir}")
206  
207  
208  if __name__ == "__main__":
209      main()