/ core / attention / visual_capture.py
visual_capture.py
#!/usr/bin/env python3
"""
Visual Capture Pipeline

Ingests photos, videos, and documents for correlation with audio/sessions.

Sources:
- iPhone camera (via Shortcut POST)
- Meta Glasses (via iCloud Photos sync)
- iPad scanner (document scans)
- Video clips (key frame extraction)

Processing:
- OCR for text extraction (currently tesseract; Apple Vision planned)
- Scene description (vision model; currently a placeholder)
- GPS/timestamp capture (audio correlation is currently a placeholder)
- Thumbnail generation (macOS `sips`)

Usage:
    from visual_capture import VisualCapture

    capture = VisualCapture()
    asset = capture.ingest_image(image_data, metadata=metadata)
    # Returns a VisualAsset; asset.to_dict() gives a summary dict with
    # id, asset_type, timestamp, source, file_path, ocr_text, gps, ...
"""
 26  
import base64
import hashlib
import json
import os
import sqlite3
import subprocess
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple
 37  
 38  
 39  @dataclass
 40  class VisualAsset:
 41      """A captured visual asset (photo, video frame, document)."""
 42      id: str
 43      asset_type: str  # 'photo', 'video', 'document', 'screenshot'
 44      timestamp: datetime
 45      source: str  # 'iphone', 'meta_glasses', 'ipad', 'screenshot'
 46  
 47      # File info
 48      file_path: Optional[str] = None
 49      thumbnail_path: Optional[str] = None
 50      file_size: int = 0
 51  
 52      # Extracted content
 53      ocr_text: str = ""
 54      scene_description: str = ""
 55  
 56      # Location
 57      gps_lat: Optional[float] = None
 58      gps_lon: Optional[float] = None
 59      location_name: str = ""
 60  
 61      # Work context
 62      job_id: Optional[str] = None
 63      property_id: Optional[str] = None
 64      tags: List[str] = field(default_factory=list)
 65  
 66      # Correlation
 67      correlated_audio: List[str] = field(default_factory=list)  # Audio segment IDs
 68      correlated_atoms: List[str] = field(default_factory=list)  # Graph atom UUIDs
 69  
 70      def to_dict(self) -> Dict[str, Any]:
 71          return {
 72              'id': self.id,
 73              'asset_type': self.asset_type,
 74              'timestamp': self.timestamp.isoformat(),
 75              'source': self.source,
 76              'file_path': self.file_path,
 77              'ocr_text': self.ocr_text[:500] if self.ocr_text else '',
 78              'scene_description': self.scene_description,
 79              'gps': {'lat': self.gps_lat, 'lon': self.gps_lon} if self.gps_lat else None,
 80              'location_name': self.location_name,
 81              'job_id': self.job_id,
 82              'tags': self.tags,
 83              'correlation_count': len(self.correlated_audio) + len(self.correlated_atoms)
 84          }
 85  
 86  
 87  @dataclass
 88  class WorkSession:
 89      """A work session (e.g., technician job at a property)."""
 90      id: str
 91      property_name: str
 92      started_at: datetime
 93      ended_at: Optional[datetime] = None
 94  
 95      # Captured content
 96      assets: List[str] = field(default_factory=list)  # Asset IDs
 97      audio_segments: List[str] = field(default_factory=list)
 98  
 99      # Work data
