# tracker.py
"""
Attention Tracker - The Cooperative Eye System

Implements the core attention tracking loop:
1. CAPTURE - Record where operator's gaze lands
2. TRAJECTORY - Compute direction through concept space
3. PREDICT - Anticipate where gaze is heading
4. SURFACE - Bring related items into peripheral vision
5. RESPOND - Adapt system behavior based on attention

The daily note is the single pane of attention - everything surfaces there.

Key insight: Attention has velocity. It's not just WHERE you looked,
it's WHERE YOU'RE HEADING. The trajectory is the predictive signal.
"""

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any, Tuple
from collections import deque
import math


@dataclass
class AttentionEvent:
    """A single attention event - operator looked at something."""
    timestamp: datetime
    target_id: str                            # UUID of bullet/concept
    target_type: str                          # 'bullet', 'concept', 'episode', 'link', etc.
    modality: str                             # 'read', 'listen', 'highlight', 'search', 'mention'
    duration_seconds: Optional[float] = None  # how long attention dwelled, if known
    intensity: float = 1.0                    # 0-1, highlight > read > glance
    source: str = "unknown"                   # 'podcast', 'conversation', 'browser', etc.

    # Concept space coordinates (set by trajectory calculator)
    coordinates: Optional[Tuple[float, ...]] = None


@dataclass
class AttentionTrajectory:
    """Direction and velocity through concept space."""
    # Current position (recency-weighted centroid of recent attention)
    position: Tuple[float, ...]

    # Velocity vector (direction * speed), per second
    velocity: Tuple[float, ...]

    # Speed magnitude (how fast attention is moving)
    speed: float

    # Confidence in trajectory, 0-1 (based on how many embedded points fed it)
    confidence: float

    # Predicted next positions (linear extrapolation along velocity)
    predictions: List[Tuple[float, ...]] = field(default_factory=list)

    # Related concept IDs in the direction of travel
    ahead: List[str] = field(default_factory=list)

    # Related concept IDs in peripheral vision
    peripheral: List[str] = field(default_factory=list)


@dataclass
class AttentionState:
    """Current attention state - the single pane view."""
    # What operator is focused on right now (most recently recorded event)
    focus: Optional[AttentionEvent] = None

    # Recent attention history (last N events, oldest first)
    history: List[AttentionEvent] = field(default_factory=list)

    # Computed trajectory (None until first record())
    trajectory: Optional[AttentionTrajectory] = None

    # Active gravity wells being orbited (top IDs by activation)
    active_wells: List[str] = field(default_factory=list)

    # Items that should surface (based on trajectory + nag list)
    should_surface: List[str] = field(default_factory=list)

    # Unresolved items that persist (nag list)
    unresolved: List[str] = field(default_factory=list)

    # Current cognitive altitude
    altitude: str = "tactical"

    # Last update timestamp
    updated_at: datetime = field(default_factory=datetime.now)


