# core/attention/multi_monitor.py
  1  """
  2  Multi-Monitor Attention Detection
  3  
  4  Detects which screen the operator is attending to by combining:
  5  1. Tobii gaze validity (valid = main monitor, invalid = looking away)
  6  2. Gaze edge detection (coordinates at screen edges suggest looking past)
  7  3. (Future) Head pose from MacBook camera
  8  
  9  Screen Layout:
 10      [MacBook]  [Main Monitor]  [iPad]
 11         LEFT       CENTER        RIGHT
 12  
 13  When Tobii gaze is invalid or at left edge → likely looking at MacBook
 14  When Tobii gaze is valid in center → looking at main monitor
 15  When Tobii gaze is invalid or at right edge → likely looking at iPad
 16  """
 17  
 18  from dataclasses import dataclass, field
 19  from datetime import datetime, timedelta
 20  from typing import Optional, Callable, List, Dict
 21  from enum import Enum
 22  import threading
 23  import time
 24  
 25  # Import shared AttentionScreen from head_pose
 26  from .head_pose import AttentionScreen
 27  
 28  
 29  @dataclass
 30  class ScreenConfig:
 31      """Configuration for multi-monitor setup."""
 32      left_name: str = "MacBook"
 33      center_name: str = "Main Monitor"
 34      right_name: str = "iPad"
 35  
 36      # Thresholds for edge detection (normalized 0-1)
 37      left_edge: float = 0.05    # x < this = looking left
 38      right_edge: float = 0.95   # x > this = looking right
 39  
 40      # How long gaze must be invalid/at-edge before switching screens
 41      switch_delay_ms: float = 300
 42  
 43  