100      issue_description: str = ""
101      resolution: str = ""
102      parts_used: List[str] = field(default_factory=list)
103      technician: str = ""
104  
105      # Generated
106      work_notes: str = ""
107  
108      @property
109      def duration_minutes(self) -> float:
110          if self.ended_at:
111              return (self.ended_at - self.started_at).total_seconds() / 60
112          return (datetime.now() - self.started_at).total_seconds() / 60
113  
114  
class VisualCapture:
    """
    Manages visual capture and correlation.

    Layout under ``data_dir``:
      - ``images/<YYYY-MM-DD>/<asset_id>.<ext>``: ingested originals
      - ``thumbnails/<asset_id>_thumb.jpg``: thumbnails (macOS ``sips``)
      - ``visual.db``: SQLite tables ``assets`` and ``work_sessions``

    Sessions that have been started but not yet ended are also kept in memory
    in ``_active_sessions``.
    """

    DEFAULT_DATA_DIR = Path.home() / 'repos/Sovereign_OS/data/visual'

    # Explicit column list used by every asset SELECT, so row decoding in
    # _row_to_asset does not depend on the physical order of ``SELECT *``.
    _ASSET_COLUMNS = (
        'id, asset_type, timestamp, source, file_path, thumbnail_path, '
        'ocr_text, scene_description, gps_lat, gps_lon, location_name, '
        'job_id, property_id, tags'
    )

    def __init__(self, data_dir: Optional[str] = None):
        """
        Args:
            data_dir: Root directory for images, thumbnails and the database.
                Defaults to ``DEFAULT_DATA_DIR``; created if missing.
        """
        self.data_dir = Path(data_dir) if data_dir else self.DEFAULT_DATA_DIR
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # Subdirectories
        self.images_dir = self.data_dir / 'images'
        self.thumbnails_dir = self.data_dir / 'thumbnails'
        self.images_dir.mkdir(exist_ok=True)
        self.thumbnails_dir.mkdir(exist_ok=True)

        # Database
        self.db_path = self.data_dir / 'visual.db'
        self._init_db()

        # Sessions started but not yet ended, keyed by session ID
        self._active_sessions: Dict[str, WorkSession] = {}

    # ==================== Database helpers ====================

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that commits on success, rolls back on error, and is always closed.

        ``sqlite3.Connection``'s own context manager only scopes the
        transaction; it does not close the connection, hence the ``finally``.
        """
        conn = sqlite3.connect(str(self.db_path))
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    @staticmethod
    def _row_to_asset(row: Tuple[Any, ...]) -> VisualAsset:
        """Build a VisualAsset from a row selected with ``_ASSET_COLUMNS``.

        ``file_size`` is not stored in the database, so it keeps its default.
        """
        (asset_id, asset_type, timestamp, source, file_path, thumbnail_path,
         ocr_text, scene_description, gps_lat, gps_lon, location_name,
         job_id, property_id, tags) = row
        return VisualAsset(
            id=asset_id,
            asset_type=asset_type,
            timestamp=datetime.fromisoformat(timestamp),
            source=source,
            file_path=file_path,
            thumbnail_path=thumbnail_path,
            ocr_text=ocr_text or '',
            scene_description=scene_description or '',
            gps_lat=gps_lat,
            gps_lon=gps_lon,
            location_name=location_name or '',
            job_id=job_id,
            property_id=property_id,
            tags=json.loads(tags) if tags else []
        )

    def _init_db(self):
        """Create the assets/work_sessions tables and indexes if they are missing."""
        with self._connect() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS assets (
                    id TEXT PRIMARY KEY,
                    asset_type TEXT,
                    timestamp TEXT,
                    source TEXT,
                    file_path TEXT,
                    thumbnail_path TEXT,
                    ocr_text TEXT,
                    scene_description TEXT,
                    gps_lat REAL,
                    gps_lon REAL,
                    location_name TEXT,
                    job_id TEXT,
                    property_id TEXT,
                    tags TEXT,
                    metadata TEXT,
                    created_at TEXT
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS work_sessions (
                    id TEXT PRIMARY KEY,
                    property_name TEXT,
                    started_at TEXT,
                    ended_at TEXT,
                    assets TEXT,
                    audio_segments TEXT,
                    issue_description TEXT,
                    resolution TEXT,
                    parts_used TEXT,
                    technician TEXT,
                    work_notes TEXT
                )
            ''')
            conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_assets_timestamp
                ON assets(timestamp)
            ''')
            conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_assets_job
                ON assets(job_id)
            ''')

    # ==================== Ingestion ====================

    def ingest_image(
        self,
        image_data: bytes,
        source: str = 'iphone',
        asset_type: str = 'photo',
        timestamp: Optional[datetime] = None,
        gps: Optional[Tuple[float, float]] = None,
        job_id: Optional[str] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> VisualAsset:
        """
        Save an image to disk, extract what we can from it, and record it.

        Args:
            image_data: Raw image bytes (JPEG/PNG; PDF is also recognized)
            source: Where it came from
            asset_type: Type of asset
            timestamp: When captured (default: now)
            gps: (lat, lon) tuple
            job_id: Associated work session; if it is active, the asset ID
                is appended to that session's ``assets`` list
            tags: Manual tags
            metadata: Additional metadata, stored as JSON in the database

        Returns:
            VisualAsset with extracted content

        Raises:
            OSError: if the image file cannot be written.
            sqlite3.Error: if the database write fails.
        """
        timestamp = timestamp or datetime.now()
        tags = tags or []
        metadata = metadata or {}

        # ID = capture time + content hash, so the same bytes captured at the
        # same second map to the same asset (and the same file path).
        content_hash = hashlib.sha256(image_data).hexdigest()[:12]
        asset_id = f"{timestamp.strftime('%Y%m%d%H%M%S')}-{content_hash}"

        # Save image under a per-day directory
        date_dir = self.images_dir / timestamp.strftime('%Y-%m-%d')
        date_dir.mkdir(exist_ok=True)

        # Detect format from magic bytes; anything unrecognized is saved as .jpg
        ext = 'jpg'
        if image_data[:4] == b'\x89PNG':
            ext = 'png'
        elif image_data[:4] == b'%PDF':
            ext = 'pdf'

        file_path = date_dir / f"{asset_id}.{ext}"
        file_path.write_bytes(image_data)

        # Best-effort enrichment: each helper returns None/"" on failure
        thumbnail_path = self._create_thumbnail(file_path, asset_id)
        ocr_text = self._extract_text(file_path)
        scene_description = self._describe_scene(file_path)

        asset = VisualAsset(
            id=asset_id,
            asset_type=asset_type,
            timestamp=timestamp,
            source=source,
            file_path=str(file_path),
            thumbnail_path=str(thumbnail_path) if thumbnail_path else None,
            file_size=len(image_data),
            ocr_text=ocr_text,
            scene_description=scene_description,
            gps_lat=gps[0] if gps else None,
            gps_lon=gps[1] if gps else None,
            job_id=job_id,
            tags=tags
        )

        # Add to active session if any
        if job_id and job_id in self._active_sessions:
            self._active_sessions[job_id].assets.append(asset_id)

        self._store_asset(asset, metadata)

        return asset

    def _create_thumbnail(self, file_path: Path, asset_id: str) -> Optional[Path]:
        """Create a max-300px JPEG thumbnail with ``sips`` (macOS only).

        Returns the thumbnail path, or None if ``sips`` is unavailable, fails,
        or takes longer than 10 seconds.
        """
        thumb_path = self.thumbnails_dir / f"{asset_id}_thumb.jpg"
        try:
            subprocess.run([
                'sips', '-Z', '300',
                '--setProperty', 'format', 'jpeg',
                str(file_path),
                '--out', str(thumb_path)
            ], capture_output=True, check=True, timeout=10)
        except (OSError, subprocess.SubprocessError) as e:
            # OSError covers a missing `sips` binary; SubprocessError covers
            # a non-zero exit (CalledProcessError) and TimeoutExpired.
            print(f"[VisualCapture] Thumbnail error: {e}")
            return None
        return thumb_path

    def _extract_text(self, file_path: Path) -> str:
        """Extract text with the ``tesseract`` CLI (English).

        Returns "" when tesseract is not installed, exits non-zero, or times
        out after 30 seconds. Apple Vision OCR is not implemented yet.
        """
        try:
            result = subprocess.run(
                ['tesseract', str(file_path), 'stdout', '-l', 'eng'],
                capture_output=True,
                text=True,
                # tesseract writes UTF-8; don't depend on the locale encoding
                encoding='utf-8',
                errors='replace',
                timeout=30
            )
        except FileNotFoundError:
            return ""  # tesseract not installed
        except (OSError, subprocess.SubprocessError) as e:
            print(f"[VisualCapture] OCR error: {e}")
            return ""

        if result.returncode == 0:
            return result.stdout.strip()
        return ""

    def _describe_scene(self, file_path: Path) -> str:
        """
        Describe the scene in the image.

        Placeholder: always returns "". Intended to call a vision model.
        """
        # Future: Use local vision model or API
        return ""

    def _store_asset(self, asset: VisualAsset, metadata: Dict[str, Any]):
        """Insert or replace the asset row; tags and metadata are stored as JSON."""
        with self._connect() as conn:
            conn.execute('''
                INSERT OR REPLACE INTO assets
                (id, asset_type, timestamp, source, file_path, thumbnail_path,
                 ocr_text, scene_description, gps_lat, gps_lon, location_name,
                 job_id, property_id, tags, metadata, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                asset.id,
                asset.asset_type,
                asset.timestamp.isoformat(),
                asset.source,
                asset.file_path,
                asset.thumbnail_path,
                asset.ocr_text,
                asset.scene_description,
                asset.gps_lat,
                asset.gps_lon,
                asset.location_name,
                asset.job_id,
                asset.property_id,
                json.dumps(asset.tags),
                json.dumps(metadata),
                datetime.now().isoformat()
            ))

    # ==================== Work Sessions ====================

    def start_work_session(
        self,
        property_name: str,
        technician: str = ""
    ) -> WorkSession:
        """Start a new work session, register it as active, and persist it.

        The ID is ``job-YYYYmmddHHMMSS``. If that ID is already in use (e.g.
        two sessions started within the same second), ``-2``, ``-3``, ... is
        appended so an existing session is never overwritten.
        """
        now = datetime.now()
        base_id = f"job-{now.strftime('%Y%m%d%H%M%S')}"
        session_id = base_id
        suffix = 2
        while self._session_id_taken(session_id):
            session_id = f"{base_id}-{suffix}"
            suffix += 1

        session = WorkSession(
            id=session_id,
            property_name=property_name,
            started_at=now,
            technician=technician
        )

        self._active_sessions[session_id] = session
        self._store_session(session)

        return session

    def _session_id_taken(self, session_id: str) -> bool:
        """Return True if the ID belongs to an active session or an existing DB row."""
        if session_id in self._active_sessions:
            return True
        with self._connect() as conn:
            row = conn.execute(
                'SELECT 1 FROM work_sessions WHERE id = ?', (session_id,)
            ).fetchone()
        return row is not None

    def end_work_session(self, session_id: str) -> Optional[WorkSession]:
        """End an active session, generate its work notes, and persist it.

        Returns the finished session, or None if ``session_id`` is not active.
        The session stays active if persisting it fails.
        """
        session = self._active_sessions.get(session_id)
        if not session:
            return None

        session.ended_at = datetime.now()
        session.work_notes = self._generate_work_notes(session)

        self._store_session(session)

        # Only drop from the active set once the final state is saved
        del self._active_sessions[session_id]

        return session

    def add_to_session(
        self,
        session_id: str,
        content: str,
        content_type: str = 'narration'
    ):
        """Add content to an active work session.

        Args:
            session_id: ID of an active session; unknown IDs are ignored.
            content: Text to record.
            content_type: 'issue' and 'resolution' append a new line to the
                matching text field; 'part' appends to ``parts_used``.

        NOTE(review): the default 'narration' and any other content_type are
        currently ignored. Changes are held in memory and only persisted when
        the session is ended.
        """
        session = self._active_sessions.get(session_id)
        if not session:
            return

        if content_type == 'issue':
            session.issue_description += f"\n{content}"
        elif content_type == 'resolution':
            session.resolution += f"\n{content}"
        elif content_type == 'part':
            session.parts_used.append(content)

    @staticmethod
    def _blockquote(text: str) -> str:
        """Quote every line of *text* as a markdown blockquote.

        Quoting every line keeps a continuation line that starts with e.g.
        "-" or "#" inside the quote instead of becoming a list or heading.
        """
        return '\n'.join(f"> {line}" for line in text.strip().splitlines())

    def _generate_work_notes(self, session: WorkSession) -> str:
        """Render the session as markdown work notes."""
        lines = [
            f"## Work Order: {session.started_at.strftime('%Y-%m-%d')} - {session.property_name}",
            "",
            f"**Technician**: {session.technician or 'Unknown'}",
            f"**Duration**: {session.duration_minutes:.0f} minutes",
            "",
        ]

        if session.issue_description.strip():
            lines.append("### Issue Identified")
            lines.append(self._blockquote(session.issue_description))
            lines.append("")

        if session.assets:
            lines.append("### Visual Documentation")
            for asset_id in session.assets:
                lines.append(f"- [{asset_id}]")
            lines.append("")

        if session.resolution.strip():
            lines.append("### Resolution")
            lines.append(self._blockquote(session.resolution))
            lines.append("")

        if session.parts_used:
            lines.append("### Parts Used")
            for part in session.parts_used:
                lines.append(f"- {part}")
            lines.append("")

        lines.append("---")
        lines.append("*Auto-generated from field capture*")

        return '\n'.join(lines)

    def _store_session(self, session: WorkSession):
        """Insert or replace the session row; list fields are stored as JSON."""
        with self._connect() as conn:
            conn.execute('''
                INSERT OR REPLACE INTO work_sessions
                (id, property_name, started_at, ended_at, assets, audio_segments,
                 issue_description, resolution, parts_used, technician, work_notes)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                session.id,
                session.property_name,
                session.started_at.isoformat(),
                session.ended_at.isoformat() if session.ended_at else None,
                json.dumps(session.assets),
                json.dumps(session.audio_segments),
                session.issue_description,
                session.resolution,
                json.dumps(session.parts_used),
                session.technician,
                session.work_notes
            ))

    # ==================== Queries ====================

    def get_recent_assets(
        self,
        hours_back: float = 24,
        asset_type: Optional[str] = None,
        job_id: Optional[str] = None,
        limit: int = 50
    ) -> List[VisualAsset]:
        """Return assets captured within the last ``hours_back`` hours, newest first.

        Timestamps are compared as ISO-8601 strings against ``datetime.now()``.
        """
        cutoff = (datetime.now() - timedelta(hours=hours_back)).isoformat()

        # _ASSET_COLUMNS is a class constant, not user input
        query = f"SELECT {self._ASSET_COLUMNS} FROM assets WHERE timestamp > ?"
        params: List[Any] = [cutoff]

        if asset_type:
            query += " AND asset_type = ?"
            params.append(asset_type)

        if job_id:
            query += " AND job_id = ?"
            params.append(job_id)

        query += " ORDER BY timestamp DESC LIMIT ?"
        params.append(limit)

        with self._connect() as conn:
            rows = conn.execute(query, params).fetchall()

        return [self._row_to_asset(row) for row in rows]

    def search_ocr(self, query: str, limit: int = 20) -> List[VisualAsset]:
        """Return assets whose OCR text contains ``query`` (substring), newest first.

        ``%``, ``_`` and ``\\`` in ``query`` are matched literally rather than
        as LIKE wildcards. SQLite's LIKE is case-insensitive for ASCII only.
        """
        escaped = (
            query.replace('\\', '\\\\')
                 .replace('%', '\\%')
                 .replace('_', '\\_')
        )

        with self._connect() as conn:
            rows = conn.execute(f'''
                SELECT {self._ASSET_COLUMNS} FROM assets
                WHERE ocr_text LIKE ? ESCAPE '\\'
                ORDER BY timestamp DESC
                LIMIT ?
            ''', (f'%{escaped}%', limit)).fetchall()

        return [self._row_to_asset(row) for row in rows]

    def correlate_with_audio(
        self,
        timestamp: datetime,
        window_seconds: float = 30
    ) -> List[Dict[str, Any]]:
        """Find audio events near a visual capture.

        Placeholder: always returns an empty list.
        """
        # This would query the multi_source_correlator
        return []
566  
567  
# Process-wide shared instance, created lazily by get_visual_capture().
_visual_capture: Optional[VisualCapture] = None


def get_visual_capture() -> VisualCapture:
    """Return the shared VisualCapture, constructing it with defaults on first use."""
    global _visual_capture
    if _visual_capture is not None:
        return _visual_capture
    _visual_capture = VisualCapture()
    return _visual_capture
578  
579  
if __name__ == '__main__':
    import tempfile

    print("=== Visual Capture Test ===\n")

    # Run the demo against a throwaway data directory so it does not write a
    # fake "Smith Residence" session into the real default database.
    with tempfile.TemporaryDirectory() as tmp_dir:
        vc = VisualCapture(data_dir=tmp_dir)

        # Simulate a work session
        session = vc.start_work_session("Smith Residence", "John")
        print(f"Started session: {session.id}")

        # Simulate adding content
        vc.add_to_session(session.id, "Condensate line backup observed", "issue")
        vc.add_to_session(session.id, "Cleared with compressed air", "resolution")

        # End session; end_work_session returns None for an unknown ID
        ended = vc.end_work_session(session.id)
        print("\nGenerated Work Notes:")
        print(ended.work_notes if ended else "(session not found)")