/ integrations / talon / test_gaze_bridge.py
test_gaze_bridge.py
  1  #!/usr/bin/env python3
  2  """
  3  Test script for Talon gaze bridge.
  4  
  5  Run this after setting up Talon with sovereign_gaze_bridge.py
  6  to verify gaze data is flowing correctly.
  7  
  8  Usage:
  9      python test_gaze_bridge.py
 10  """
 11  
 12  import sys
 13  import time
 14  from pathlib import Path
 15  
 16  # Add parent paths for imports
 17  sys.path.insert(0, str(Path(__file__).parent.parent.parent))
 18  
 19  from core.attention.eye_tracking import (
 20      TalonIntegration,
 21      EyeTrackingAttentionBridge,
 22      ScreenRegion,
 23      create_eye_tracking_system,
 24  )
 25  
 26  
 27  def test_connection():
 28      """Test basic connection to Talon bridge."""
 29      print("=" * 50)
 30      print("Testing Talon Gaze Bridge Connection")
 31      print("=" * 50)
 32  
 33      tracker = TalonIntegration()
 34  
 35      print("\n1. Attempting connection...")
 36      if tracker.connect():
 37          print("   ✓ Connected to Talon bridge")
 38          tracker.stop()
 39          return True
 40      else:
 41          print("   ✗ Could not connect")
 42          print("\n   Checklist:")
 43          print("   [ ] Is Talon running?")
 44          print("   [ ] Is sovereign_gaze_bridge.py in ~/.talon/user/?")
 45          print("   [ ] Is Tobii connected and calibrated?")
 46          return False
 47  
 48  
 49  def test_gaze_stream(duration_seconds: float = 10):
 50      """Test receiving gaze data stream."""
 51      print("\n" + "=" * 50)
 52      print(f"Testing Gaze Stream ({duration_seconds}s)")
 53      print("=" * 50)
 54      print("\nLook around the screen to generate gaze data...")
 55  
 56      tracker, bridge = create_eye_tracking_system()
 57  
 58      if not tracker:
 59          print("✗ Could not create eye tracking system")
 60          return False
 61  
 62      # Counters
 63      gaze_count = 0
 64      fixation_count = 0
 65      last_point = None
 66  
 67      def on_gaze(point):
 68          nonlocal gaze_count, last_point
 69          gaze_count += 1
 70          last_point = point
 71  
 72      def on_fixation(fixation):
 73          nonlocal fixation_count
 74          fixation_count += 1
 75          print(f"   Fixation #{fixation_count}: "
 76                f"({fixation.x:.2f}, {fixation.y:.2f}) "
 77                f"{fixation.duration_ms:.0f}ms "
 78                f"intensity={fixation.intensity:.2f}")
 79  
 80      tracker.on_gaze_point(on_gaze)
 81      tracker.on_fixation(on_fixation)
 82  
 83      print("\n2. Starting gaze tracking...")
 84      tracker.start()
 85  
 86      # Collect data for duration
 87      start = time.time()
 88      last_report = start
 89      while time.time() - start < duration_seconds:
 90          time.sleep(0.1)
 91  
 92          # Progress report every 2 seconds
 93          if time.time() - last_report > 2:
 94              if last_point:
 95                  print(f"   ... received {gaze_count} gaze points, "
 96                        f"last at ({last_point.x:.2f}, {last_point.y:.2f})")
 97              else:
 98                  print(f"   ... waiting for gaze data...")
 99              last_report = time.time()
100  
101      tracker.stop()
102  
103      print(f"\n3. Results:")
104      print(f"   Gaze points received: {gaze_count}")
105      print(f"   Fixations detected: {fixation_count}")
106  
107      if gaze_count > 0:
108          rate = gaze_count / duration_seconds
109          print(f"   Sample rate: {rate:.1f} Hz")
110          print("   ✓ Gaze stream working!")
111          return True
112      else:
113          print("   ✗ No gaze data received")
114          return False
115  
116  
def test_regions(duration_seconds: float = 10):
    """Register four screen quadrants and count region-enter callbacks.

    Args:
        duration_seconds: How long to track before stopping. Defaults to
            10, matching the previously hard-coded wait.

    Returns:
        True if at least one region-enter callback fired; False if the
        tracking system could not be created or no region was entered.
    """
    print("\n" + "=" * 50)
    print("Testing Screen Region Detection")
    print("=" * 50)

    tracker, _bridge = create_eye_tracking_system()
    if not tracker:
        print("✗ Could not create eye tracking system")
        return False

    # Define test regions (quarters of screen)
    regions = [
        ScreenRegion("top-left", 0.0, 0.0, 0.5, 0.5, "tl", "quadrant"),
        ScreenRegion("top-right", 0.5, 0.0, 1.0, 0.5, "tr", "quadrant"),
        ScreenRegion("bottom-left", 0.0, 0.5, 0.5, 1.0, "bl", "quadrant"),
        ScreenRegion("bottom-right", 0.5, 0.5, 1.0, 1.0, "br", "quadrant"),
    ]

    for region in regions:
        tracker.register_region(region)

    region_hits = {r.name: 0 for r in regions}

    def on_region(region, fixation):
        # Use .get so a callback for a region this test did not register
        # is counted instead of raising KeyError inside the callback.
        region_hits[region.name] = region_hits.get(region.name, 0) + 1
        print(f"   Looked at {region.name} "
              f"({fixation.duration_ms:.0f}ms)")

    tracker.on_region_enter(on_region)

    print(f"\n4. Starting region tracking ({duration_seconds}s)...")
    print("   Look at different corners of the screen")
    tracker.start()
    try:
        time.sleep(duration_seconds)
    finally:
        # Always stop the tracker, even if the wait is interrupted.
        tracker.stop()

    print("\n5. Region hits:")
    for name, count in region_hits.items():
        print(f"   {name}: {count}")

    if sum(region_hits.values()) > 0:
        print("   ✓ Region detection working!")
        return True

    print("   ✗ No regions detected")
    return False
