video_processor.py
1 #!/usr/bin/env python3 2 """ 3 Video Processor - Deconstruct video into graph-friendly atoms. 4 5 Handles the "technician with smart glasses streaming video" use case: 6 1. Video → Audio → Transcript (with timestamps) 7 2. Video → Keyframes (scene changes, every N seconds) 8 3. Keyframes → OCR text (signs, labels, readings) 9 4. Keyframes → Scene descriptions 10 5. All → Correlated timeline in graph 11 12 Pipeline: 13 video.mp4 14 ↓ 15 ┌──────────────────────────────────────────────┐ 16 │ FFmpeg: Extract audio + keyframes │ 17 └──────────────────────────────────────────────┘ 18 ↓ ↓ 19 [audio.wav] [frames/] 20 ↓ ↓ 21 ┌──────────────┐ ┌──────────────┐ 22 │ Whisper │ │ Vision API │ 23 │ Transcribe │ │ + OCR │ 24 └──────┬───────┘ └──────┬───────┘ 25 ↓ ↓ 26 [transcript with [keyframe 27 timestamps] descriptions] 28 ↓ ↓ 29 └────────────┬────────────┘ 30 ↓ 31 ┌──────────────────┐ 32 │ Graph Atoms │ 33 │ + Correlations │ 34 └──────────────────┘ 35 36 Usage: 37 processor = VideoProcessor() 38 result = processor.process_video(video_path, job_id="job-123") 39 40 # Access results 41 result.transcript # Full timestamped transcript 42 result.keyframes # List of keyframe data 43 result.atoms # Graph-ready atoms 44 """ 45 46 import json 47 import os 48 import re 49 import subprocess 50 import tempfile 51 from dataclasses import dataclass, field 52 from datetime import datetime, timedelta 53 from pathlib import Path 54 from typing import Dict, List, Optional, Any, Tuple 55 import hashlib 56 57 58 @dataclass 59 class TranscriptSegment: 60 """A segment of transcribed audio with timestamp.""" 61 start_time: float # seconds 62 end_time: float 63 text: str 64 speaker: str = "unknown" 65 confidence: float = 1.0 66 67 @property 68 def duration(self) -> float: 69 return self.end_time - self.start_time 70 71 def to_dict(self) -> Dict[str, Any]: 72 return { 73 'start': self.start_time, 74 'end': self.end_time, 75 'text': self.text, 76 'speaker': self.speaker 77 } 78 79 80 @dataclass 81 
class Keyframe:
    """A key frame extracted from video.

    (The ``@dataclass`` decorator for this class sits immediately above
    it in the file.)
    """
    frame_number: int
    timestamp: float  # seconds into video
    file_path: str

    # Extracted content
    ocr_text: str = ""
    scene_description: str = ""
    objects_detected: List[str] = field(default_factory=list)

    # Analysis
    scene_change_score: float = 0.0  # How different from previous frame
    is_significant: bool = False  # Worth highlighting

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for metadata output; OCR text is truncated to 200 chars."""
        return {
            'frame': self.frame_number,
            'timestamp': self.timestamp,
            'file_path': self.file_path,
            'ocr_text': self.ocr_text[:200] if self.ocr_text else '',
            'scene_description': self.scene_description,
            'objects': self.objects_detected,
            'significant': self.is_significant
        }


@dataclass
class VideoProcessingResult:
    """Complete result of processing one video file."""
    video_id: str
    source_path: str
    duration: float  # seconds

    # Extracted content (forward-ref annotation: TranscriptSegment is
    # defined earlier in this module)
    transcript_segments: List["TranscriptSegment"] = field(default_factory=list)
    keyframes: List[Keyframe] = field(default_factory=list)

    # Full transcript
    full_transcript: str = ""

    # Generated atoms for graph
    atoms: List[Dict[str, Any]] = field(default_factory=list)

    # Work context
    job_id: Optional[str] = None
    property_name: Optional[str] = None

    # Processing stats
    processing_time: float = 0.0

    @property
    def significant_keyframes(self) -> List[Keyframe]:
        """Keyframes flagged as worth highlighting."""
        return [kf for kf in self.keyframes if kf.is_significant]

    def to_dict(self) -> Dict[str, Any]:
        """Summary counts for metadata.json (not the full content)."""
        return {
            'video_id': self.video_id,
            'duration_seconds': self.duration,
            'transcript_segments': len(self.transcript_segments),
            'keyframes': len(self.keyframes),
            'significant_frames': len(self.significant_keyframes),
            'atoms_generated': len(self.atoms),
            'job_id': self.job_id,
            'processing_time': self.processing_time
        }


class VideoProcessor:
    """
    Processes video into graph-friendly atoms.

    Designed for:
    - Smart glasses recordings
    - Phone video captures
    - Screen recordings
    - Surveillance/monitoring footage
    """

    DEFAULT_OUTPUT_DIR = Path.home() / 'repos/Sovereign_OS/data/video'

    # Keyframe extraction settings
    KEYFRAME_INTERVAL = 5.0  # Extract frame every N seconds
    SCENE_CHANGE_THRESHOLD = 0.3  # Detect scene changes

    def __init__(
        self,
        output_dir: Optional[str] = None,
        whisper_model: str = "base"  # tiny, base, small, medium, large
    ):
        """Create the processor and ensure the output directory exists.

        Args:
            output_dir: Root directory for per-video output folders;
                defaults to DEFAULT_OUTPUT_DIR.
            whisper_model: Whisper model size passed to the CLI.
        """
        self.output_dir = Path(output_dir) if output_dir else self.DEFAULT_OUTPUT_DIR
        self.output_dir.mkdir(parents=True, exist_ok=True)

        self.whisper_model = whisper_model

        # Check dependencies
        self._check_dependencies()

    def _check_dependencies(self):
        """Probe for required command-line tools and warn about any gaps."""
        self.has_ffmpeg = self._command_exists('ffmpeg')
        self.has_whisper = self._command_exists('whisper')
        self.has_tesseract = self._command_exists('tesseract')

        if not self.has_ffmpeg:
            print("[VideoProcessor] Warning: ffmpeg not found. Install with: brew install ffmpeg")
        if not self.has_whisper:
            print("[VideoProcessor] Warning: whisper not found. Install with: pip install openai-whisper")
        # Previously missing: tesseract was probed but never warned about,
        # unlike the other two tools.
        if not self.has_tesseract:
            print("[VideoProcessor] Warning: tesseract not found. Install with: brew install tesseract")

    def _command_exists(self, cmd: str) -> bool:
        """Return True if *cmd* is available on PATH."""
        # shutil.which is portable (no reliance on a `which` binary) and
        # avoids spawning a subprocess per probe.
        import shutil
        return shutil.which(cmd) is not None

    def process_video(
        self,
        video_path: str,
        job_id: Optional[str] = None,
        property_name: Optional[str] = None,
        extract_keyframes: bool = True,
        transcribe: bool = True,
        keyframe_interval: Optional[float] = None
    ) -> VideoProcessingResult:
        """
        Process a video file completely.

        Args:
            video_path: Path to video file
            job_id: Associated work session ID
            property_name: Property name for context
            extract_keyframes: Whether to extract keyframes
            transcribe: Whether to transcribe audio
            keyframe_interval: Override default keyframe interval

        Returns:
            VideoProcessingResult with all extracted content

        Raises:
            FileNotFoundError: If video_path does not exist.
        """
        start_time = datetime.now()
        video_path = Path(video_path)

        if not video_path.exists():
            raise FileNotFoundError(f"Video not found: {video_path}")

        # Generate video ID from content hash (first 10MB — see _hash_file)
        video_hash = self._hash_file(video_path)
        video_id = f"vid-{video_hash[:12]}"

        # Create output directory for this video
        video_output_dir = self.output_dir / video_id
        video_output_dir.mkdir(parents=True, exist_ok=True)

        # Get video metadata
        duration = self._get_video_duration(video_path)

        result = VideoProcessingResult(
            video_id=video_id,
            source_path=str(video_path),
            duration=duration,
            job_id=job_id,
            property_name=property_name
        )

        # Extract keyframes
        if extract_keyframes and self.has_ffmpeg:
            interval = keyframe_interval or self.KEYFRAME_INTERVAL
            result.keyframes = self._extract_keyframes(
                video_path, video_output_dir, interval
            )

            # Process keyframes (OCR, descriptions)
            for kf in result.keyframes:
                self._process_keyframe(kf)

        # Transcribe audio
        if transcribe:
            result.transcript_segments = self._transcribe_video(
                video_path, video_output_dir
            )
            result.full_transcript = self._build_full_transcript(
                result.transcript_segments
            )

        # Generate graph atoms
        result.atoms = self._generate_atoms(result)

        # Calculate processing time
        result.processing_time = (datetime.now() - start_time).total_seconds()

        # Save result metadata
        self._save_result(result, video_output_dir)

        return result

    def _hash_file(self, path: Path) -> str:
        """SHA-256 of the file's first 10MB (partial hash keeps large
        videos cheap; collisions across files sharing a 10MB prefix are
        accepted as a trade-off)."""
        hasher = hashlib.sha256()
        with open(path, 'rb') as f:
            # Read first 10MB for large files
            chunk = f.read(10 * 1024 * 1024)
            hasher.update(chunk)
        return hasher.hexdigest()

    def _get_video_duration(self, video_path: Path) -> float:
        """Get video duration in seconds via ffprobe; 0.0 on any failure."""
        try:
            result = subprocess.run([
                'ffprobe', '-v', 'error',
                '-show_entries', 'format=duration',
                '-of', 'default=noprint_wrappers=1:nokey=1',
                str(video_path)
            ], capture_output=True, text=True, check=True)
            return float(result.stdout.strip())
        # CalledProcessError (check=True), OSError (ffprobe missing),
        # ValueError (unparseable output) — anything else should surface.
        except (subprocess.SubprocessError, OSError, ValueError):
            return 0.0

    def _extract_keyframes(
        self,
        video_path: Path,
        output_dir: Path,
        interval: float
    ) -> List[Keyframe]:
        """Extract one frame every *interval* seconds into output_dir/frames.

        Returns the (possibly empty) list of Keyframe records; extraction
        failures are logged and swallowed so processing can continue.
        """
        keyframes = []
        frames_dir = output_dir / 'frames'
        frames_dir.mkdir(exist_ok=True)

        try:
            # Extract frames at interval using ffmpeg
            subprocess.run([
                'ffmpeg', '-y',
                '-i', str(video_path),
                '-vf', f'fps=1/{interval}',  # 1 frame per N seconds
                '-frame_pts', '1',
                str(frames_dir / 'frame_%04d.jpg')
            ], capture_output=True, check=True, timeout=300)

            # Collect extracted frames
            for frame_file in sorted(frames_dir.glob('frame_*.jpg')):
                # Extract frame number from filename
                match = re.search(r'frame_(\d+)', frame_file.name)
                if match:
                    frame_num = int(match.group(1))
                    # Approximate timestamp: frame N was sampled in the
                    # Nth interval window.
                    timestamp = (frame_num - 1) * interval

                    keyframe = Keyframe(
                        frame_number=frame_num,
                        timestamp=timestamp,
                        file_path=str(frame_file)
                    )
                    keyframes.append(keyframe)

        except subprocess.TimeoutExpired:
            print("[VideoProcessor] Keyframe extraction timed out")
        except Exception as e:
            # Deliberate best-effort boundary: log and return what we have.
            print(f"[VideoProcessor] Keyframe extraction error: {e}")

        return keyframes

    def _process_keyframe(self, keyframe: Keyframe):
        """OCR a single keyframe in place and flag it if text was found."""
        # OCR
        if self.has_tesseract:
            try:
                result = subprocess.run([
                    'tesseract', keyframe.file_path, 'stdout', '-l', 'eng'
                ], capture_output=True, text=True, timeout=30)
                if result.returncode == 0:
                    keyframe.ocr_text = result.stdout.strip()
            # TimeoutExpired is a SubprocessError; OSError covers a
            # missing/broken tesseract binary. OCR is best-effort.
            except (subprocess.SubprocessError, OSError):
                pass

        # Mark as significant if has readable text
        if keyframe.ocr_text and len(keyframe.ocr_text) > 20:
            keyframe.is_significant = True

    def _transcribe_video(
        self,
        video_path: Path,
        output_dir: Path
    ) -> List["TranscriptSegment"]:
        """Extract mono 16kHz audio with ffmpeg, then transcribe via the
        whisper CLI. Returns timestamped segments (empty on any failure)."""
        segments = []
        audio_path = output_dir / 'audio.wav'

        try:
            # Extract audio with ffmpeg
            subprocess.run([
                'ffmpeg', '-y',
                '-i', str(video_path),
                '-vn',  # No video
                '-acodec', 'pcm_s16le',
                '-ar', '16000',  # 16kHz sample rate
                '-ac', '1',  # Mono
                str(audio_path)
            ], capture_output=True, check=True, timeout=120)

            # Transcribe with whisper
            if self.has_whisper and audio_path.exists():
                result = subprocess.run([
                    'whisper', str(audio_path),
                    '--model', self.whisper_model,
                    '--output_format', 'json',
                    '--output_dir', str(output_dir)
                ], capture_output=True, text=True, timeout=600)

                # Parse whisper output (named after the input stem)
                json_path = output_dir / 'audio.json'
                if json_path.exists():
                    with open(json_path, encoding='utf-8') as f:
                        whisper_data = json.load(f)

                    for seg in whisper_data.get('segments', []):
                        segments.append(TranscriptSegment(
                            start_time=seg.get('start', 0),
                            end_time=seg.get('end', 0),
                            text=seg.get('text', '').strip(),
                            # NOTE(review): avg_logprob is a log-probability,
                            # not a 0..1 confidence — stored as-is.
                            confidence=seg.get('avg_logprob', 0)
                        ))

        except subprocess.TimeoutExpired:
            print("[VideoProcessor] Transcription timed out")
        except Exception as e:
            # Deliberate best-effort boundary: log and return what we have.
            print(f"[VideoProcessor] Transcription error: {e}")

        return segments

    def _build_full_transcript(self, segments: List["TranscriptSegment"]) -> str:
        """Build full transcript with a [MM:SS] prefix per segment."""
        lines = []
        for seg in segments:
            timestamp = self._format_timestamp(seg.start_time)
            lines.append(f"[{timestamp}] {seg.text}")
        return '\n'.join(lines)

    def _format_timestamp(self, seconds: float) -> str:
        """Format seconds as MM:SS, or HH:MM:SS once an hour is reached."""
        minutes, secs = divmod(int(seconds), 60)
        hours, minutes = divmod(minutes, 60)
        if hours > 0:
            return f"{hours:02d}:{minutes:02d}:{secs:02d}"
        return f"{minutes:02d}:{secs:02d}"

    def _generate_atoms(self, result: VideoProcessingResult) -> List[Dict[str, Any]]:
        """Generate graph atoms: one per transcript segment (chained by
        'follows' edges) plus one per significant keyframe (linked to the
        nearest segment by a 'relates_to' edge)."""
        atoms = []
        # Wall-clock anchor; video-relative offsets are added per atom.
        base_timestamp = datetime.now()

        # Create atom for each transcript segment
        for i, seg in enumerate(result.transcript_segments):
            atom = {
                'uuid': f"{result.video_id}-seg-{i:04d}",
                'content': seg.text,
                'source_type': 'video_transcript',
                'session_id': result.job_id or result.video_id,
                'timestamp': (base_timestamp + timedelta(seconds=seg.start_time)).isoformat(),
                'visible_tags': ['transcript'],
                'meta_tags': {
                    'altitude': 'operational',
                    'resonance': 0.5,
                    'video_timestamp': seg.start_time,
                    'source': 'video'
                },
                'edges': []
            }

            # Link to adjacent segments
            if i > 0:
                atom['edges'].append({
                    'type': 'follows',
                    'target': f"{result.video_id}-seg-{i-1:04d}"
                })

            atoms.append(atom)

        # Create atoms for significant keyframes
        for kf in result.significant_keyframes:
            atom = {
                'uuid': f"{result.video_id}-kf-{kf.frame_number:04d}",
                'content': kf.ocr_text or kf.scene_description or f"Keyframe at {self._format_timestamp(kf.timestamp)}",
                'source_type': 'video_keyframe',
                'session_id': result.job_id or result.video_id,
                'timestamp': (base_timestamp + timedelta(seconds=kf.timestamp)).isoformat(),
                'visible_tags': ['keyframe', 'visual'],
                'meta_tags': {
                    'altitude': 'operational',
                    # Frames with readable text resonate more strongly.
                    'resonance': 0.7 if kf.ocr_text else 0.5,
                    'video_timestamp': kf.timestamp,
                    'frame_path': kf.file_path,
                    'source': 'video'
                },
                'edges': []
            }

            # Link to nearest transcript segment
            nearest_seg = self._find_nearest_segment(
                kf.timestamp, result.transcript_segments
            )
            if nearest_seg is not None:
                atom['edges'].append({
                    'type': 'relates_to',
                    'target': f"{result.video_id}-seg-{nearest_seg:04d}"
                })

            atoms.append(atom)

        return atoms

    def _find_nearest_segment(
        self,
        timestamp: float,
        segments: List["TranscriptSegment"]
    ) -> Optional[int]:
        """Index of the segment whose start time is closest to *timestamp*,
        or None if there are no segments. Ties keep the earliest index."""
        if not segments:
            return None

        # min() keeps the first index on ties, matching the original
        # strict-< scan.
        return min(
            range(len(segments)),
            key=lambda i: abs(segments[i].start_time - timestamp)
        )

    def _save_result(self, result: VideoProcessingResult, output_dir: Path):
        """Persist metadata.json, transcript.txt and atoms.jsonl.

        All text files are written as UTF-8 so transcripts/OCR containing
        non-ASCII survive regardless of the platform's locale encoding.
        """
        metadata_path = output_dir / 'metadata.json'
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(result.to_dict(), f, indent=2)

        # Save full transcript
        transcript_path = output_dir / 'transcript.txt'
        transcript_path.write_text(result.full_transcript, encoding='utf-8')

        # Save atoms for graph ingestion
        atoms_path = output_dir / 'atoms.jsonl'
        with open(atoms_path, 'w', encoding='utf-8') as f:
            for atom in result.atoms:
                f.write(json.dumps(atom, ensure_ascii=False) + '\n')

    def get_video_summary(self, video_id: str) -> Optional[Dict[str, Any]]:
        """Load the saved metadata summary for a processed video, or None."""
        video_dir = self.output_dir / video_id
        metadata_path = video_dir / 'metadata.json'

        if not metadata_path.exists():
            return None

        with open(metadata_path, encoding='utf-8') as f:
            return json.load(f)


# Singleton
_video_processor: Optional[VideoProcessor] = None


def get_video_processor() -> VideoProcessor:
    """Get or create the module-wide video processor singleton."""
    global _video_processor
    if _video_processor is None:
        _video_processor = VideoProcessor()
    return _video_processor


if __name__ == '__main__':
    print("=== Video Processor Test ===\n")

    processor = VideoProcessor()
    print(f"FFmpeg available: {processor.has_ffmpeg}")
    print(f"Whisper available: {processor.has_whisper}")
    print(f"Tesseract available: {processor.has_tesseract}")
    print(f"\nOutput directory: {processor.output_dir}")