class AttentionTracker:
    """
    Tracks operator attention and computes trajectories.

    The tracker maintains a sliding window of attention events,
    computes the trajectory through concept space, and predicts
    what should surface next.

    Usage:
        tracker = AttentionTracker()

        # Record attention events
        tracker.record(AttentionEvent(
            timestamp=datetime.now(),
            target_id="abc123",
            target_type="bullet",
            modality="read"
        ))

        # Get current state
        state = tracker.get_state()
        print(f"Focus: {state.focus}")
        print(f"Trajectory confidence: {state.trajectory.confidence}")
        print(f"Should surface: {state.should_surface}")
    """

    def __init__(
        self,
        history_size: int = 50,
        trajectory_window: int = 10,
        concept_space_dims: int = 64  # embedding dimension
    ):
        """
        Args:
            history_size: Max events retained in the sliding window.
            trajectory_window: Number of most-recent events used for
                trajectory computation.
            concept_space_dims: Dimensionality of the concept-space
                embeddings (positions/velocities are vectors of this length).
        """
        self.history_size = history_size
        self.trajectory_window = trajectory_window
        self.concept_space_dims = concept_space_dims

        # Event history (circular buffer; deque drops oldest past maxlen)
        self._history: deque = deque(maxlen=history_size)

        # Current state
        self._state = AttentionState()

        # Concept embeddings keyed by target_id (would be loaded from model)
        self._embeddings: Dict[str, Tuple[float, ...]] = {}

        # Unresolved items (persist until explicitly closed), id -> marked-at time
        self._unresolved: Dict[str, datetime] = {}

        # Gravity well activations, id -> activation level
        self._well_activations: Dict[str, float] = {}

    def record(self, event: AttentionEvent) -> AttentionState:
        """
        Record an attention event and update state.

        Args:
            event: The attention event to record

        Returns:
            Updated attention state
        """
        # Add to history
        self._history.append(event)

        # Update focus
        self._state.focus = event
        self._state.history = list(self._history)

        # Compute trajectory
        self._state.trajectory = self._compute_trajectory()

        # Update gravity wells
        self._update_wells(event)
        self._state.active_wells = self._get_active_wells()

        # Compute what should surface
        self._state.should_surface = self._compute_surfacing()

        # Update unresolved
        self._state.unresolved = list(self._unresolved.keys())

        self._state.updated_at = datetime.now()

        return self._state

    def get_state(self) -> AttentionState:
        """Get current attention state."""
        return self._state

    def mark_unresolved(self, target_id: str) -> None:
        """Mark an item as unresolved (will persist until closed)."""
        self._unresolved[target_id] = datetime.now()

    def resolve(self, target_id: str) -> None:
        """Mark an item as resolved (removes from nag list)."""
        self._unresolved.pop(target_id, None)

    def set_embedding(self, target_id: str, embedding: Tuple[float, ...]) -> None:
        """Set the concept space embedding for a target."""
        self._embeddings[target_id] = embedding

    def _empty_trajectory(self) -> AttentionTrajectory:
        """Zero trajectory used when there is not enough data to compute one."""
        zero = tuple([0.0] * self.concept_space_dims)
        return AttentionTrajectory(
            position=zero,
            velocity=zero,
            speed=0.0,
            confidence=0.0
        )

    def _compute_trajectory(self) -> AttentionTrajectory:
        """
        Compute attention trajectory from recent history.

        Uses an exponentially-weighted moving average of concept positions
        to determine direction and velocity.

        Returns:
            AttentionTrajectory; a zero trajectory (confidence 0.0) when
            fewer than two embedded events are available.
        """
        if len(self._history) < 2:
            return self._empty_trajectory()

        # Get recent events with embeddings (chronological: oldest first)
        recent = list(self._history)[-self.trajectory_window:]
        embedded = [
            (e, self._embeddings.get(e.target_id))
            for e in recent
            if e.target_id in self._embeddings
        ]

        if len(embedded) < 2:
            # Not enough embedded points for trajectory
            return self._empty_trajectory()

        dims = self.concept_space_dims
        n = len(embedded)

        # Compute exponentially-weighted centroid (position).
        # More recent events have higher weight: the NEWEST event (index n-1)
        # gets exp(0) = 1.0, older events decay by exp(-0.3) per step back.
        # (The original indexed the decay forward, which weighted the OLDEST
        # event highest - the opposite of the stated intent.)
        weights = [math.exp(-0.3 * (n - 1 - i)) for i in range(n)]
        total_weight = sum(weights)

        position = [0.0] * dims
        for i, (event, emb) in enumerate(embedded):
            w = weights[i] / total_weight
            # Truncate to dims so an over-long embedding cannot IndexError.
            for d in range(min(len(emb), dims)):
                position[d] += emb[d] * w

        # Compute velocity (difference between recent and older centroids)
        if n >= 4:
            # Split into halves
            mid = n // 2
            older = embedded[:mid]
            newer = embedded[mid:]

            older_centroid = self._centroid([e for _, e in older])
            newer_centroid = self._centroid([e for _, e in newer])

            # Time delta across the whole window
            time_span = (newer[-1][0].timestamp - older[0][0].timestamp).total_seconds()
            if time_span > 0:
                # Pad with 0.0 beyond the centroids' length so short
                # embeddings cannot IndexError.
                span = min(dims, len(older_centroid), len(newer_centroid))
                velocity = tuple(
                    (newer_centroid[d] - older_centroid[d]) / time_span
                    if d < span else 0.0
                    for d in range(dims)
                )
            else:
                velocity = tuple([0.0] * dims)
        else:
            velocity = tuple([0.0] * dims)

        # Compute speed (magnitude of velocity)
        speed = math.sqrt(sum(v**2 for v in velocity))

        # Compute confidence: fraction of the window backed by embedded
        # points. (Directional-consistency weighting would be a refinement.)
        confidence = min(1.0, n / self.trajectory_window)

        # Predict next positions (extrapolate along velocity)
        predictions = []
        for t in [1, 2, 5]:  # seconds ahead
            pred = tuple(
                position[d] + velocity[d] * t
                for d in range(dims)
            )
            predictions.append(pred)

        return AttentionTrajectory(
            position=tuple(position),
            velocity=velocity,
            speed=speed,
            confidence=confidence,
            predictions=predictions,
            ahead=[],       # Would be filled by nearest neighbor lookup
            peripheral=[]   # Would be filled by peripheral lookup
        )

    def _centroid(self, embeddings: List[Tuple[float, ...]]) -> Tuple[float, ...]:
        """Compute the (unweighted) centroid of a list of embeddings."""
        if not embeddings:
            return tuple([0.0] * self.concept_space_dims)

        result = [0.0] * len(embeddings[0])
        for emb in embeddings:
            for d, v in enumerate(emb):
                result[d] += v

        n = len(embeddings)
        return tuple(v / n for v in result)

    def _update_wells(self, event: AttentionEvent) -> None:
        """Update gravity well activations based on attention."""
        # Decay existing activations; drop wells that fade below threshold
        decay = 0.95
        for well_id in list(self._well_activations.keys()):
            self._well_activations[well_id] *= decay
            if self._well_activations[well_id] < 0.01:
                del self._well_activations[well_id]

        # Would check if target is near any gravity wells
        # and increase their activation.
        # For now, just track that we looked at the target.
        self._well_activations[event.target_id] = self._well_activations.get(
            event.target_id, 0.0
        ) + event.intensity

    def _get_active_wells(self) -> List[str]:
        """Get currently active gravity wells (top 5, sorted by activation)."""
        sorted_wells = sorted(
            self._well_activations.items(),
            key=lambda x: x[1],
            reverse=True
        )
        return [well_id for well_id, _ in sorted_wells[:5]]

    def _compute_surfacing(self) -> List[str]:
        """
        Compute what should surface based on trajectory and wells.

        Items surface if they are:
        1. In the direction of travel (ahead on trajectory)
        2. Near active gravity wells
        3. In the nag list (unresolved)
        """
        should_surface = set()

        # Add unresolved items
        should_surface.update(self._unresolved.keys())

        # Add items ahead on trajectory
        if self._state.trajectory:
            should_surface.update(self._state.trajectory.ahead)

        # Would add items near gravity wells
        # This requires concept space proximity queries

        return list(should_surface)