@dataclass
class AttentionState:
    """Current multi-monitor attention state (a snapshot, not live)."""
    screen: AttentionScreen        # which screen the operator is attending to
    screen_name: str               # human-readable name for `screen`
    confidence: float  # 0-1, how confident we are
    since: datetime                # when attention moved to this screen
    gaze_valid: bool               # whether the latest gaze sample was valid
    last_gaze_x: Optional[float] = None  # last reported gaze x (normalized 0-1), if any
    last_gaze_y: Optional[float] = None  # last reported gaze y (normalized 0-1), if any
 54  
 55  
 56  class MultiMonitorTracker:
 57      """
 58      Tracks attention across multiple monitors.
 59  
 60      Uses gaze validity and edge detection to infer which
 61      screen the operator is looking at.
 62  
 63      Usage:
 64          tracker = MultiMonitorTracker()
 65  
 66          # Feed gaze data
 67          tracker.update_gaze(x=0.5, y=0.5, valid=True)
 68  
 69          # Get current screen
 70          state = tracker.get_state()
 71          print(f"Looking at: {state.screen_name}")
 72  
 73          # Register callback for screen changes
 74          tracker.on_screen_change(lambda old, new: print(f"{old} -> {new}"))
 75      """
 76  
 77      def __init__(self, config: ScreenConfig = None):
 78          self.config = config or ScreenConfig()
 79  
 80          self._current_screen = AttentionScreen.CENTER
 81          self._screen_since = datetime.now()
 82          self._gaze_valid = True
 83          self._last_x: Optional[float] = None
 84          self._last_y: Optional[float] = None
 85  
 86          # Edge/invalid tracking for debouncing
 87          self._edge_start: Optional[datetime] = None
 88          self._edge_direction: Optional[AttentionScreen] = None
 89  
 90          # Callbacks
 91          self._on_change: List[Callable[[AttentionState, AttentionState], None]] = []
 92  
 93          self._lock = threading.Lock()
 94  
 95      def update_gaze(
 96          self,
 97          x: Optional[float],
 98          y: Optional[float],
 99          valid: bool,
100          left_valid: bool = True,
101          right_valid: bool = True,
102      ) -> Optional[AttentionScreen]:
103          """
104          Update with new gaze data.
105  
106          Returns the new screen if it changed, None otherwise.
107          """
108          with self._lock:
109              self._gaze_valid = valid and (left_valid or right_valid)
110              self._last_x = x
111              self._last_y = y
112  
113              # Determine intended screen from gaze
114              intended = self._infer_screen(x, y, valid)
115  
116              # Debounce screen changes
117              now = datetime.now()
118  
119              if intended != self._current_screen:
120                  if self._edge_direction != intended:
121                      # New direction, start timing
122                      self._edge_start = now
123                      self._edge_direction = intended
124                  elif self._edge_start:
125                      # Same direction, check if enough time passed
126                      elapsed_ms = (now - self._edge_start).total_seconds() * 1000
127                      if elapsed_ms >= self.config.switch_delay_ms:
128                          # Switch screens
129                          old_state = self._make_state()
130                          self._current_screen = intended
131                          self._screen_since = now
132                          new_state = self._make_state()
133  
134                          # Notify callbacks
135                          for callback in self._on_change:
136                              try:
137                                  callback(old_state, new_state)
138                              except Exception as e:
139                                  print(f"[MultiMonitor] Callback error: {e}")
140  
141                          self._edge_start = None
142                          self._edge_direction = None
143                          return intended
144              else:
145                  # Back to current screen, reset timing
146                  self._edge_start = None
147                  self._edge_direction = None
148  
149              return None
150  
151      def _infer_screen(
152          self,
153          x: Optional[float],
154          y: Optional[float],
155          valid: bool,
156      ) -> AttentionScreen:
157          """Infer which screen based on gaze data."""
158  
159          if not valid or x is None:
160              # Gaze invalid - we're looking away from main monitor
161              # Use last known position to guess direction
162              if self._last_x is not None:
163                  if self._last_x < 0.3:
164                      return AttentionScreen.LEFT
165                  elif self._last_x > 0.7:
166                      return AttentionScreen.RIGHT
167              # Default to unknown if we can't tell
168              return AttentionScreen.UNKNOWN
169  
170          # Gaze is valid - check if at edges
171          if x < self.config.left_edge:
172              return AttentionScreen.LEFT
173          elif x > self.config.right_edge:
174              return AttentionScreen.RIGHT
175          else:
176              return AttentionScreen.CENTER
177  
178      def _make_state(self) -> AttentionState:
179          """Create current attention state."""
180          screen = self._current_screen
181          names = {
182              AttentionScreen.LEFT: self.config.left_name,
183              AttentionScreen.CENTER: self.config.center_name,
184              AttentionScreen.RIGHT: self.config.right_name,
185              AttentionScreen.UNKNOWN: "Unknown",
186          }
187  
188          # Confidence based on gaze validity
189          if self._gaze_valid and screen == AttentionScreen.CENTER:
190              confidence = 1.0
191          elif not self._gaze_valid:
192              confidence = 0.6  # Less confident when gaze is lost
193          else:
194              confidence = 0.8
195  
196          return AttentionState(
197              screen=screen,
198              screen_name=names.get(screen, screen.value),
199              confidence=confidence,
200              since=self._screen_since,
201              gaze_valid=self._gaze_valid,
202              last_gaze_x=self._last_x,
203              last_gaze_y=self._last_y,
204          )
205  
206      def get_state(self) -> AttentionState:
207          """Get current attention state."""
208          with self._lock:
209              return self._make_state()
210  
211      def on_screen_change(
212          self,
213          callback: Callable[[AttentionState, AttentionState], None]
214      ):
215          """Register callback for screen changes."""
216          self._on_change.append(callback)
217  
218      def get_screen(self) -> tuple[AttentionScreen, str]:
219          """Get current screen (enum, name)."""
220          state = self.get_state()
221          return state.screen, state.screen_name
222  
223  
class IntegratedAttentionTracker:
    """
    Integrates Tobii gaze tracking with multi-monitor detection.

    Thin glue layer: subscribes to TalonIntegration's gaze stream and
    forwards every sample into a MultiMonitorTracker, exposing the
    combined screen-level attention state.
    """

    def __init__(self, talon_tracker, config: ScreenConfig = None):
        self.talon = talon_tracker
        self.multi_monitor = MultiMonitorTracker(config)

        # Subscribe so every gaze sample is forwarded automatically.
        self.talon.on_gaze_point(self._on_gaze)

    def _on_gaze(self, point):
        """Relay one gaze sample into the multi-monitor tracker."""
        either_eye = point.left_valid or point.right_valid
        self.multi_monitor.update_gaze(
            x=point.x,
            y=point.y,
            valid=either_eye,
            left_valid=point.left_valid,
            right_valid=point.right_valid,
        )

    def start(self):
        """Begin receiving gaze data from the underlying tracker."""
        self.talon.start()

    def stop(self):
        """Stop the underlying tracker."""
        self.talon.stop()

    def get_attention_state(self) -> AttentionState:
        """Return the full attention state, including the inferred screen."""
        return self.multi_monitor.get_state()

    def on_screen_change(self, callback):
        """Register callback for screen changes (old_state, new_state)."""
        self.multi_monitor.on_screen_change(callback)
