# core/attention/video_processor.py
  1  #!/usr/bin/env python3
  2  """
  3  Video Processor - Deconstruct video into graph-friendly atoms.
  4  
  5  Handles the "technician with smart glasses streaming video" use case:
  6  1. Video → Audio → Transcript (with timestamps)
  7  2. Video → Keyframes (scene changes, every N seconds)
  8  3. Keyframes → OCR text (signs, labels, readings)
  9  4. Keyframes → Scene descriptions
 10  5. All → Correlated timeline in graph
 11  
 12  Pipeline:
 13      video.mp4
  15      ┌──────────────────────────────────────────────┐
 16      │  FFmpeg: Extract audio + keyframes           │
 17      └──────────────────────────────────────────────┘
 18          ↓                              ↓
 19      [audio.wav]                   [frames/]
 20          ↓                              ↓
 21      ┌──────────────┐          ┌──────────────┐
 22      │   Whisper    │          │  Vision API  │
 23      │  Transcribe  │          │  + OCR       │
 24      └──────┬───────┘          └──────┬───────┘
 25             ↓                         ↓
 26      [transcript with               [keyframe
 27       timestamps]                   descriptions]
 28             ↓                         ↓
 29             └────────────┬────────────┘
  31                ┌──────────────────┐
 32                │  Graph Atoms     │
 33                │  + Correlations  │
 34                └──────────────────┘
 35  
 36  Usage:
 37      processor = VideoProcessor()
 38      result = processor.process_video(video_path, job_id="job-123")
 39  
 40      # Access results
 41      result.transcript  # Full timestamped transcript
 42      result.keyframes   # List of keyframe data
 43      result.atoms       # Graph-ready atoms
 44  """
 45  
import hashlib
import json
import os
import re
import shutil
import subprocess
import tempfile
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
 56  
 57  
 58  @dataclass
 59  class TranscriptSegment:
 60      """A segment of transcribed audio with timestamp."""
 61      start_time: float  # seconds
 62      end_time: float
 63      text: str
 64      speaker: str = "unknown"
 65      confidence: float = 1.0
 66  
 67      @property
 68      def duration(self) -> float:
 69          return self.end_time - self.start_time
 70  
 71      def to_dict(self) -> Dict[str, Any]:
 72          return {
 73              'start': self.start_time,
 74              'end': self.end_time,
 75              'text': self.text,
 76              'speaker': self.speaker
 77          }
 78  
 79  
 80  @dataclass
 81  class Keyframe:
 82      """A key frame extracted from video."""
 83      frame_number: int
 84      timestamp: float  # seconds into video
 85      file_path: str
 86  
 87      # Extracted content
 88      ocr_text: str = ""
 89      scene_description: str = ""
 90      objects_detected: List[str] = field(default_factory=list)
 91  
 92      # Analysis
 93      scene_change_score: float = 0.0  # How different from previous frame
 94      is_significant: bool = False  # Worth highlighting
 95  
 96      def to_dict(self) -> Dict[str, Any]:
 97          return {
 98              'frame': self.frame_number,
 99              'timestamp': self.timestamp,
100              'file_path': self.file_path,
101              'ocr_text': self.ocr_text[:200] if self.ocr_text else '',
102              'scene_description': self.scene_description,
103              'objects': self.objects_detected,
104              'significant': self.is_significant
105          }
106  
107  
@dataclass
class VideoProcessingResult:
    """Everything extracted from one processed video."""
    video_id: str
    source_path: str
    duration: float  # seconds

    # Extracted content
    transcript_segments: List["TranscriptSegment"] = field(default_factory=list)
    keyframes: List["Keyframe"] = field(default_factory=list)

    # Full transcript (timestamped lines joined with newlines)
    full_transcript: str = ""

    # Graph-ready atoms generated from the content above
    atoms: List[Dict[str, Any]] = field(default_factory=list)

    # Work context
    job_id: Optional[str] = None
    property_name: Optional[str] = None

    # Processing stats
    processing_time: float = 0.0

    @property
    def significant_keyframes(self) -> List["Keyframe"]:
        """Only the keyframes that were flagged as significant."""
        return [frame for frame in self.keyframes if frame.is_significant]

    def to_dict(self) -> Dict[str, Any]:
        """Summarize the result (counts rather than full content)."""
        summary: Dict[str, Any] = {
            'video_id': self.video_id,
            'duration_seconds': self.duration,
            'transcript_segments': len(self.transcript_segments),
            'keyframes': len(self.keyframes),
            'significant_frames': len(self.significant_keyframes),
            'atoms_generated': len(self.atoms),
            'job_id': self.job_id,
            'processing_time': self.processing_time,
        }
        return summary
