# head_pose.py
"""
Head Pose Tracking via MacBook Camera

Uses Apple's Vision framework to detect head orientation (yaw, pitch, roll)
from the FaceTime camera. Combined with Tobii eye tracking, this enables
multi-monitor attention detection.

Architecture:
    MacBook Camera → Vision Framework → Head Pose → Screen Inference
                                                          ↓
    Tobii 5 → Precise Gaze (main monitor) ──────→ Attention Fusion

Screen layout (configurable):
    [MacBook]   [Main Monitor]   [iPad]
      -45°           0°           +45°
      LEFT         CENTER        RIGHT

When Tobii gaze goes invalid and head yaw < -20°, infer "looking at MacBook"
When Tobii gaze goes invalid and head yaw > +20°, infer "looking at iPad"
When Tobii gaze is valid, use precise coordinates on main monitor
"""

import subprocess
import json
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, List, Callable
from enum import Enum
from pathlib import Path


class AttentionScreen(Enum):
    """Which screen the operator is attending to."""
    LEFT = "left"        # MacBook (to the left)
    CENTER = "center"    # Main monitor (Tobii tracked)
    RIGHT = "right"      # iPad (to the right)
    UNKNOWN = "unknown"  # Can't determine


@dataclass
class HeadPose:
    """Head orientation from Vision framework."""
    timestamp: datetime
    yaw: float    # Left/right rotation (-1 to 1, negative = looking left)
    pitch: float  # Up/down tilt (-1 to 1, negative = looking down)
    roll: float   # Head tilt (-1 to 1)
    valid: bool   # Whether face was detected

    @property
    def yaw_degrees(self) -> float:
        """Convert yaw to approximate degrees (-90 to +90)."""
        return self.yaw * 90

    def infer_screen(
        self,
        left_threshold: float = -0.25,   # -22.5 degrees
        right_threshold: float = 0.25,   # +22.5 degrees
    ) -> AttentionScreen:
        """Infer which screen based on head yaw.

        Returns UNKNOWN when no face was detected; otherwise LEFT when
        yaw < left_threshold, RIGHT when yaw > right_threshold, else CENTER.
        (Yaw exactly equal to a threshold counts as CENTER.)
        """
        if not self.valid:
            return AttentionScreen.UNKNOWN
        if self.yaw < left_threshold:
            return AttentionScreen.LEFT
        elif self.yaw > right_threshold:
            return AttentionScreen.RIGHT
        else:
            return AttentionScreen.CENTER


# Swift code for Vision framework head pose detection.
# This runs as a subprocess since Vision framework requires Swift/ObjC.
# It prints one JSON object per detected frame on stdout (see outputPose).
SWIFT_HEAD_POSE_TRACKER = '''
import Foundation
import AVFoundation
import Vision

class HeadPoseTracker: NSObject, AVCaptureVideoDataOutputSampleBufferDelegate {
    let session = AVCaptureSession()
    let outputQueue = DispatchQueue(label: "HeadPoseOutput")
    var isRunning = false

    override init() {
        super.init()
        setupCamera()
    }

    func setupCamera() {
        session.sessionPreset = .medium

        guard let device = AVCaptureDevice.default(.builtInWideAngleCamera, for: .video, position: .front),
              let input = try? AVCaptureDeviceInput(device: device) else {
            fputs("ERROR: Could not access camera\\n", stderr)
            return
        }

        session.addInput(input)

        let output = AVCaptureVideoDataOutput()
        output.setSampleBufferDelegate(self, queue: outputQueue)
        output.alwaysDiscardsLateVideoFrames = true
        session.addOutput(output)
    }

    func start() {
        isRunning = true
        session.startRunning()
    }

    func stop() {
        isRunning = false
        session.stopRunning()
    }

    func captureOutput(_ output: AVCaptureOutput, didOutput sampleBuffer: CMSampleBuffer, from connection: AVCaptureConnection) {
        guard isRunning else { return }

        guard let pixelBuffer = CMSampleBufferGetImageBuffer(sampleBuffer) else { return }

        let request = VNDetectFaceRectanglesRequest { [weak self] request, error in
            self?.handleFaceDetection(request: request, error: error)
        }

        // Enable head pose detection
        let faceRequest = VNDetectFaceLandmarksRequest { [weak self] request, error in
            self?.handleFaceLandmarks(request: request, error: error)
        }

        let handler = VNImageRequestHandler(cvPixelBuffer: pixelBuffer, options: [:])
        try? handler.perform([request, faceRequest])
    }

    func handleFaceDetection(request: VNRequest, error: Error?) {
        guard let observations = request.results as? [VNFaceObservation],
              let face = observations.first else {
            outputPose(yaw: 0, pitch: 0, roll: 0, valid: false)
            return
        }

        // VNFaceObservation has yaw, pitch, roll as optional NSNumber
        let yaw = face.yaw?.floatValue ?? 0
        let pitch = face.pitch?.floatValue ?? 0
        let roll = face.roll?.floatValue ?? 0

        outputPose(yaw: yaw, pitch: pitch, roll: roll, valid: true)
    }

    func handleFaceLandmarks(request: VNRequest, error: Error?) {
        // Can be extended to get more precise eye positions
    }

    func outputPose(yaw: Float, pitch: Float, roll: Float, valid: Bool) {
        let timestamp = ISO8601DateFormatter().string(from: Date())
        let json: [String: Any] = [
            "type": "head_pose",
            "timestamp": timestamp,
            "yaw": yaw,
            "pitch": pitch,
            "roll": roll,
            "valid": valid
        ]

        if let data = try? JSONSerialization.data(withJSONObject: json),
           let str = String(data: data, encoding: .utf8) {
            print(str)
            fflush(stdout)
        }
    }
}

// Main
let tracker = HeadPoseTracker()
tracker.start()

// Run for a while (will be killed by parent process)
RunLoop.main.run()
'''


