/ video-clip-pipeline / clip_cutter.py
clip_cutter.py
#!/usr/bin/env python3
"""
Clip Cutter - Cut video clips from segments identified by the AI segmenter.
Uses FFmpeg stream copy for speed (under 5 seconds per clip, zero quality loss).

Usage:
    python3 clip_cutter.py --source video.mp4 --segments segments.json --output-dir clips/
    python3 clip_cutter.py --source-dir downloads/ --segments-dir segments/ --output-dir clips/
"""

import argparse
import json
import os
import subprocess
import re
from pathlib import Path
from datetime import datetime


def slugify(text):
    """Convert text to a URL-friendly slug.

    Lower-cases the text, drops every character that is not a word
    character, whitespace or hyphen, then collapses runs of whitespace and
    hyphens into a single hyphen.

    Args:
        text: The string to convert.

    Returns:
        The slug, with no leading or trailing hyphens. May be an empty
        string when ``text`` contains no word characters.
    """
    slug = re.sub(r'[^\w\s-]', '', text.lower())
    slug = re.sub(r'[-\s]+', '-', slug)
    return slug.strip('-')


def seconds_to_timestamp(seconds):
    """Convert seconds to HH:MM:SS.mmm format for FFmpeg.

    The value is rounded to whole milliseconds *before* it is split into
    fields, so a value such as 59.9996 becomes ``00:01:00.000`` instead of
    the out-of-range ``00:00:60.000``.

    Args:
        seconds: Time offset in seconds (int or float). Negative values are
            rendered with a leading ``-``.

    Returns:
        The timestamp string, e.g. ``"01:01:01.500"``.
    """
    total_ms = int(round(abs(seconds) * 1000))
    # Suppress the sign when the value rounds to zero ("-00:00:00.000").
    sign = '-' if seconds < 0 and total_ms else ''
    hours, remainder_ms = divmod(total_ms, 3_600_000)
    minutes, remainder_ms = divmod(remainder_ms, 60_000)
    secs, millis = divmod(remainder_ms, 1000)
    return f"{sign}{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"


def cut_clip(source_video, start_time, end_time, output_path, reencode=False):
    """Use FFmpeg to cut a clip from source video.

    Args:
        source_video: Path to source MP4
        start_time: Start time in seconds
        end_time: End time in seconds
        output_path: Path for output clip
        reencode: If True, re-encode for frame-accurate cuts (slower but precise)

    Returns:
        A ``(success, error_message)`` tuple. ``error_message`` is an empty
        string on success.
    """
    if end_time <= start_time:
        return False, (
            f"Invalid clip range: end ({end_time}) must be after start ({start_time})"
        )

    start_ts = seconds_to_timestamp(start_time)
    end_ts = seconds_to_timestamp(end_time)

    if reencode:
        # Frame-accurate but slower.
        # -ss is an input option here, so output timestamps restart at zero
        # and the output-side -to value is the clip duration, not the
        # absolute end position.
        cmd = [
            'ffmpeg', '-y',
            '-ss', start_ts,
            '-i', str(source_video),
            '-to', seconds_to_timestamp(end_time - start_time),
            '-c:v', 'libx264', '-c:a', 'aac',
            '-avoid_negative_ts', 'make_zero',
            str(output_path)
        ]
    else:
        # Stream copy — instant, zero quality loss, but keyframe-aligned (±1-2 sec)
        cmd = [
            'ffmpeg', '-y',
            '-ss', start_ts,
            '-to', end_ts,
            '-i', str(source_video),
            '-c', 'copy',
            '-avoid_negative_ts', 'make_zero',
            str(output_path)
        ]

    try:
        subprocess.run(
            cmd,
            stdin=subprocess.DEVNULL,  # keep ffmpeg from consuming our stdin
            capture_output=True,
            text=True,
            errors='replace',  # stderr may contain bytes that do not decode
            check=True,
        )
    except FileNotFoundError:
        return False, "FFmpeg error: 'ffmpeg' executable not found on PATH"
    except subprocess.CalledProcessError as e:
        return False, f"FFmpeg error: {e.stderr}"
    return True, ""


def get_episode_title_slug(filename):
    """Extract a clean title slug from the video filename.

    The file extension is removed, then everything up to and including the
    first underscore is treated as a prefix and discarded. If there is no
    underscore the whole name is used.

    Args:
        filename: Name of the video file, e.g. ``"001_My Episode.mp4"``.

    Returns:
        The slugified title, e.g. ``"my-episode"``.
    """
    name = Path(filename).stem
    _, separator, remainder = name.partition('_')
    title = remainder if separator else name
    return slugify(title)