147  
148  
class VideoProcessor:
    """
    Processes video into graph-friendly atoms.

    Pipeline: ffmpeg extracts keyframes and audio, whisper transcribes the
    audio, tesseract OCRs the keyframes, and the results are correlated by
    timestamp into graph atoms.

    Designed for:
    - Smart glasses recordings
    - Phone video captures
    - Screen recordings
    - Surveillance/monitoring footage
    """

    DEFAULT_OUTPUT_DIR = Path.home() / 'repos/Sovereign_OS/data/video'

    # Keyframe extraction settings
    KEYFRAME_INTERVAL = 5.0  # Extract frame every N seconds
    SCENE_CHANGE_THRESHOLD = 0.3  # Detect scene changes (currently unused)

    def __init__(
        self,
        output_dir: Optional[str] = None,
        whisper_model: str = "base"  # tiny, base, small, medium, large
    ):
        """
        Args:
            output_dir: Root directory for processed output. Defaults to
                DEFAULT_OUTPUT_DIR; created (with parents) if missing.
            whisper_model: Whisper model size used for transcription.
        """
        self.output_dir = Path(output_dir) if output_dir else self.DEFAULT_OUTPUT_DIR
        self.output_dir.mkdir(parents=True, exist_ok=True)

        self.whisper_model = whisper_model

        # Probe for the external tools we shell out to.
        self._check_dependencies()

    def _check_dependencies(self) -> None:
        """Record which required command-line tools are available."""
        self.has_ffmpeg = self._command_exists('ffmpeg')
        self.has_whisper = self._command_exists('whisper')
        self.has_tesseract = self._command_exists('tesseract')

        if not self.has_ffmpeg:
            print("[VideoProcessor] Warning: ffmpeg not found. Install with: brew install ffmpeg")
        if not self.has_whisper:
            print("[VideoProcessor] Warning: whisper not found. Install with: pip install openai-whisper")

    def _command_exists(self, cmd: str) -> bool:
        """Return True if `cmd` is found on PATH.

        Fix: uses shutil.which instead of spawning `which` in a subprocess
        under a bare except (which was non-portable and swallowed every
        exception, including KeyboardInterrupt).
        """
        return shutil.which(cmd) is not None

    def process_video(
        self,
        video_path: str,
        job_id: Optional[str] = None,
        property_name: Optional[str] = None,
        extract_keyframes: bool = True,
        transcribe: bool = True,
        keyframe_interval: Optional[float] = None
    ) -> "VideoProcessingResult":
        """
        Process a video file completely.

        Args:
            video_path: Path to video file
            job_id: Associated work session ID
            property_name: Property name for context
            extract_keyframes: Whether to extract keyframes
            transcribe: Whether to transcribe audio
            keyframe_interval: Override default keyframe interval

        Returns:
            VideoProcessingResult with all extracted content

        Raises:
            FileNotFoundError: If video_path does not exist.
        """
        start_time = datetime.now()
        video_path = Path(video_path)

        if not video_path.exists():
            raise FileNotFoundError(f"Video not found: {video_path}")

        # Content-derived ID so re-processing the same file reuses one dir.
        video_hash = self._hash_file(video_path)
        video_id = f"vid-{video_hash[:12]}"

        # Per-video output directory (frames, audio, metadata).
        video_output_dir = self.output_dir / video_id
        video_output_dir.mkdir(exist_ok=True)

        duration = self._get_video_duration(video_path)

        result = VideoProcessingResult(
            video_id=video_id,
            source_path=str(video_path),
            duration=duration,
            job_id=job_id,
            property_name=property_name
        )

        # Extract and analyze keyframes (requires ffmpeg).
        if extract_keyframes and self.has_ffmpeg:
            interval = keyframe_interval or self.KEYFRAME_INTERVAL
            result.keyframes = self._extract_keyframes(
                video_path, video_output_dir, interval
            )

            # OCR each frame and flag significant ones.
            for kf in result.keyframes:
                self._process_keyframe(kf)

        # Transcribe audio track.
        if transcribe:
            result.transcript_segments = self._transcribe_video(
                video_path, video_output_dir
            )
            result.full_transcript = self._build_full_transcript(
                result.transcript_segments
            )

        # Correlate everything into graph atoms.
        result.atoms = self._generate_atoms(result)

        result.processing_time = (datetime.now() - start_time).total_seconds()

        # Persist metadata, transcript, and atoms next to the frames.
        self._save_result(result, video_output_dir)

        return result

    def _hash_file(self, path: Path) -> str:
        """Return a SHA-256 hex digest of the file's leading content.

        NOTE: only the first 10MB is hashed — a deliberate speed trade-off,
        so two videos sharing their first 10MB would collide.
        """
        hasher = hashlib.sha256()
        with open(path, 'rb') as f:
            # Read first 10MB for large files
            chunk = f.read(10 * 1024 * 1024)
            hasher.update(chunk)
        return hasher.hexdigest()

    def _get_video_duration(self, video_path: Path) -> float:
        """Return the video duration in seconds via ffprobe; 0.0 on failure."""
        try:
            result = subprocess.run([
                'ffprobe', '-v', 'error',
                '-show_entries', 'format=duration',
                '-of', 'default=noprint_wrappers=1:nokey=1',
                str(video_path)
            ], capture_output=True, text=True, check=True)
            return float(result.stdout.strip())
        except (subprocess.SubprocessError, OSError, ValueError):
            # ffprobe missing, exited non-zero, or printed a non-numeric
            # duration (e.g. "N/A"). Previously a bare except.
            return 0.0

    def _extract_keyframes(
        self,
        video_path: Path,
        output_dir: Path,
        interval: float
    ) -> List["Keyframe"]:
        """Extract one frame every `interval` seconds into output_dir/frames.

        Returns the extracted keyframes in frame order; returns whatever was
        collected so far if ffmpeg times out or fails (best-effort).
        """
        keyframes = []
        frames_dir = output_dir / 'frames'
        frames_dir.mkdir(exist_ok=True)

        try:
            # fps=1/interval samples one frame per N seconds.
            subprocess.run([
                'ffmpeg', '-y',
                '-i', str(video_path),
                '-vf', f'fps=1/{interval}',  # 1 frame per N seconds
                '-frame_pts', '1',
                str(frames_dir / 'frame_%04d.jpg')
            ], capture_output=True, check=True, timeout=300)

            # Collect the frames ffmpeg wrote, in filename order.
            for frame_file in sorted(frames_dir.glob('frame_*.jpg')):
                match = re.search(r'frame_(\d+)', frame_file.name)
                if match:
                    frame_num = int(match.group(1))
                    # Assumes frame N was sampled at (N-1)*interval; ffmpeg's
                    # fps filter actually samples within the interval, so this
                    # is an approximation — TODO confirm against -frame_pts.
                    timestamp = (frame_num - 1) * interval

                    keyframe = Keyframe(
                        frame_number=frame_num,
                        timestamp=timestamp,
                        file_path=str(frame_file)
                    )
                    keyframes.append(keyframe)

        except subprocess.TimeoutExpired:
            print("[VideoProcessor] Keyframe extraction timed out")
        except Exception as e:
            print(f"[VideoProcessor] Keyframe extraction error: {e}")

        return keyframes

    def _process_keyframe(self, keyframe: "Keyframe") -> None:
        """OCR a keyframe in place and flag it as significant if text-rich."""
        if self.has_tesseract:
            try:
                result = subprocess.run([
                    'tesseract', keyframe.file_path, 'stdout', '-l', 'eng'
                ], capture_output=True, text=True, timeout=30)
                if result.returncode == 0:
                    keyframe.ocr_text = result.stdout.strip()
            except (subprocess.SubprocessError, OSError):
                # OCR is best-effort: a timeout or missing binary just leaves
                # ocr_text empty. Previously a bare except.
                pass

        # Frames with a meaningful amount of readable text (labels, gauge
        # readings, signs) are worth surfacing.
        if keyframe.ocr_text and len(keyframe.ocr_text) > 20:
            keyframe.is_significant = True

    def _transcribe_video(
        self,
        video_path: Path,
        output_dir: Path
    ) -> List["TranscriptSegment"]:
        """Extract the audio track and transcribe it with whisper.

        Returns timestamped segments; empty list on any failure (best-effort).
        """
        segments = []
        audio_path = output_dir / 'audio.wav'

        try:
            # Extract mono 16kHz PCM — the format whisper expects.
            subprocess.run([
                'ffmpeg', '-y',
                '-i', str(video_path),
                '-vn',  # No video
                '-acodec', 'pcm_s16le',
                '-ar', '16000',  # 16kHz sample rate
                '-ac', '1',  # Mono
                str(audio_path)
            ], capture_output=True, check=True, timeout=120)

            if self.has_whisper and audio_path.exists():
                subprocess.run([
                    'whisper', str(audio_path),
                    '--model', self.whisper_model,
                    '--output_format', 'json',
                    '--output_dir', str(output_dir)
                ], capture_output=True, text=True, timeout=600)

                # Whisper names its JSON after the input file stem ('audio').
                json_path = output_dir / 'audio.json'
                if json_path.exists():
                    with open(json_path) as f:
                        whisper_data = json.load(f)

                    for seg in whisper_data.get('segments', []):
                        segments.append(TranscriptSegment(
                            start_time=seg.get('start', 0),
                            end_time=seg.get('end', 0),
                            text=seg.get('text', '').strip(),
                            # avg_logprob is a log-probability, not a [0,1]
                            # confidence; stored as-is.
                            confidence=seg.get('avg_logprob', 0)
                        ))

        except subprocess.TimeoutExpired:
            print("[VideoProcessor] Transcription timed out")
        except Exception as e:
            print(f"[VideoProcessor] Transcription error: {e}")

        return segments

    def _build_full_transcript(self, segments: List["TranscriptSegment"]) -> str:
        """Join segments into one newline-separated, timestamped transcript."""
        lines = []
        for seg in segments:
            timestamp = self._format_timestamp(seg.start_time)
            lines.append(f"[{timestamp}] {seg.text}")
        return '\n'.join(lines)

    def _format_timestamp(self, seconds: float) -> str:
        """Format seconds as MM:SS, or HH:MM:SS once past an hour."""
        minutes, secs = divmod(int(seconds), 60)
        hours, minutes = divmod(minutes, 60)
        if hours > 0:
            return f"{hours:02d}:{minutes:02d}:{secs:02d}"
        return f"{minutes:02d}:{secs:02d}"

    def _generate_atoms(self, result: "VideoProcessingResult") -> List[Dict[str, Any]]:
        """Build graph atoms from the processed transcript and keyframes.

        Each transcript segment becomes an atom linked to its predecessor;
        each significant keyframe becomes an atom linked to the nearest
        transcript segment by video timestamp.
        """
        atoms = []
        # Atom timestamps are processing time + offset into the video, not
        # the original capture time (which isn't available here).
        base_timestamp = datetime.now()

        # One atom per transcript segment, chained with 'follows' edges.
        for i, seg in enumerate(result.transcript_segments):
            atom = {
                'uuid': f"{result.video_id}-seg-{i:04d}",
                'content': seg.text,
                'source_type': 'video_transcript',
                'session_id': result.job_id or result.video_id,
                'timestamp': (base_timestamp + timedelta(seconds=seg.start_time)).isoformat(),
                'visible_tags': ['transcript'],
                'meta_tags': {
                    'altitude': 'operational',
                    'resonance': 0.5,
                    'video_timestamp': seg.start_time,
                    'source': 'video'
                },
                'edges': []
            }

            if i > 0:
                atom['edges'].append({
                    'type': 'follows',
                    'target': f"{result.video_id}-seg-{i-1:04d}"
                })

            atoms.append(atom)

        # One atom per significant keyframe, correlated to the transcript.
        for kf in result.significant_keyframes:
            atom = {
                'uuid': f"{result.video_id}-kf-{kf.frame_number:04d}",
                'content': kf.ocr_text or kf.scene_description or f"Keyframe at {self._format_timestamp(kf.timestamp)}",
                'source_type': 'video_keyframe',
                'session_id': result.job_id or result.video_id,
                'timestamp': (base_timestamp + timedelta(seconds=kf.timestamp)).isoformat(),
                'visible_tags': ['keyframe', 'visual'],
                'meta_tags': {
                    'altitude': 'operational',
                    # Frames with readable text carry more signal.
                    'resonance': 0.7 if kf.ocr_text else 0.5,
                    'video_timestamp': kf.timestamp,
                    'frame_path': kf.file_path,
                    'source': 'video'
                },
                'edges': []
            }

            nearest_seg = self._find_nearest_segment(
                kf.timestamp, result.transcript_segments
            )
            if nearest_seg is not None:
                atom['edges'].append({
                    'type': 'relates_to',
                    'target': f"{result.video_id}-seg-{nearest_seg:04d}"
                })

            atoms.append(atom)

        return atoms

    def _find_nearest_segment(
        self,
        timestamp: float,
        segments: List["TranscriptSegment"]
    ) -> Optional[int]:
        """Return the index of the segment whose start is closest to
        `timestamp`, or None if there are no segments."""
        if not segments:
            return None

        best_idx = 0
        best_diff = abs(segments[0].start_time - timestamp)

        for i, seg in enumerate(segments[1:], 1):
            diff = abs(seg.start_time - timestamp)
            if diff < best_diff:
                best_diff = diff
                best_idx = i

        return best_idx

    def _save_result(self, result: "VideoProcessingResult", output_dir: Path) -> None:
        """Persist metadata.json, transcript.txt, and atoms.jsonl."""
        metadata_path = output_dir / 'metadata.json'
        with open(metadata_path, 'w') as f:
            json.dump(result.to_dict(), f, indent=2)

        transcript_path = output_dir / 'transcript.txt'
        transcript_path.write_text(result.full_transcript)

        # JSONL so the graph ingester can stream atoms line by line.
        atoms_path = output_dir / 'atoms.jsonl'
        with open(atoms_path, 'w') as f:
            for atom in result.atoms:
                f.write(json.dumps(atom) + '\n')

    def get_video_summary(self, video_id: str) -> Optional[Dict[str, Any]]:
        """Return the saved metadata for a processed video, or None."""
        video_dir = self.output_dir / video_id
        metadata_path = video_dir / 'metadata.json'

        if not metadata_path.exists():
            return None

        with open(metadata_path) as f:
            return json.load(f)
537  
538  
# Module-wide singleton instance, created on first request.
_video_processor: Optional[VideoProcessor] = None


def get_video_processor() -> VideoProcessor:
    """Lazily create and return the shared VideoProcessor instance."""
    global _video_processor
    instance = _video_processor
    if instance is None:
        instance = VideoProcessor()
        _video_processor = instance
    return instance
549  
550  
if __name__ == '__main__':
    # Smoke test: report which external tools this machine provides.
    print("=== Video Processor Test ===\n")

    proc = VideoProcessor()
    print(f"FFmpeg available: {proc.has_ffmpeg}")
    print(f"Whisper available: {proc.has_whisper}")
    print(f"Tesseract available: {proc.has_tesseract}")
    print(f"\nOutput directory: {proc.output_dir}")