class HeadPoseTracker:
    """
    Tracks head pose using MacBook's FaceTime camera.

    Uses Vision framework via a Swift subprocess to detect
    face orientation (yaw, pitch, roll).

    Usage:
        tracker = HeadPoseTracker()
        tracker.start()

        # Get current pose
        pose = tracker.get_pose()
        if pose and pose.valid:
            screen = pose.infer_screen()
            print(f"Looking at: {screen.value}")

        tracker.stop()
    """

    def __init__(
        self,
        left_threshold: float = -0.25,
        right_threshold: float = 0.25,
    ):
        # Yaw thresholds (normalized -1..1) forwarded to HeadPose.infer_screen.
        self.left_threshold = left_threshold
        self.right_threshold = right_threshold

        self._process: Optional[subprocess.Popen] = None
        self._reader_thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()

        # Latest pose, guarded by _pose_lock (written by reader thread,
        # read from caller threads via get_pose()).
        self._current_pose: Optional[HeadPose] = None
        self._pose_lock = threading.Lock()

        # Callbacks
        self._on_pose: List[Callable[[HeadPose], None]] = []
        self._on_screen_change: List[Callable[[AttentionScreen, AttentionScreen], None]] = []

        self._last_screen = AttentionScreen.UNKNOWN

        # Path to compiled Swift tracker
        self._swift_binary = Path.home() / ".sovereign" / "head_pose_tracker"

    def start(self) -> bool:
        """Start head pose tracking.

        Compiles the Swift helper on first use, launches it as a subprocess,
        and spawns a daemon thread that parses its JSON output.
        Returns True on success, False if compilation or launch failed.
        """
        # Ensure Swift binary exists
        if not self._ensure_swift_binary():
            print("[HeadPoseTracker] Failed to compile Swift tracker")
            return False

        try:
            self._process = subprocess.Popen(
                [str(self._swift_binary)],
                stdout=subprocess.PIPE,
                # FIX: previously stderr=subprocess.PIPE, but nothing ever
                # drained that pipe — if the Swift process logged enough to
                # stderr the OS pipe buffer would fill and the child would
                # block forever. Discard stderr instead.
                stderr=subprocess.DEVNULL,
                text=True,
                bufsize=1,  # line-buffered: one JSON object per line
            )

            self._stop_event.clear()
            self._reader_thread = threading.Thread(
                target=self._read_output,
                daemon=True,
                name="HeadPoseReader"
            )
            self._reader_thread.start()

            print("[HeadPoseTracker] Started")
            return True

        except Exception as e:
            print(f"[HeadPoseTracker] Failed to start: {e}")
            return False

    def stop(self):
        """Stop head pose tracking: terminate the subprocess and join the reader."""
        self._stop_event.set()

        if self._process:
            self._process.terminate()
            try:
                self._process.wait(timeout=2)
            except subprocess.TimeoutExpired:
                # Graceful terminate didn't work within 2s — force-kill.
                self._process.kill()
            self._process = None

        if self._reader_thread:
            self._reader_thread.join(timeout=1)
            self._reader_thread = None

        print("[HeadPoseTracker] Stopped")

    def get_pose(self) -> Optional[HeadPose]:
        """Get the most recent head pose (None until the first frame arrives)."""
        with self._pose_lock:
            return self._current_pose

    def get_screen(self) -> AttentionScreen:
        """Get which screen the operator is looking at (UNKNOWN before first pose)."""
        pose = self.get_pose()
        if pose:
            return pose.infer_screen(self.left_threshold, self.right_threshold)
        return AttentionScreen.UNKNOWN

    def on_pose(self, callback: Callable[[HeadPose], None]):
        """Register callback for pose updates."""
        self._on_pose.append(callback)

    def on_screen_change(self, callback: Callable[[AttentionScreen, AttentionScreen], None]):
        """Register callback for screen changes (old_screen, new_screen)."""
        self._on_screen_change.append(callback)

    def _ensure_swift_binary(self) -> bool:
        """Compile the Swift tracker if the binary does not exist yet.

        Writes SWIFT_HEAD_POSE_TRACKER next to the target binary and invokes
        swiftc. Returns True when a usable binary is present.
        NOTE(review): an already-compiled binary is never rebuilt, even if
        SWIFT_HEAD_POSE_TRACKER changes — delete ~/.sovereign/head_pose_tracker
        to force a recompile.
        """
        if self._swift_binary.exists():
            return True

        print("[HeadPoseTracker] Compiling Swift head pose tracker...")

        # Write Swift source
        swift_source = self._swift_binary.with_suffix('.swift')
        swift_source.parent.mkdir(parents=True, exist_ok=True)
        swift_source.write_text(SWIFT_HEAD_POSE_TRACKER)

        # Compile
        try:
            result = subprocess.run(
                [
                    'swiftc',
                    '-O',
                    '-o', str(self._swift_binary),
                    '-framework', 'AVFoundation',
                    '-framework', 'Vision',
                    '-framework', 'CoreMedia',
                    str(swift_source),
                ],
                capture_output=True,
                text=True,
                timeout=60,
            )

            if result.returncode != 0:
                print(f"[HeadPoseTracker] Compile failed: {result.stderr}")
                return False

            print("[HeadPoseTracker] Compiled successfully")
            return True

        except Exception as e:
            print(f"[HeadPoseTracker] Compile error: {e}")
            return False

    def _read_output(self):
        """Read head pose data from the Swift subprocess (reader thread body)."""
        while not self._stop_event.is_set():
            # FIX: snapshot the process reference — stop() sets self._process
            # to None from another thread, which could previously race into
            # an AttributeError between the loop check and the readline call.
            proc = self._process
            if proc is None or proc.stdout is None:
                break
            try:
                line = proc.stdout.readline()
                if not line:
                    # FIX: on a blocking pipe readline() returns '' only at
                    # EOF, i.e. the subprocess has exited. The old code slept
                    # 10ms and retried, busy-polling forever; exit instead.
                    break

                self._process_line(line.strip())

            except Exception as e:
                if not self._stop_event.is_set():
                    print(f"[HeadPoseTracker] Read error: {e}")
                break

    def _process_line(self, line: str):
        """Parse one JSON line from the Swift tracker and fan out notifications.

        Non-JSON lines are ignored silently; callback exceptions are caught
        so one bad subscriber cannot kill the reader thread.
        """
        try:
            data = json.loads(line)

            if data.get("type") == "head_pose":
                pose = HeadPose(
                    # Swift emits ISO8601 with a trailing 'Z'; fromisoformat
                    # needs an explicit offset, hence the replace().
                    timestamp=datetime.fromisoformat(data["timestamp"].replace('Z', '+00:00')),
                    yaw=data["yaw"],
                    pitch=data["pitch"],
                    roll=data["roll"],
                    valid=data["valid"],
                )

                with self._pose_lock:
                    self._current_pose = pose

                # Notify callbacks
                for callback in self._on_pose:
                    try:
                        callback(pose)
                    except Exception as e:
                        print(f"[HeadPoseTracker] Callback error: {e}")

                # Check for screen change
                new_screen = pose.infer_screen(self.left_threshold, self.right_threshold)
                if new_screen != self._last_screen:
                    for callback in self._on_screen_change:
                        try:
                            callback(self._last_screen, new_screen)
                        except Exception as e:
                            print(f"[HeadPoseTracker] Screen change callback error: {e}")
                    self._last_screen = new_screen

        except json.JSONDecodeError:
            pass
        except Exception as e:
            print(f"[HeadPoseTracker] Process error: {e}")


