# multi_monitor.py
"""
Multi-Monitor Attention Detection

Detects which screen the operator is attending to by combining:
1. Tobii gaze validity (valid = main monitor, invalid = looking away)
2. Gaze edge detection (coordinates at screen edges suggest looking past)
3. (Future) Head pose from MacBook camera

Screen Layout:
    [MacBook]   [Main Monitor]   [iPad]
      LEFT         CENTER         RIGHT

When Tobii gaze is invalid or at left edge → likely looking at MacBook
When Tobii gaze is valid in center → looking at main monitor
When Tobii gaze is invalid or at right edge → likely looking at iPad
"""

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Callable, List, Dict
from enum import Enum
import threading
import time

# Import shared AttentionScreen from head_pose
from .head_pose import AttentionScreen


@dataclass
class ScreenConfig:
    """Configuration for multi-monitor setup."""
    left_name: str = "MacBook"
    center_name: str = "Main Monitor"
    right_name: str = "iPad"

    # Thresholds for edge detection (normalized 0-1)
    left_edge: float = 0.05   # x < this = looking left
    right_edge: float = 0.95  # x > this = looking right

    # How long gaze must be invalid/at-edge before switching screens
    switch_delay_ms: float = 300


@dataclass
class AttentionState:
    """Current multi-monitor attention state."""
    screen: AttentionScreen
    screen_name: str
    confidence: float  # 0-1, how confident we are
    since: datetime
    gaze_valid: bool
    # Last known real (non-None) gaze coordinates, normalized 0-1.
    last_gaze_x: Optional[float] = None
    last_gaze_y: Optional[float] = None


class MultiMonitorTracker:
    """
    Tracks attention across multiple monitors.

    Uses gaze validity and edge detection to infer which
    screen the operator is looking at.

    Usage:
        tracker = MultiMonitorTracker()

        # Feed gaze data
        tracker.update_gaze(x=0.5, y=0.5, valid=True)

        # Get current screen
        state = tracker.get_state()
        print(f"Looking at: {state.screen_name}")

        # Register callback for screen changes
        tracker.on_screen_change(lambda old, new: print(f"{old} -> {new}"))
    """

    def __init__(self, config: Optional[ScreenConfig] = None):
        self.config = config or ScreenConfig()

        self._current_screen = AttentionScreen.CENTER
        self._screen_since = datetime.now()
        self._gaze_valid = True
        # Last known real gaze coordinates. Never overwritten with None so
        # a direction can still be guessed after tracking is lost.
        self._last_x: Optional[float] = None
        self._last_y: Optional[float] = None

        # Edge/invalid tracking for debouncing
        self._edge_start: Optional[datetime] = None
        self._edge_direction: Optional[AttentionScreen] = None

        # Callbacks
        self._on_change: List[Callable[[AttentionState, AttentionState], None]] = []

        self._lock = threading.Lock()

    def update_gaze(
        self,
        x: Optional[float],
        y: Optional[float],
        valid: bool,
        left_valid: bool = True,
        right_valid: bool = True,
    ) -> Optional[AttentionScreen]:
        """
        Update with new gaze data.

        Returns the new screen if it changed, None otherwise.
        """
        notify: Optional[tuple] = None
        result: Optional[AttentionScreen] = None

        with self._lock:
            self._gaze_valid = valid and (left_valid or right_valid)

            # Determine intended screen from gaze.
            # BUGFIX: infer *before* storing the new coordinates, and never
            # overwrite the last known position with None. Previously the
            # last position was clobbered first, so the left/right guess in
            # _infer_screen could never fire on tracking loss.
            intended = self._infer_screen(x, y, valid)
            if x is not None:
                self._last_x = x
            if y is not None:
                self._last_y = y

            # Debounce screen changes
            now = datetime.now()

            if intended != self._current_screen:
                if self._edge_direction != intended:
                    # New direction, start timing
                    self._edge_start = now
                    self._edge_direction = intended
                elif self._edge_start:
                    # Same direction, check if enough time passed
                    elapsed_ms = (now - self._edge_start).total_seconds() * 1000
                    if elapsed_ms >= self.config.switch_delay_ms:
                        # Switch screens
                        old_state = self._make_state()
                        self._current_screen = intended
                        self._screen_since = now
                        new_state = self._make_state()
                        notify = (old_state, new_state)

                        self._edge_start = None
                        self._edge_direction = None
                        result = intended
            else:
                # Back to current screen, reset timing
                self._edge_start = None
                self._edge_direction = None

        # BUGFIX: notify callbacks *outside* the lock. threading.Lock is not
        # reentrant, so a callback that called get_state() would deadlock.
        if notify is not None:
            for callback in self._on_change:
                try:
                    callback(*notify)
                except Exception as e:
                    print(f"[MultiMonitor] Callback error: {e}")

        return result

    def _infer_screen(
        self,
        x: Optional[float],
        y: Optional[float],
        valid: bool,
    ) -> AttentionScreen:
        """Infer which screen based on gaze data."""

        if not valid or x is None:
            # Gaze invalid - we're looking away from main monitor
            # Use last known position to guess direction
            if self._last_x is not None:
                if self._last_x < 0.3:
                    return AttentionScreen.LEFT
                elif self._last_x > 0.7:
                    return AttentionScreen.RIGHT
            # Default to unknown if we can't tell
            return AttentionScreen.UNKNOWN

        # Gaze is valid - check if at edges
        if x < self.config.left_edge:
            return AttentionScreen.LEFT
        elif x > self.config.right_edge:
            return AttentionScreen.RIGHT
        else:
            return AttentionScreen.CENTER