166  
167  
def test_attention_events(duration_seconds: float = 10):
    """Check that fixations on a registered target yield AttentionEvents.

    Registers a centered screen region as a target on the bridge, converts
    each fixation via ``bridge.fixation_to_attention_event`` and collects
    the truthy results.

    Args:
        duration_seconds: How long to track before stopping. Defaults to
            10, matching the previously hard-coded wait.

    Returns:
        True if at least one event was collected; False if the tracking
        system could not be created or no event was produced.
    """
    print("\n" + "=" * 50)
    print("Testing AttentionEvent Generation")
    print("=" * 50)

    tracker, bridge = create_eye_tracking_system()
    if not tracker:
        print("✗ Could not create eye tracking system")
        return False

    # Register a target
    bridge.register_target(
        ScreenRegion("center", 0.25, 0.25, 0.75, 0.75),
        target_id="test-target-001",
        target_type="test"
    )

    events = []

    def on_fixation(fixation):
        event = bridge.fixation_to_attention_event(fixation)
        if event:
            events.append(event)
            print(f"   AttentionEvent: target={event.target_id} "
                  f"modality={event.modality} "
                  f"intensity={event.intensity:.2f}")

    tracker.on_fixation(on_fixation)

    print(f"\n6. Starting attention tracking ({duration_seconds}s)...")
    print("   Look at the CENTER of your screen")
    tracker.start()
    try:
        time.sleep(duration_seconds)
    finally:
        # Always stop the tracker, even if the wait is interrupted.
        tracker.stop()

    print(f"\n7. AttentionEvents generated: {len(events)}")
    if events:
        print("   ✓ AttentionEvent generation working!")
        return True

    print("   (No events - look at screen center to trigger)")
    return False
212  
213  
def main():
    """Run every gaze bridge check in order and return a process exit code.

    The connection check runs first; if it fails the remaining checks are
    skipped and 1 is returned immediately. Otherwise the gaze stream,
    region and attention checks run, a per-test summary is printed, and
    the result is 0 when all passed or 1 when any failed.
    """
    rule = "=" * 50

    print("\n" + rule)
    print("SOVEREIGN OS - TALON GAZE BRIDGE TEST")
    print(rule)
    print("\nPrerequisites:")
    print("  - Talon installed and running")
    print("  - sovereign_gaze_bridge.py in ~/.talon/user/")
    print("  - Tobii Eye Tracker 5 connected and calibrated")

    # Test 1: Connection (everything else depends on it)
    outcomes = {'connection': test_connection()}
    if not outcomes['connection']:
        print("\n" + rule)
        print("FAILED: Cannot proceed without connection")
        print(rule)
        return 1

    # Tests 2-4: gaze stream, regions, attention events, run in this order
    remaining = (
        ('gaze_stream', lambda: test_gaze_stream(10)),
        ('regions', test_regions),
        ('attention', test_attention_events),
    )
    for label, run_check in remaining:
        outcomes[label] = run_check()

    # Summary
    print("\n" + rule)
    print("TEST SUMMARY")
    print(rule)
    for label, ok in outcomes.items():
        if ok:
            verdict = "✓ PASS"
        else:
            verdict = "✗ FAIL"
        print(f"  {label}: {verdict}")

    if not all(outcomes.values()):
        print("\n✗ Some tests failed. Check setup and try again.")
        return 1

    print("\n✓ All tests passed! Gaze tracking is ready.")
    return 0
257  
258  
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    exit_code = main()
    sys.exit(exit_code)