class DailyNoteIntegration:
    """
    Integrates attention tracking with the daily note.

    The daily note is the single pane of attention.
    This class handles:
    - Surfacing high-resonance items to the daily note
    - Tracking which items were surfaced and when
    - Updating the Action Items section based on attention
    """

    def __init__(self, tracker: AttentionTracker, daily_note_path: str):
        """
        Args:
            tracker: The attention tracker to read state from.
            daily_note_path: Path to the daily note markdown file.
        """
        self.tracker = tracker
        self.daily_note_path = daily_note_path
        # item id -> when it was last surfaced
        self._surfaced: Dict[str, datetime] = {}

    def surface_to_daily(self, items: List[Dict[str, Any]]) -> str:
        """
        Add items to the daily note's attention section.

        Args:
            items: List of items with 'id', 'content', 'resonance', 'source'

        Returns:
            Markdown to append to daily note ("" when items is empty)
        """
        if not items:
            return ""

        lines = ["\n## Surfaced by Attention System\n"]
        lines.append(f"*Updated: {datetime.now().strftime('%H:%M')}*\n")

        for item in sorted(items, key=lambda x: x.get('resonance', 0), reverse=True):
            resonance = item.get('resonance', 0)
            source = item.get('source', 'unknown')
            content = item.get('content', '')[:100]

            # Use different markers based on resonance (bold > italic > plain)
            if resonance > 0.8:
                marker = "**"
            elif resonance > 0.5:
                marker = "*"
            else:
                marker = ""

            lines.append(f"- {marker}{content}{marker}")
            lines.append(f"  - resonance: {resonance:.2f}, source: {source}")

            # Guard against malformed items: the original raised KeyError here
            # (after already emitting the markdown) when 'id' was missing.
            item_id = item.get('id')
            if item_id is not None:
                self._surfaced[item_id] = datetime.now()

        return "\n".join(lines)

    def get_attention_summary(self) -> str:
        """Get a summary of current attention state for the daily note."""
        state = self.tracker.get_state()

        lines = ["\n## Current Attention State\n"]

        # Focus
        if state.focus:
            lines.append(f"**Focus:** {state.focus.target_type} ({state.focus.modality})")

        # Trajectory (only shown when reasonably confident)
        if state.trajectory and state.trajectory.confidence > 0.3:
            lines.append(f"**Direction:** confidence {state.trajectory.confidence:.2f}, speed {state.trajectory.speed:.3f}")

        # Active wells
        if state.active_wells:
            lines.append(f"**Active wells:** {', '.join(state.active_wells[:3])}")

        # Unresolved
        if state.unresolved:
            lines.append(f"**Nagging:** {len(state.unresolved)} unresolved items")

        return "\n".join(lines)


