# core/attention/head_pose.py
  1  """
  2  Head Pose Tracking via MacBook Camera
  3  
  4  Uses Apple's Vision framework to detect head orientation (yaw, pitch, roll)
  5  from the FaceTime camera. Combined with Tobii eye tracking, this enables
  6  multi-monitor attention detection.
  7  
  8  Architecture:
  9      MacBook Camera → Vision Framework → Head Pose → Screen Inference
 10      Tobii 5 → Precise Gaze (main monitor) ──────→ Attention Fusion
 12  
 13  Screen layout (configurable):
 14      [MacBook]  [Main Monitor]  [iPad]
 15         -45°         0°          +45°
 16         LEFT       CENTER       RIGHT
 17  
 18  When Tobii gaze goes invalid and head yaw < -20°, infer "looking at MacBook"
 19  When Tobii gaze goes invalid and head yaw > +20°, infer "looking at iPad"
 20  When Tobii gaze is valid, use precise coordinates on main monitor
 21  """
 22  
 23  import subprocess
 24  import json
 25  import threading
 26  import time
 27  from dataclasses import dataclass
 28  from datetime import datetime
 29  from typing import Optional, List, Callable
 30  from enum import Enum
 31  from pathlib import Path
 32  
 33  
class AttentionScreen(Enum):
    """Which screen the operator is attending to.

    Values correspond to the physical layout described in the module
    docstring: MacBook to the left, Tobii-tracked main monitor in the
    center, iPad to the right.
    """
    LEFT = "left"        # MacBook (to the left)
    CENTER = "center"    # Main monitor (Tobii tracked)
    RIGHT = "right"      # iPad (to the right)
    UNKNOWN = "unknown"  # Can't determine
 40  
 41  
 42  @dataclass
 43  class HeadPose:
 44      """Head orientation from Vision framework."""
 45      timestamp: datetime
 46      yaw: float    # Left/right rotation (-1 to 1, negative = looking left)
 47      pitch: float  # Up/down tilt (-1 to 1, negative = looking down)
 48      roll: float   # Head tilt (-1 to 1)
 49      valid: bool   # Whether face was detected
 50  
 51      @property
 52      def yaw_degrees(self) -> float:
 53          """Convert yaw to approximate degrees (-90 to +90)."""
 54          return self.yaw * 90
 55  
 56      def infer_screen(
 57          self,
 58          left_threshold: float = -0.25,   # -22.5 degrees
 59          right_threshold: float = 0.25    # +22.5 degrees
 60      ) -> AttentionScreen:
 61          """Infer which screen based on head yaw."""
 62          if not self.valid:
 63              return AttentionScreen.UNKNOWN
 64          if self.yaw < left_threshold:
 65              return AttentionScreen.LEFT
 66          elif self.yaw > right_threshold:
 67              return AttentionScreen.RIGHT
 68          else:
 69              return AttentionScreen.CENTER
 70  
 71  
 72  # Swift code for Vision framework head pose detection
 73  # This runs as a subprocess since Vision framework requires Swift/ObjC
 74  SWIFT_HEAD_POSE_TRACKER = '''
 75  import Foundation
 76  import AVFoundation
 77  import Vision
 78  
 79  class HeadPoseTracker: NSObject, AVCaptureVideoDataOutputSampleBufferDelegate {
 80      let session = AVCaptureSession()
 81      let outputQueue = DispatchQueue(label: "HeadPoseOutput")
 82      var isRunning = false
 83  
 84      override init() {
 85          super.init()
 86          setupCamera()
 87      }
 88  
 89      func setupCamera() {
 90          session.sessionPreset = .medium
 91  
 92          guard let device = AVCaptureDevice.default(.builtInWideAngleCamera, for: .video, position: .front),
 93                let input = try? AVCaptureDeviceInput(device: device) else {
 94              fputs("ERROR: Could not access camera\\n", stderr)
 95              return
 96          }
 97  
 98          session.addInput(input)
 99  
100          let output = AVCaptureVideoDataOutput()
101          output.setSampleBufferDelegate(self, queue: outputQueue)
102          output.alwaysDiscardsLateVideoFrames = true
103          session.addOutput(output)
104      }
105  
106      func start() {
107          isRunning = true
108          session.startRunning()
109      }
110  
111      func stop() {
112          isRunning = false
113          session.stopRunning()
114      }
115  
116      func captureOutput(_ output: AVCaptureOutput, didOutput sampleBuffer: CMSampleBuffer, from connection: AVCaptureConnection) {
117          guard isRunning else { return }
118  
119          guard let pixelBuffer = CMSampleBufferGetImageBuffer(sampleBuffer) else { return }
120  
121          let request = VNDetectFaceRectanglesRequest { [weak self] request, error in
122              self?.handleFaceDetection(request: request, error: error)
123          }
124  
125          // Enable head pose detection
126          let faceRequest = VNDetectFaceLandmarksRequest { [weak self] request, error in
127              self?.handleFaceLandmarks(request: request, error: error)
128          }
129  
130          let handler = VNImageRequestHandler(cvPixelBuffer: pixelBuffer, options: [:])
131          try? handler.perform([request, faceRequest])
132      }
133  
134      func handleFaceDetection(request: VNRequest, error: Error?) {
135          guard let observations = request.results as? [VNFaceObservation],
136                let face = observations.first else {
137              outputPose(yaw: 0, pitch: 0, roll: 0, valid: false)
138              return
139          }
140  
141          // VNFaceObservation has yaw, pitch, roll as optional NSNumber
142          let yaw = face.yaw?.floatValue ?? 0
143          let pitch = face.pitch?.floatValue ?? 0
144          let roll = face.roll?.floatValue ?? 0
145  
146          outputPose(yaw: yaw, pitch: pitch, roll: roll, valid: true)
147      }
148  
149      func handleFaceLandmarks(request: VNRequest, error: Error?) {
150          // Can be extended to get more precise eye positions
151      }
152  
153      func outputPose(yaw: Float, pitch: Float, roll: Float, valid: Bool) {
154          let timestamp = ISO8601DateFormatter().string(from: Date())
155          let json: [String: Any] = [
156              "type": "head_pose",
157              "timestamp": timestamp,
158              "yaw": yaw,
159              "pitch": pitch,
160              "roll": roll,
161              "valid": valid
162          ]
163  
164          if let data = try? JSONSerialization.data(withJSONObject: json),
165             let str = String(data: data, encoding: .utf8) {
166              print(str)
167              fflush(stdout)
168          }
169      }
170  }
171  
172  // Main
173  let tracker = HeadPoseTracker()
174  tracker.start()
175  
176  // Run for a while (will be killed by parent process)
177  RunLoop.main.run()
178  '''
179  
180  
class HeadPoseTracker:
    """
    Tracks head pose using MacBook's FaceTime camera.

    Uses Vision framework via a Swift subprocess to detect
    face orientation (yaw, pitch, roll). The Swift side emits one
    JSON object per line on stdout; a daemon thread parses them.

    Usage:
        tracker = HeadPoseTracker()
        tracker.start()

        # Get current pose
        pose = tracker.get_pose()
        if pose and pose.valid:
            screen = pose.infer_screen()
            print(f"Looking at: {screen.value}")

        tracker.stop()
    """

    def __init__(
        self,
        left_threshold: float = -0.25,
        right_threshold: float = 0.25,
    ):
        """
        Args:
            left_threshold: Normalized yaw (-1..1) below which attention
                is inferred to be on the LEFT screen.
            right_threshold: Normalized yaw above which attention is
                inferred to be on the RIGHT screen.
        """
        self.left_threshold = left_threshold
        self.right_threshold = right_threshold

        self._process: Optional[subprocess.Popen] = None
        self._reader_thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()

        # Latest pose; written by the reader thread, guarded by _pose_lock.
        self._current_pose: Optional[HeadPose] = None
        self._pose_lock = threading.Lock()

        # Callbacks
        self._on_pose: List[Callable[[HeadPose], None]] = []
        self._on_screen_change: List[Callable[[AttentionScreen, AttentionScreen], None]] = []

        self._last_screen = AttentionScreen.UNKNOWN

        # Path to compiled Swift tracker
        self._swift_binary = Path.home() / ".sovereign" / "head_pose_tracker"

    def start(self) -> bool:
        """Start head pose tracking. Returns True if the subprocess launched."""
        # Ensure Swift binary exists
        if not self._ensure_swift_binary():
            print("[HeadPoseTracker] Failed to compile Swift tracker")
            return False

        try:
            self._process = subprocess.Popen(
                [str(self._swift_binary)],
                stdout=subprocess.PIPE,
                # Do NOT use a PIPE for stderr: nothing drains it, so the
                # child would block forever once the pipe buffer fills.
                stderr=subprocess.DEVNULL,
                text=True,
                bufsize=1,  # line-buffered: Swift emits one JSON per line
            )

            self._stop_event.clear()
            self._reader_thread = threading.Thread(
                target=self._read_output,
                daemon=True,
                name="HeadPoseReader"
            )
            self._reader_thread.start()

            print("[HeadPoseTracker] Started")
            return True

        except Exception as e:
            print(f"[HeadPoseTracker] Failed to start: {e}")
            return False

    def stop(self):
        """Stop head pose tracking and reap the subprocess."""
        self._stop_event.set()

        # Swap out the attribute first so the reader thread (which holds
        # its own local reference) can't race with us tearing it down.
        process, self._process = self._process, None
        if process:
            process.terminate()
            try:
                process.wait(timeout=2)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()  # reap after SIGKILL so we don't leave a zombie

        if self._reader_thread:
            self._reader_thread.join(timeout=1)
            self._reader_thread = None

        print("[HeadPoseTracker] Stopped")

    def get_pose(self) -> Optional[HeadPose]:
        """Get the most recent head pose (None until the first sample arrives)."""
        with self._pose_lock:
            return self._current_pose

    def get_screen(self) -> AttentionScreen:
        """Get which screen the operator is looking at (UNKNOWN if no pose yet)."""
        pose = self.get_pose()
        if pose:
            return pose.infer_screen(self.left_threshold, self.right_threshold)
        return AttentionScreen.UNKNOWN

    def on_pose(self, callback: Callable[[HeadPose], None]):
        """Register callback for pose updates (invoked on the reader thread)."""
        self._on_pose.append(callback)

    def on_screen_change(self, callback: Callable[[AttentionScreen, AttentionScreen], None]):
        """Register callback for screen changes (old_screen, new_screen)."""
        self._on_screen_change.append(callback)

    def _ensure_swift_binary(self) -> bool:
        """Compile the embedded Swift tracker if the binary doesn't exist yet."""
        if self._swift_binary.exists():
            return True

        print("[HeadPoseTracker] Compiling Swift head pose tracker...")

        # Write Swift source next to the target binary
        swift_source = self._swift_binary.with_suffix('.swift')
        swift_source.parent.mkdir(parents=True, exist_ok=True)
        swift_source.write_text(SWIFT_HEAD_POSE_TRACKER)

        # Compile
        try:
            result = subprocess.run(
                [
                    'swiftc',
                    '-O',
                    '-o', str(self._swift_binary),
                    '-framework', 'AVFoundation',
                    '-framework', 'Vision',
                    '-framework', 'CoreMedia',
                    str(swift_source),
                ],
                capture_output=True,
                text=True,
                timeout=60,
            )

            if result.returncode != 0:
                print(f"[HeadPoseTracker] Compile failed: {result.stderr}")
                return False

            print("[HeadPoseTracker] Compiled successfully")
            return True

        except Exception as e:
            print(f"[HeadPoseTracker] Compile error: {e}")
            return False

    def _read_output(self):
        """Read head pose JSON lines from the Swift subprocess until EOF/stop."""
        # Hold a local reference: stop() sets self._process to None concurrently.
        process = self._process
        if process is None or process.stdout is None:
            return

        while not self._stop_event.is_set():
            try:
                line = process.stdout.readline()
                if not line:
                    # Empty string from readline() on a pipe means EOF — the
                    # subprocess exited. Stop instead of busy-looping.
                    break

                self._process_line(line.strip())

            except Exception as e:
                if not self._stop_event.is_set():
                    print(f"[HeadPoseTracker] Read error: {e}")
                break

    def _process_line(self, line: str):
        """Parse one JSON line from the Swift tracker and fan out callbacks."""
        try:
            data = json.loads(line)

            if data.get("type") == "head_pose":
                pose = HeadPose(
                    # Swift emits ISO-8601 with a 'Z' suffix, which
                    # datetime.fromisoformat can't parse before 3.11.
                    timestamp=datetime.fromisoformat(data["timestamp"].replace('Z', '+00:00')),
                    yaw=data["yaw"],
                    pitch=data["pitch"],
                    roll=data["roll"],
                    valid=data["valid"],
                )

                with self._pose_lock:
                    self._current_pose = pose

                # Notify callbacks; one bad callback must not kill the reader.
                for callback in self._on_pose:
                    try:
                        callback(pose)
                    except Exception as e:
                        print(f"[HeadPoseTracker] Callback error: {e}")

                # Check for screen change
                new_screen = pose.infer_screen(self.left_threshold, self.right_threshold)
                if new_screen != self._last_screen:
                    for callback in self._on_screen_change:
                        try:
                            callback(self._last_screen, new_screen)
                        except Exception as e:
                            print(f"[HeadPoseTracker] Screen change callback error: {e}")
                    self._last_screen = new_screen

        except json.JSONDecodeError:
            # Non-JSON chatter on stdout is expected noise; ignore it.
            pass
        except Exception as e:
            print(f"[HeadPoseTracker] Process error: {e}")