class MultiMonitorAttention:
    """
    Fuses Tobii eye tracking with head pose for multi-monitor attention.

    When Tobii has valid gaze → use precise coordinates on main monitor
    When Tobii gaze invalid + head left → infer MacBook attention
    When Tobii gaze invalid + head right → infer iPad attention
    """

    def __init__(
        self,
        head_tracker: HeadPoseTracker,
        screen_names: Optional[dict] = None,
    ):
        self.head_tracker = head_tracker
        # Human-readable labels per screen; callers may override any subset.
        self.screen_names = screen_names or {
            AttentionScreen.LEFT: "MacBook",
            AttentionScreen.CENTER: "Main Monitor",
            AttentionScreen.RIGHT: "iPad",
        }

        self._tobii_valid = False
        self._current_screen = AttentionScreen.CENTER
        self._callbacks: List[Callable[[AttentionScreen, str], None]] = []

    def update_tobii_validity(self, valid: bool):
        """Called when Tobii gaze validity changes.

        Valid→invalid: fall back to head pose to pick a screen (ignored if
        the head tracker can't tell). Invalid→valid: snap back to CENTER.
        """
        was_valid = self._tobii_valid
        self._tobii_valid = valid

        if was_valid and not valid:
            # Lost Tobii tracking, use head pose to determine screen
            screen = self.head_tracker.get_screen()
            if screen != AttentionScreen.UNKNOWN:
                self._update_screen(screen)
        elif not was_valid and valid:
            # Regained Tobii tracking, back to main monitor
            self._update_screen(AttentionScreen.CENTER)

    def _update_screen(self, screen: AttentionScreen):
        """Update current screen and notify subscribers (no-op if unchanged)."""
        if screen != self._current_screen:
            self._current_screen = screen
            name = self.screen_names.get(screen, screen.value)

            for callback in self._callbacks:
                try:
                    callback(screen, name)
                except Exception as e:
                    print(f"[MultiMonitorAttention] Callback error: {e}")

    def on_screen_change(self, callback: Callable[[AttentionScreen, str], None]):
        """Register callback for screen changes; receives (screen, display_name)."""
        self._callbacks.append(callback)

    def get_current_screen(self) -> tuple[AttentionScreen, str]:
        """Get current attention screen as (screen, display_name)."""
        name = self.screen_names.get(self._current_screen, self._current_screen.value)
        return self._current_screen, name
if __name__ == "__main__":
    print("=== Head Pose Tracker Test ===\n")

    pose_tracker = HeadPoseTracker()

    # Echo every valid pose with its inferred screen.
    def report_pose(pose: HeadPose):
        if not pose.valid:
            return
        print(f"Yaw: {pose.yaw_degrees:+.1f}° → {pose.infer_screen().value}")

    # Announce transitions from one screen to another.
    def report_switch(previous: AttentionScreen, current: AttentionScreen):
        print(f"\n*** Screen changed: {previous.value} → {current.value} ***\n")

    pose_tracker.on_pose(report_pose)
    pose_tracker.on_screen_change(report_switch)

    for banner_line in (
        "Starting head pose tracking...",
        "Turn your head left/right to see screen detection",
        "Press Ctrl+C to stop\n",
    ):
        print(banner_line)

    if pose_tracker.start():
        try:
            while True:
                time.sleep(0.1)
        except KeyboardInterrupt:
            pass
        pose_tracker.stop()
    print("\nDone.")