263  
264  
@dataclass
class ScreenAttention:
    """Attention on a specific screen."""
    screen: AttentionScreen      # which screen is being attended to
    screen_name: str             # human-readable name for `screen`
    confidence: float            # 0-1 confidence in the attribution
    gaze_x: Optional[float] = None   # gaze x coordinate, if known
    gaze_y: Optional[float] = None   # gaze y coordinate, if known
    # Defaults to construction time when not supplied explicitly.
    since: datetime = field(default_factory=datetime.now)
274  
275  
class MultiMonitorDetector:
    """
    Simplified multi-monitor detector for use with orchestrator.

    Just processes gaze points and determines which screen
    the operator is looking at.

    NOTE: not lock-protected — feed it from a single thread
    (e.g. one gaze-callback thread).
    """

    def __init__(
        self,
        screen_names: Optional[Dict[AttentionScreen, str]] = None,
        left_edge: float = 0.05,
        right_edge: float = 0.95,
        switch_delay_ms: float = 200
    ):
        """
        Args:
            screen_names: Display name per screen; defaults to the
                MacBook / Main Monitor / iPad layout.
            left_edge, right_edge: Normalized-x edge thresholds.
            switch_delay_ms: Debounce before committing a screen change.
        """
        self.screen_names = screen_names or {
            AttentionScreen.LEFT: "MacBook",
            AttentionScreen.CENTER: "Main Monitor",
            AttentionScreen.RIGHT: "iPad",
        }
        self.left_edge = left_edge
        self.right_edge = right_edge
        self.switch_delay_ms = switch_delay_ms

        self._current = AttentionScreen.CENTER
        # Debounce state: candidate screen and when it was first seen.
        self._pending: Optional[AttentionScreen] = None
        self._pending_start: Optional[datetime] = None
        # Last *valid* gaze coordinates (kept through invalid samples).
        self._last_x: Optional[float] = None
        self._last_y: Optional[float] = None
        self._gaze_valid = True

        # Callbacks: fn(screen, screen_name).
        self._on_change: List[Callable[[AttentionScreen, str], None]] = []

    def update_gaze(self, gaze_point) -> Optional[AttentionScreen]:
        """
        Update with a gaze point.

        Compatible with TalonIntegration's on_gaze_point callback.

        Returns the new screen when a (debounced) change commits, else None.
        """
        x = gaze_point.x
        y = gaze_point.y
        valid = gaze_point.left_valid or gaze_point.right_valid

        self._gaze_valid = valid

        # Infer BEFORE storing the sample: on an invalid sample (x is None)
        # _infer_screen needs the last *valid* position to guess a direction.
        # (Bug fix: previously the stored position was clobbered with None
        # first, so the direction guess was dead code.)
        intended = self._infer_screen(x, y, valid)

        # Only remember real coordinates; invalid samples must not erase
        # the last known position.
        if x is not None:
            self._last_x = x
        if y is not None:
            self._last_y = y

        if intended == self._current:
            # Back on the current screen: cancel any pending switch.
            self._pending = None
            self._pending_start = None
            return None

        # Screen change pending
        now = datetime.now()

        if self._pending != intended:
            # New candidate direction: restart the debounce timer.
            self._pending = intended
            self._pending_start = now
            return None

        # Same candidate: commit once the debounce delay has elapsed.
        if self._pending_start:
            elapsed_ms = (now - self._pending_start).total_seconds() * 1000
            if elapsed_ms >= self.switch_delay_ms:
                self._current = intended
                self._pending = None
                self._pending_start = None

                # Notify
                name = self.screen_names.get(intended, intended.value)
                for callback in list(self._on_change):
                    try:
                        callback(intended, name)
                    except Exception as e:
                        print(f"[MultiMonitorDetector] Callback error: {e}")

                return intended

        return None

    def _infer_screen(
        self,
        x: Optional[float],
        y: Optional[float],
        valid: bool
    ) -> AttentionScreen:
        """Infer screen from gaze data (uses last valid position when lost)."""
        if not valid or x is None:
            # Lost tracking - use last valid position to guess a direction.
            if self._last_x is not None and self._last_x < 0.3:
                return AttentionScreen.LEFT
            elif self._last_x is not None and self._last_x > 0.7:
                return AttentionScreen.RIGHT
            return self._current  # Stay on current

        if x < self.left_edge:
            return AttentionScreen.LEFT
        elif x > self.right_edge:
            return AttentionScreen.RIGHT
        return AttentionScreen.CENTER

    def get_current(self) -> tuple[AttentionScreen, str]:
        """Get current screen and name."""
        name = self.screen_names.get(self._current, self._current.value)
        return self._current, name

    def on_screen_change(self, callback: Callable[[AttentionScreen, str], None]):
        """Register callback for screen changes: fn(screen, name)."""
        self._on_change.append(callback)