# Factory function for easy initialization
def create_attention_system(
    daily_note_path: str,
    history_size: int = 50
) -> Tuple[AttentionTracker, DailyNoteIntegration]:
    """
    Create an attention tracking system with daily note integration.

    Args:
        daily_note_path: Path to the daily note markdown file
        history_size: Number of attention events to retain

    Returns:
        Tuple of (tracker, daily_integration)
    """
    tracker = AttentionTracker(history_size=history_size)
    integration = DailyNoteIntegration(tracker, daily_note_path)
    return tracker, integration


if __name__ == "__main__":
    # Test the attention tracker
    print("=== Attention Tracker Test ===\n")

    tracker = AttentionTracker()

    # Simulate some attention events
    events = [
        AttentionEvent(
            timestamp=datetime.now() - timedelta(minutes=5),
            target_id="bullet_001",
            target_type="bullet",
            modality="read",
            source="conversation"
        ),
        AttentionEvent(
            timestamp=datetime.now() - timedelta(minutes=3),
            target_id="episode_acquired",
            target_type="episode",
            modality="listen",
            duration_seconds=3600,
            source="podcast"
        ),
        AttentionEvent(
            timestamp=datetime.now() - timedelta(minutes=1),
            target_id="concept_markov",
            target_type="concept",
            modality="highlight",
            intensity=0.9,
            source="conversation"
        ),
    ]

    for event in events:
        state = tracker.record(event)
        print(f"Recorded: {event.target_type} via {event.modality}")

    print(f"\nCurrent focus: {state.focus.target_id}")
    print(f"History size: {len(state.history)}")
    print(f"Trajectory confidence: {state.trajectory.confidence:.2f}")
    print(f"Active wells: {state.active_wells}")

    # Mark something as unresolved
    tracker.mark_unresolved("open_question_001")
    state = tracker.get_state()
    print(f"Unresolved items: {state.unresolved}")

    print("\n=== Test Complete ===")