388  
389  
class MultiMonitorAttention:
    """
    Fuses Tobii eye tracking with head pose for multi-monitor attention.

    When Tobii has valid gaze → use precise coordinates on main monitor
    When Tobii gaze invalid + head left → infer MacBook attention
    When Tobii gaze invalid + head right → infer iPad attention
    """

    def __init__(
        self,
        head_tracker: HeadPoseTracker,
        screen_names: Optional[dict] = None,  # was implicit-Optional `dict = None`
    ):
        """
        Args:
            head_tracker: Supplies head-yaw based screen inference when
                Tobii gaze is invalid.
            screen_names: Optional mapping of AttentionScreen → display
                name; defaults to MacBook / Main Monitor / iPad.
        """
        self.head_tracker = head_tracker
        self.screen_names = screen_names or {
            AttentionScreen.LEFT: "MacBook",
            AttentionScreen.CENTER: "Main Monitor",
            AttentionScreen.RIGHT: "iPad",
        }

        self._tobii_valid = False
        self._current_screen = AttentionScreen.CENTER
        self._callbacks: List[Callable[[AttentionScreen, str], None]] = []

    def update_tobii_validity(self, valid: bool):
        """Called when Tobii gaze validity changes."""
        was_valid = self._tobii_valid
        self._tobii_valid = valid

        if was_valid and not valid:
            # Lost Tobii tracking, use head pose to determine screen
            screen = self.head_tracker.get_screen()
            if screen != AttentionScreen.UNKNOWN:
                self._update_screen(screen)
        elif not was_valid and valid:
            # Regained Tobii tracking, back to main monitor
            self._update_screen(AttentionScreen.CENTER)

    def _update_screen(self, screen: AttentionScreen):
        """Update current screen and notify registered callbacks on change."""
        if screen != self._current_screen:
            self._current_screen = screen
            name = self.screen_names.get(screen, screen.value)

            # One failing callback must not prevent the others from running.
            for callback in self._callbacks:
                try:
                    callback(screen, name)
                except Exception as e:
                    print(f"[MultiMonitorAttention] Callback error: {e}")

    def on_screen_change(self, callback: Callable[[AttentionScreen, str], None]):
        """Register callback for screen changes; receives (screen, display_name)."""
        self._callbacks.append(callback)

    def get_current_screen(self) -> tuple[AttentionScreen, str]:
        """Get current attention screen as (screen, display_name)."""
        name = self.screen_names.get(self._current_screen, self._current_screen.value)
        return self._current_screen, name
449  
450  
if __name__ == "__main__":
    # Interactive smoke test: prints the inferred screen as you turn your head.
    print("=== Head Pose Tracker Test ===\n")

    tracker = HeadPoseTracker()

    def handle_pose(pose: HeadPose):
        # Only report frames in which a face was actually detected.
        if pose.valid:
            print(f"Yaw: {pose.yaw_degrees:+.1f}° → {pose.infer_screen().value}")

    def handle_screen_change(old: AttentionScreen, new: AttentionScreen):
        print(f"\n*** Screen changed: {old.value} → {new.value} ***\n")

    tracker.on_pose(handle_pose)
    tracker.on_screen_change(handle_screen_change)

    print("Starting head pose tracking...")
    print("Turn your head left/right to see screen detection")
    print("Press Ctrl+C to stop\n")

    started = tracker.start()
    if started:
        try:
            while True:
                time.sleep(0.1)
        except KeyboardInterrupt:
            pass

    tracker.stop()
    print("\nDone.")