def _read_segment_list(path, parser):
    """Load one segment JSON file and verify that its root is a list."""
    with open(path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    if not isinstance(data, list):
        parser.error(f"Expected a JSON list of segments in {path}")
    return data


def _load_segments(args, parser):
    """Collect the segments selected by --segments or --segments-dir."""
    if args.segments:
        segments = _read_segment_list(args.segments, parser)
        # Ensure each segment has a video_file reference
        if args.source:
            default_name = Path(args.source).name
            for seg in segments:
                if 'video_file' not in seg:
                    seg['video_file'] = default_name
        return segments

    segments_dir = Path(args.segments_dir)
    combined_file = segments_dir / "all_segments.json"
    if combined_file.exists():
        return _read_segment_list(combined_file, parser)

    segments = []
    # Sorted so clip numbering (and output filenames) is stable across runs.
    for seg_file in sorted(segments_dir.glob("*_segments.json")):
        segments.extend(_read_segment_list(seg_file, parser))
    return segments


def main():
    """Parse command-line arguments, cut every segment and write metadata.

    Successful clips are described in ``clips_metadata.json`` inside the
    output directory. A segment that cannot be cut (missing source file,
    invalid timing, FFmpeg failure) is reported and counted as failed
    without stopping the rest of the batch.
    """
    parser = argparse.ArgumentParser(description="Cut video clips from segment metadata")
    parser.add_argument("--source", help="Path to source video MP4")
    parser.add_argument("--source-dir", help="Directory containing source MP4s (batch mode)")
    parser.add_argument("--segments", help="Path to segment metadata JSON")
    parser.add_argument("--segments-dir", help="Directory of segment JSONs (batch mode)")
    parser.add_argument("--output-dir", default="clips", help="Output directory for clips (default: clips)")
    parser.add_argument("--buffer-start", type=float, default=0, help="Seconds to add before clip start (default: 0)")
    parser.add_argument("--buffer-end", type=float, default=0, help="Seconds to add after clip end (default: 0)")
    parser.add_argument("--naming-prefix", default="", help="Prefix for output filenames")
    parser.add_argument("--reencode", action="store_true", help="Re-encode for frame-accurate cuts (slower)")
    args = parser.parse_args()

    # Validate arguments before touching the filesystem.
    if not args.source and not args.source_dir:
        parser.error("Provide --source or --source-dir")
    if not args.segments and not args.segments_dir:
        parser.error("Provide --segments or --segments-dir")

    clips_dir = Path(args.output_dir)
    clips_dir.mkdir(parents=True, exist_ok=True)

    # Load segments
    all_segments = _load_segments(args, parser)
    total = len(all_segments)

    print(f"Found {total} segments to cut")

    successful_clips = []
    failed_clips = []
    total_duration = 0
    prefix = f"{args.naming_prefix}_" if args.naming_prefix else ""

    for i, segment in enumerate(all_segments, 1):
        # Find source video
        if args.source:
            source_path = Path(args.source)
        else:
            video_file = segment.get('video_file') or ''
            source_path = Path(args.source_dir) / video_file

        # is_file() rather than exists(): an empty video_file resolves to
        # the source directory itself, which exists but is not a video.
        if not source_path.is_file():
            print(f"❌ {i}/{total}: Source video not found: {source_path}")
            failed_clips.append(segment)
            continue

        # Apply buffers
        try:
            start_time = max(0, segment['start_time'] - args.buffer_start)
            end_time = segment['end_time'] + args.buffer_end
        except (KeyError, TypeError) as exc:
            print(f"❌ {i}/{total}: Invalid segment timing: {exc!r}")
            failed_clips.append(segment)
            continue

        # Generate output filename
        episode_slug = get_episode_title_slug(source_path.name)
        fallback_title = f'clip{i}'
        raw_title = segment.get('suggested_title') or fallback_title
        # A title made only of punctuation slugifies to ''; fall back then too.
        title_slug = slugify(str(raw_title)) or fallback_title
        output_filename = f"{prefix}{episode_slug}-clip-{i}-{title_slug}.mp4"
        output_path = clips_dir / output_filename

        # Cut the clip
        duration = end_time - start_time
        print(f"🎬 {i}/{total}: Cutting '{segment.get('suggested_title', 'Untitled')}'")
        print(f"   Duration: {duration:.0f}s | Output: {output_filename}")

        success, error = cut_clip(source_path, start_time, end_time, output_path, reencode=args.reencode)

        if success:
            file_size = output_path.stat().st_size / (1024 * 1024)
            total_duration += duration

            clip_info = {
                **segment,
                'output_file': output_filename,
                'file_size_mb': round(file_size, 1),
                'duration_seconds': duration,
                'start_time_adjusted': start_time,
                'end_time_adjusted': end_time,
            }
            successful_clips.append(clip_info)
            print(f"   ✅ Success! ({file_size:.1f} MB)")
        else:
            print(f"   ❌ Failed: {error}")
            failed_clips.append(segment)

    # Save clip metadata
    clips_metadata_file = clips_dir / "clips_metadata.json"
    with open(clips_metadata_file, 'w', encoding='utf-8') as f:
        json.dump(successful_clips, f, indent=2)

    # Summary
    print("\n🎉 Clip cutting complete!")
    print(f"✅ Successfully cut: {len(successful_clips)} clips")
    print(f"❌ Failed: {len(failed_clips)} clips")
    print(f"📊 Total clips duration: {total_duration/60:.1f} minutes")
    print(f"📄 Metadata saved to: {clips_metadata_file}")

    if successful_clips:
        print("\n🔥 Top clips by hook strength:")
        top_clips = sorted(
            successful_clips,
            # A null hook_strength sorts as 0 instead of raising TypeError.
            key=lambda clip: clip.get('hook_strength') or 0,
            reverse=True,
        )[:3]
        for rank, clip in enumerate(top_clips, 1):
            print(f"{rank}. {clip.get('suggested_title', 'Untitled')} (hook: {clip.get('hook_strength', '?')}/10)")
            print(f"   File: {clip['output_file']} ({clip['duration_seconds']:.0f}s)")

    print(f"\n📁 All clips saved to: {clips_dir}")


if __name__ == "__main__":
    main()