389  
390  
if __name__ == "__main__":
    # Small self-test: drive the tracker with a scripted gaze sequence and
    # watch it flip between screens.
    print("=== Multi-Monitor Attention Tracker ===\n")

    demo_config = ScreenConfig(
        left_name="MacBook Pro",
        center_name="Studio Display",
        right_name="iPad Pro",
    )
    tracker = MultiMonitorTracker(demo_config)

    def on_change(old: AttentionState, new: AttentionState):
        print(f"\n*** SCREEN CHANGE: {old.screen_name} → {new.screen_name} ***\n")

    tracker.on_screen_change(on_change)

    print("Simulating gaze sequence...")
    # Each step: (x, y, valid, description).
    steps = [
        (0.5, 0.5, True, "Center of main monitor"),
        (0.5, 0.5, True, "Still center"),
        (0.1, 0.5, True, "Moving left"),
        (0.02, 0.5, True, "At left edge"),
        (0.02, 0.5, True, "Still at left edge"),
        (None, None, False, "Gaze lost (looking at MacBook)"),
        (None, None, False, "Still lost"),
        (0.5, 0.5, True, "Back to center"),
        (0.9, 0.5, True, "Moving right"),
        (0.98, 0.5, True, "At right edge"),
        (None, None, False, "Gaze lost (looking at iPad)"),
    ]

    for gx, gy, ok, label in steps:
        tracker.update_gaze(gx, gy, ok)
        snapshot = tracker.get_state()
        print(f"{label:40} → {snapshot.screen_name:15} (conf={snapshot.confidence:.1f})")
        time.sleep(0.15)  # let the debounce timer advance between samples

    print("\nDone.")