def _make_state(self) -> AttentionState: 179 """Create current attention state.""" 180 screen = self._current_screen 181 names = { 182 AttentionScreen.LEFT: self.config.left_name, 183 AttentionScreen.CENTER: self.config.center_name, 184 AttentionScreen.RIGHT: self.config.right_name, 185 AttentionScreen.UNKNOWN: "Unknown", 186 } 187 188 # Confidence based on gaze validity 189 if self._gaze_valid and screen == AttentionScreen.CENTER: 190 confidence = 1.0 191 elif not self._gaze_valid: 192 confidence = 0.6 # Less confident when gaze is lost 193 else: 194 confidence = 0.8 195 196 return AttentionState( 197 screen=screen, 198 screen_name=names.get(screen, screen.value), 199 confidence=confidence, 200 since=self._screen_since, 201 gaze_valid=self._gaze_valid, 202 last_gaze_x=self._last_x, 203 last_gaze_y=self._last_y, 204 ) 205 206 def get_state(self) -> AttentionState: 207 """Get current attention state.""" 208 with self._lock: 209 return self._make_state() 210 211 def on_screen_change( 212 self, 213 callback: Callable[[AttentionState, AttentionState], None] 214 ): 215 """Register callback for screen changes.""" 216 self._on_change.append(callback) 217 218 def get_screen(self) -> tuple[AttentionScreen, str]: 219 """Get current screen (enum, name).""" 220 state = self.get_state() 221 return state.screen, state.screen_name 222 223 224 class IntegratedAttentionTracker: 225 """ 226 Integrates Tobii gaze tracking with multi-monitor detection. 227 228 Wraps TalonIntegration and adds multi-monitor awareness. 
229 """ 230 231 def __init__(self, talon_tracker, config: ScreenConfig = None): 232 self.talon = talon_tracker 233 self.multi_monitor = MultiMonitorTracker(config) 234 235 # Wire up gaze updates 236 self.talon.on_gaze_point(self._on_gaze) 237 238 def _on_gaze(self, point): 239 """Forward gaze data to multi-monitor tracker.""" 240 self.multi_monitor.update_gaze( 241 x=point.x, 242 y=point.y, 243 valid=point.left_valid or point.right_valid, 244 left_valid=point.left_valid, 245 right_valid=point.right_valid, 246 ) 247 248 def start(self): 249 """Start tracking.""" 250 self.talon.start() 251 252 def stop(self): 253 """Stop tracking.""" 254 self.talon.stop() 255 256 def get_attention_state(self) -> AttentionState: 257 """Get full attention state including screen.""" 258 return self.multi_monitor.get_state() 259 260 def on_screen_change(self, callback): 261 """Register callback for screen changes.""" 262 self.multi_monitor.on_screen_change(callback) 263 264 265 @dataclass 266 class ScreenAttention: 267 """Attention on a specific screen.""" 268 screen: AttentionScreen 269 screen_name: str 270 confidence: float 271 gaze_x: Optional[float] = None 272 gaze_y: Optional[float] = None 273 since: datetime = field(default_factory=datetime.now) 274 275 276 class MultiMonitorDetector: 277 """ 278 Simplified multi-monitor detector for use with orchestrator. 279 280 Just processes gaze points and determines which screen 281 the operator is looking at. 
282 """ 283 284 def __init__( 285 self, 286 screen_names: Dict[AttentionScreen, str] = None, 287 left_edge: float = 0.05, 288 right_edge: float = 0.95, 289 switch_delay_ms: float = 200 290 ): 291 self.screen_names = screen_names or { 292 AttentionScreen.LEFT: "MacBook", 293 AttentionScreen.CENTER: "Main Monitor", 294 AttentionScreen.RIGHT: "iPad", 295 } 296 self.left_edge = left_edge 297 self.right_edge = right_edge 298 self.switch_delay_ms = switch_delay_ms 299 300 self._current = AttentionScreen.CENTER 301 self._pending: Optional[AttentionScreen] = None 302 self._pending_start: Optional[datetime] = None 303 self._last_x: Optional[float] = None 304 self._last_y: Optional[float] = None 305 self._gaze_valid = True 306 307 self._on_change: List[Callable[[AttentionScreen, str], None]] = [] 308 309 def update_gaze(self, gaze_point) -> Optional[AttentionScreen]: 310 """ 311 Update with a gaze point. 312 313 Compatible with TalonIntegration's on_gaze_point callback. 314 """ 315 x = gaze_point.x 316 y = gaze_point.y 317 valid = gaze_point.left_valid or gaze_point.right_valid 318 319 self._gaze_valid = valid 320 self._last_x = x 321 self._last_y = y 322 323 # Infer intended screen 324 intended = self._infer_screen(x, y, valid) 325 326 if intended == self._current: 327 self._pending = None 328 self._pending_start = None 329 return None 330 331 # Screen change pending 332 now = datetime.now() 333 334 if self._pending != intended: 335 self._pending = intended 336 self._pending_start = now 337 return None 338 339 # Check if delay elapsed 340 if self._pending_start: 341 elapsed_ms = (now - self._pending_start).total_seconds() * 1000 342 if elapsed_ms >= self.switch_delay_ms: 343 old = self._current 344 self._current = intended 345 self._pending = None 346 self._pending_start = None 347 348 # Notify 349 name = self.screen_names.get(intended, intended.value) 350 for callback in self._on_change: 351 try: 352 callback(intended, name) 353 except Exception as e: 354 
print(f"[MultiMonitorDetector] Callback error: {e}") 355 356 return intended 357 358 return None 359 360 def _infer_screen( 361 self, 362 x: Optional[float], 363 y: Optional[float], 364 valid: bool 365 ) -> AttentionScreen: 366 """Infer screen from gaze data.""" 367 if not valid or x is None: 368 # Lost tracking - use last position to guess 369 if self._last_x is not None and self._last_x < 0.3: 370 return AttentionScreen.LEFT 371 elif self._last_x is not None and self._last_x > 0.7: 372 return AttentionScreen.RIGHT 373 return self._current # Stay on current 374 375 if x < self.left_edge: 376 return AttentionScreen.LEFT 377 elif x > self.right_edge: 378 return AttentionScreen.RIGHT 379 return AttentionScreen.CENTER 380 381 def get_current(self) -> tuple[AttentionScreen, str]: 382 """Get current screen and name.""" 383 name = self.screen_names.get(self._current, self._current.value) 384 return self._current, name 385 386 def on_screen_change(self, callback: Callable[[AttentionScreen, str], None]): 387 """Register callback for screen changes.""" 388 self._on_change.append(callback) 389 390 391 if __name__ == "__main__": 392 print("=== Multi-Monitor Attention Tracker ===\n") 393 394 # Test with simulated gaze data 395 tracker = MultiMonitorTracker(ScreenConfig( 396 left_name="MacBook Pro", 397 center_name="Studio Display", 398 right_name="iPad Pro", 399 )) 400 401 def on_change(old: AttentionState, new: AttentionState): 402 print(f"\n*** SCREEN CHANGE: {old.screen_name} → {new.screen_name} ***\n") 403 404 tracker.on_screen_change(on_change) 405 406 # Simulate gaze movement 407 print("Simulating gaze sequence...") 408 sequences = [ 409 (0.5, 0.5, True, "Center of main monitor"), 410 (0.5, 0.5, True, "Still center"), 411 (0.1, 0.5, True, "Moving left"), 412 (0.02, 0.5, True, "At left edge"), 413 (0.02, 0.5, True, "Still at left edge"), 414 (None, None, False, "Gaze lost (looking at MacBook)"), 415 (None, None, False, "Still lost"), 416 (0.5, 0.5, True, "Back to 
center"), 417 (0.9, 0.5, True, "Moving right"), 418 (0.98, 0.5, True, "At right edge"), 419 (None, None, False, "Gaze lost (looking at iPad)"), 420 ] 421 422 for x, y, valid, desc in sequences: 423 result = tracker.update_gaze(x, y, valid) 424 state = tracker.get_state() 425 print(f"{desc:40} → {state.screen_name:15} (conf={state.confidence:.1f})") 426 time.sleep(0.15) # Simulate time passing 427 428 print("\nDone.")