#!/usr/bin/env python3
# visual_capture.py
"""
Visual Capture Pipeline

Ingests photos, videos, and documents for correlation with audio/sessions.

Sources:
- iPhone camera (via Shortcut POST)
- Meta Glasses (via iCloud Photos sync)
- iPad scanner (document scans)
- Video clips (key frame extraction)

Processing:
- OCR for text extraction (Apple Vision / tesseract)
- Scene description (vision model)
- GPS/timestamp correlation
- Thumbnail generation

Usage:
    from visual_capture import VisualCapture

    capture = VisualCapture()
    asset = capture.ingest_image(image_data, metadata=metadata)
    # Returns a VisualAsset; asset.to_dict() gives a JSON-friendly summary.
"""

import base64
import hashlib
import json
import os
import sqlite3
import subprocess
from contextlib import closing
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple


@dataclass
class VisualAsset:
    """A captured visual asset (photo, video frame, document)."""
    id: str
    asset_type: str  # 'photo', 'video', 'document', 'screenshot'
    timestamp: datetime
    source: str  # 'iphone', 'meta_glasses', 'ipad', 'screenshot'

    # File info
    file_path: Optional[str] = None
    thumbnail_path: Optional[str] = None
    file_size: int = 0

    # Extracted content
    ocr_text: str = ""
    scene_description: str = ""

    # Location
    gps_lat: Optional[float] = None
    gps_lon: Optional[float] = None
    location_name: str = ""

    # Work context
    job_id: Optional[str] = None
    property_id: Optional[str] = None
    tags: List[str] = field(default_factory=list)

    # Correlation
    correlated_audio: List[str] = field(default_factory=list)  # Audio segment IDs
    correlated_atoms: List[str] = field(default_factory=list)  # Graph atom UUIDs

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly summary of the asset.

        The OCR text is truncated to its first 500 characters, and the two
        correlation lists are reported only as a combined count.
        """
        # Compare against None rather than truthiness: a latitude of exactly
        # 0.0 (the equator) is a valid coordinate and must not be dropped.
        gps = None
        if self.gps_lat is not None:
            gps = {'lat': self.gps_lat, 'lon': self.gps_lon}

        return {
            'id': self.id,
            'asset_type': self.asset_type,
            'timestamp': self.timestamp.isoformat(),
            'source': self.source,
            'file_path': self.file_path,
            'ocr_text': self.ocr_text[:500] if self.ocr_text else '',
            'scene_description': self.scene_description,
            'gps': gps,
            'location_name': self.location_name,
            'job_id': self.job_id,
            'tags': self.tags,
            'correlation_count': len(self.correlated_audio) + len(self.correlated_atoms)
        }


@dataclass
class WorkSession:
    """A work session (e.g., technician job at a property)."""
    id: str
    property_name: str
    started_at: datetime
    ended_at: Optional[datetime] = None

    # Captured content
    assets: List[str] = field(default_factory=list)  # Asset IDs
    audio_segments: List[str] = field(default_factory=list)

    # Work data
    issue_description: str = ""
    resolution: str = ""
    parts_used: List[str] = field(default_factory=list)
    technician: str = ""

    # Generated
    work_notes: str = ""

    @property
    def duration_minutes(self) -> float:
        """Minutes from start to end, or from start to now while still open."""
        end = self.ended_at if self.ended_at else datetime.now()
        return (end - self.started_at).total_seconds() / 60


class VisualCapture:
    """
    Manages visual capture and correlation.

    Image files are written under ``<data_dir>/images/<YYYY-MM-DD>/``,
    thumbnails under ``<data_dir>/thumbnails/``, and asset / work-session
    records are kept in the SQLite database ``<data_dir>/visual.db``.
    """

    DEFAULT_DATA_DIR = Path.home() / 'repos/Sovereign_OS/data/visual'

    # Explicit column list for asset reads, in the order _row_to_asset
    # unpacks it. Avoids depending on the table's physical column order.
    _ASSET_COLUMNS = (
        'id, asset_type, timestamp, source, file_path, thumbnail_path, '
        'ocr_text, scene_description, gps_lat, gps_lon, location_name, '
        'job_id, property_id, tags'
    )

    def __init__(self, data_dir: Optional[str] = None):
        """Create the storage directories and database if they are missing.

        Args:
            data_dir: Root directory for stored data
                (default: DEFAULT_DATA_DIR)
        """
        self.data_dir = Path(data_dir) if data_dir else self.DEFAULT_DATA_DIR
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # Subdirectories
        self.images_dir = self.data_dir / 'images'
        self.thumbnails_dir = self.data_dir / 'thumbnails'
        self.images_dir.mkdir(exist_ok=True)
        self.thumbnails_dir.mkdir(exist_ok=True)

        # Database
        self.db_path = self.data_dir / 'visual.db'
        self._init_db()

        # Active work sessions
        self._active_sessions: Dict[str, WorkSession] = {}

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection to the asset database."""
        return sqlite3.connect(str(self.db_path))

    def _init_db(self):
        """Initialize SQLite database."""
        # closing() guarantees the connection is released even if a statement
        # raises; the inner `conn` context commits on success and rolls back
        # on error.
        with closing(self._connect()) as conn, conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS assets (
                    id TEXT PRIMARY KEY,
                    asset_type TEXT,
                    timestamp TEXT,
                    source TEXT,
                    file_path TEXT,
                    thumbnail_path TEXT,
                    ocr_text TEXT,
                    scene_description TEXT,
                    gps_lat REAL,
                    gps_lon REAL,
                    location_name TEXT,
                    job_id TEXT,
                    property_id TEXT,
                    tags TEXT,
                    metadata TEXT,
                    created_at TEXT
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS work_sessions (
                    id TEXT PRIMARY KEY,
                    property_name TEXT,
                    started_at TEXT,
                    ended_at TEXT,
                    assets TEXT,
                    audio_segments TEXT,
                    issue_description TEXT,
                    resolution TEXT,
                    parts_used TEXT,
                    technician TEXT,
                    work_notes TEXT
                )
            ''')
            conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_assets_timestamp
                ON assets(timestamp)
            ''')
            conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_assets_job
                ON assets(job_id)
            ''')

    @staticmethod
    def _detect_extension(image_data: bytes) -> str:
        """Pick a file extension from the leading magic bytes ('jpg' default)."""
        header = image_data[:4]
        if header == b'\x89PNG':
            return 'png'
        if header == b'%PDF':
            return 'pdf'
        return 'jpg'

    def ingest_image(
        self,
        image_data: bytes,
        source: str = 'iphone',
        asset_type: str = 'photo',
        timestamp: Optional[datetime] = None,
        gps: Optional[Tuple[float, float]] = None,
        job_id: Optional[str] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> VisualAsset:
        """
        Ingest an image and process it.

        The bytes are written to disk, a thumbnail and OCR text are produced
        on a best-effort basis, and the resulting record is stored in the
        database. The asset ID is derived from the timestamp and a content
        hash, so ingesting the same bytes with the same timestamp replaces
        the earlier record.

        Args:
            image_data: Raw image bytes (JPEG/PNG)
            source: Where it came from
            asset_type: Type of asset
            timestamp: When captured (default: now)
            gps: (lat, lon) tuple
            job_id: Associated work session
            tags: Manual tags
            metadata: Additional metadata

        Returns:
            VisualAsset with extracted content
        """
        timestamp = timestamp or datetime.now()
        tags = tags or []
        metadata = metadata or {}

        # Generate ID from content hash
        content_hash = hashlib.sha256(image_data).hexdigest()[:12]
        asset_id = f"{timestamp.strftime('%Y%m%d%H%M%S')}-{content_hash}"

        # Save image
        date_dir = self.images_dir / timestamp.strftime('%Y-%m-%d')
        date_dir.mkdir(parents=True, exist_ok=True)

        # Detect format from magic bytes
        ext = self._detect_extension(image_data)

        file_path = date_dir / f"{asset_id}.{ext}"
        file_path.write_bytes(image_data)

        # Create thumbnail
        thumbnail_path = self._create_thumbnail(file_path, asset_id)

        # Run OCR
        ocr_text = self._extract_text(file_path)

        # Scene description (placeholder - would use vision model)
        scene_description = self._describe_scene(file_path)

        # Create asset
        asset = VisualAsset(
            id=asset_id,
            asset_type=asset_type,
            timestamp=timestamp,
            source=source,
            file_path=str(file_path),
            thumbnail_path=str(thumbnail_path) if thumbnail_path else None,
            file_size=len(image_data),
            ocr_text=ocr_text,
            scene_description=scene_description,
            gps_lat=gps[0] if gps else None,
            gps_lon=gps[1] if gps else None,
            job_id=job_id,
            tags=tags
        )

        # Add to active session if any. Skip IDs already recorded so that
        # re-ingesting the same image does not list it twice in the notes.
        session = self._active_sessions.get(job_id) if job_id else None
        if session is not None and asset_id not in session.assets:
            session.assets.append(asset_id)

        # Store in database
        self._store_asset(asset, metadata)

        return asset

    def _create_thumbnail(self, file_path: Path, asset_id: str) -> Optional[Path]:
        """Create thumbnail using sips (macOS).

        Returns:
            Path of the JPEG thumbnail, or None if sips is unavailable,
            fails, or times out.
        """
        try:
            thumb_path = self.thumbnails_dir / f"{asset_id}_thumb.jpg"

            # Use sips for macOS thumbnail generation
            subprocess.run([
                'sips', '-Z', '300',
                '--setProperty', 'format', 'jpeg',
                str(file_path),
                '--out', str(thumb_path)
            ], capture_output=True, check=True, timeout=10)

            return thumb_path
        except Exception as e:
            # Deliberately broad: thumbnails are best-effort and must never
            # abort an ingest.
            print(f"[VisualCapture] Thumbnail error: {e}")
            return None

    def _extract_text(self, file_path: Path) -> str:
        """Extract text from the file using the tesseract CLI, if installed.

        Returns:
            The stripped OCR output, or "" if tesseract is missing, exits
            with a non-zero status, or raises.
        """
        try:
            # Use macOS screencapture with OCR or Vision framework
            # For now, try tesseract if available
            result = subprocess.run(
                ['tesseract', str(file_path), 'stdout', '-l', 'eng'],
                capture_output=True,
                text=True,
                timeout=30
            )
            if result.returncode == 0:
                return result.stdout.strip()
        except FileNotFoundError:
            pass  # tesseract not installed
        except Exception as e:
            # Deliberately broad: OCR is best-effort and must never abort an
            # ingest.
            print(f"[VisualCapture] OCR error: {e}")

        return ""

    def _describe_scene(self, file_path: Path) -> str:
        """
        Describe the scene in the image.

        This would ideally use a vision model. For now, return placeholder.
        """
        # Future: Use local vision model or API
        return ""

    def _store_asset(self, asset: VisualAsset, metadata: Dict[str, Any]):
        """Store asset in database.

        An existing row with the same ID is replaced. ``file_size`` and the
        correlation lists are not persisted.
        """
        with closing(self._connect()) as conn, conn:
            conn.execute('''
                INSERT OR REPLACE INTO assets
                (id, asset_type, timestamp, source, file_path, thumbnail_path,
                 ocr_text, scene_description, gps_lat, gps_lon, location_name,
                 job_id, property_id, tags, metadata, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                asset.id,
                asset.asset_type,
                asset.timestamp.isoformat(),
                asset.source,
                asset.file_path,
                asset.thumbnail_path,
                asset.ocr_text,
                asset.scene_description,
                asset.gps_lat,
                asset.gps_lon,
                asset.location_name,
                asset.job_id,
                asset.property_id,
                json.dumps(asset.tags),
                json.dumps(metadata),
                datetime.now().isoformat()
            ))

    # ==================== Work Sessions ====================

    def _session_id_taken(self, session_id: str) -> bool:
        """Return True if the ID is already used by an active or stored session."""
        if session_id in self._active_sessions:
            return True
        with closing(self._connect()) as conn:
            row = conn.execute(
                'SELECT 1 FROM work_sessions WHERE id = ? LIMIT 1',
                (session_id,)
            ).fetchone()
        return row is not None

    def start_work_session(
        self,
        property_name: str,
        technician: str = ""
    ) -> WorkSession:
        """Start a new work session.

        The session ID is ``job-<YYYYmmddHHMMSS>``. If that ID is already in
        use (another session started within the same second), ``-2``, ``-3``,
        ... is appended so the earlier session is not overwritten.

        Args:
            property_name: Name of the property being serviced
            technician: Name of the technician (optional)

        Returns:
            The newly created, active WorkSession
        """
        # One clock reading for both the ID and started_at keeps them aligned.
        now = datetime.now()
        base_id = f"job-{now.strftime('%Y%m%d%H%M%S')}"

        session_id = base_id
        suffix = 1
        while self._session_id_taken(session_id):
            suffix += 1
            session_id = f"{base_id}-{suffix}"

        session = WorkSession(
            id=session_id,
            property_name=property_name,
            started_at=now,
            technician=technician
        )

        self._active_sessions[session_id] = session

        # Store in database
        self._store_session(session)

        return session

    def end_work_session(self, session_id: str) -> Optional[WorkSession]:
        """End a work session and generate work notes.

        Returns:
            The finished session, or None if ``session_id`` is not active.
        """
        session = self._active_sessions.get(session_id)
        if not session:
            return None

        session.ended_at = datetime.now()

        # Generate work notes
        session.work_notes = self._generate_work_notes(session)

        # Update database
        self._store_session(session)

        # Remove from active
        del self._active_sessions[session_id]

        return session

    def add_to_session(
        self,
        session_id: str,
        content: str,
        content_type: str = 'narration'
    ):
        """Add content to a work session.

        Recognized content types are 'issue', 'resolution' and 'part'.
        NOTE: any other value, including the default 'narration', is
        currently ignored. Unknown session IDs are ignored as well.
        Changes are held in memory until the session is ended.
        """
        session = self._active_sessions.get(session_id)
        if not session:
            return

        if content_type == 'issue':
            session.issue_description += f"\n{content}"
        elif content_type == 'resolution':
            session.resolution += f"\n{content}"
        elif content_type == 'part':
            session.parts_used.append(content)

    def _generate_work_notes(self, session: WorkSession) -> str:
        """Generate markdown work notes from session.

        Sections for issue, visual documentation, resolution and parts are
        emitted only when the session has content for them.
        """
        lines = [
            f"## Work Order: {session.started_at.strftime('%Y-%m-%d')} - {session.property_name}",
            "",
            f"**Technician**: {session.technician or 'Unknown'}",
            f"**Duration**: {session.duration_minutes:.0f} minutes",
            "",
        ]

        if session.issue_description:
            lines.append("### Issue Identified")
            lines.append(f"> {session.issue_description.strip()}")
            lines.append("")

        if session.assets:
            lines.append("### Visual Documentation")
            lines.extend(f"- [{asset_id}]" for asset_id in session.assets)
            lines.append("")

        if session.resolution:
            lines.append("### Resolution")
            lines.append(f"> {session.resolution.strip()}")
            lines.append("")

        if session.parts_used:
            lines.append("### Parts Used")
            lines.extend(f"- {part}" for part in session.parts_used)
            lines.append("")

        lines.append("---")
        lines.append("*Auto-generated from field capture*")

        return '\n'.join(lines)

    def _store_session(self, session: WorkSession):
        """Store work session in database, replacing any row with the same ID."""
        with closing(self._connect()) as conn, conn:
            conn.execute('''
                INSERT OR REPLACE INTO work_sessions
                (id, property_name, started_at, ended_at, assets, audio_segments,
                 issue_description, resolution, parts_used, technician, work_notes)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                session.id,
                session.property_name,
                session.started_at.isoformat(),
                session.ended_at.isoformat() if session.ended_at else None,
                json.dumps(session.assets),
                json.dumps(session.audio_segments),
                session.issue_description,
                session.resolution,
                json.dumps(session.parts_used),
                session.technician,
                session.work_notes
            ))

    # ==================== Queries ====================

    @staticmethod
    def _row_to_asset(row) -> VisualAsset:
        """Build a VisualAsset from a row selected with _ASSET_COLUMNS.

        NULL text columns become empty strings and a NULL tags column becomes
        an empty list. Fields that are not stored (file_size, correlations)
        keep their dataclass defaults.
        """
        (asset_id, asset_type, timestamp, source, file_path, thumbnail_path,
         ocr_text, scene_description, gps_lat, gps_lon, location_name,
         job_id, property_id, tags) = row

        return VisualAsset(
            id=asset_id,
            asset_type=asset_type,
            timestamp=datetime.fromisoformat(timestamp),
            source=source,
            file_path=file_path,
            thumbnail_path=thumbnail_path,
            ocr_text=ocr_text or '',
            scene_description=scene_description or '',
            gps_lat=gps_lat,
            gps_lon=gps_lon,
            location_name=location_name or '',
            job_id=job_id,
            property_id=property_id,
            tags=json.loads(tags) if tags else []
        )

    def get_recent_assets(
        self,
        hours_back: float = 24,
        asset_type: Optional[str] = None,
        job_id: Optional[str] = None,
        limit: int = 50
    ) -> List[VisualAsset]:
        """Get recent visual assets.

        Args:
            hours_back: Only assets captured within this many hours of now
            asset_type: Optional filter on asset type
            job_id: Optional filter on work session
            limit: Maximum number of assets returned

        Returns:
            Matching assets, newest first
        """
        # Timestamps are stored as ISO-8601 text, so the cutoff is compared
        # as a string of the same format.
        cutoff = (datetime.now() - timedelta(hours=hours_back)).isoformat()

        query = f"SELECT {self._ASSET_COLUMNS} FROM assets WHERE timestamp > ?"
        params: List[Any] = [cutoff]

        if asset_type:
            query += " AND asset_type = ?"
            params.append(asset_type)

        if job_id:
            query += " AND job_id = ?"
            params.append(job_id)

        query += " ORDER BY timestamp DESC LIMIT ?"
        params.append(limit)

        with closing(self._connect()) as conn:
            rows = conn.execute(query, params).fetchall()

        return [self._row_to_asset(row) for row in rows]

    def search_ocr(self, query: str, limit: int = 20) -> List[VisualAsset]:
        """Search assets by OCR text.

        The query is matched as a substring using SQL ``LIKE``, so ``%`` and
        ``_`` inside ``query`` act as wildcards.

        Args:
            query: Text to look for in the stored OCR output
            limit: Maximum number of assets returned

        Returns:
            Matching assets, newest first
        """
        sql = (
            f"SELECT {self._ASSET_COLUMNS} FROM assets "
            "WHERE ocr_text LIKE ? "
            "ORDER BY timestamp DESC "
            "LIMIT ?"
        )

        with closing(self._connect()) as conn:
            rows = conn.execute(sql, (f'%{query}%', limit)).fetchall()

        return [self._row_to_asset(row) for row in rows]

    def correlate_with_audio(
        self,
        timestamp: datetime,
        window_seconds: float = 30
    ) -> List[Dict[str, Any]]:
        """Find audio events near a visual capture.

        Placeholder: always returns an empty list.
        """
        # This would query the multi_source_correlator
        # Placeholder for now
        return []


# Singleton
_visual_capture: Optional[VisualCapture] = None


def get_visual_capture() -> VisualCapture:
    """Get or create visual capture singleton."""
    global _visual_capture
    if _visual_capture is None:
        _visual_capture = VisualCapture()
    return _visual_capture


if __name__ == '__main__':
    print("=== Visual Capture Test ===\n")

    vc = VisualCapture()

    # Simulate a work session
    session = vc.start_work_session("Smith Residence", "John")
    print(f"Started session: {session.id}")

    # Simulate adding content
    vc.add_to_session(session.id, "Condensate line backup observed", "issue")
    vc.add_to_session(session.id, "Cleared with compressed air", "resolution")

    # End session
    session = vc.end_work_session(session.id)
    print("\nGenerated Work Notes:")
    print(session.work_notes)