/ core / attention / tracker.py
tracker.py
  1  """
  2  Attention Tracker - The Cooperative Eye System
  3  
  4  Implements the core attention tracking loop:
  5  1. CAPTURE - Record where operator's gaze lands
  6  2. TRAJECTORY - Compute direction through concept space
  7  3. PREDICT - Anticipate where gaze is heading
  8  4. SURFACE - Bring related items into peripheral vision
  9  5. RESPOND - Adapt system behavior based on attention
 10  
 11  The daily note is the single pane of attention - everything surfaces there.
 12  
 13  Key insight: Attention has velocity. It's not just WHERE you looked,
 14  it's WHERE YOU'RE HEADING. The trajectory is the predictive signal.
 15  """
 16  
 17  from dataclasses import dataclass, field
 18  from datetime import datetime, timedelta
 19  from typing import Optional, List, Dict, Any, Tuple
 20  from collections import deque
 21  import math
 22  
 23  
 24  @dataclass
 25  class AttentionEvent:
 26      """A single attention event - operator looked at something."""
 27      timestamp: datetime
 28      target_id: str  # UUID of bullet/concept
 29      target_type: str  # 'bullet', 'concept', 'episode', 'link', etc.
 30      modality: str  # 'read', 'listen', 'highlight', 'search', 'mention'
 31      duration_seconds: Optional[float] = None
 32      intensity: float = 1.0  # 0-1, highlight > read > glance
 33      source: str = "unknown"  # 'podcast', 'conversation', 'browser', etc.
 34  
 35      # Concept space coordinates (set by trajectory calculator)
 36      coordinates: Optional[Tuple[float, ...]] = None
 37  
 38  
 39  @dataclass
 40  class AttentionTrajectory:
 41      """Direction and velocity through concept space."""
 42      # Current position (centroid of recent attention)
 43      position: Tuple[float, ...]
 44  
 45      # Velocity vector (direction * speed)
 46      velocity: Tuple[float, ...]
 47  
 48      # Speed magnitude (how fast attention is moving)
 49      speed: float
 50  
 51      # Confidence in trajectory (based on consistency)
 52      confidence: float
 53  
 54      # Predicted next positions
 55      predictions: List[Tuple[float, ...]] = field(default_factory=list)
 56  
 57      # Related concept IDs in the direction of travel
 58      ahead: List[str] = field(default_factory=list)
 59  
 60      # Related concept IDs in peripheral vision
 61      peripheral: List[str] = field(default_factory=list)
 62  
 63  
 64  @dataclass
 65  class AttentionState:
 66      """Current attention state - the single pane view."""
 67      # What operator is focused on right now
 68      focus: Optional[AttentionEvent] = None
 69  
 70      # Recent attention history (last N events)
 71      history: List[AttentionEvent] = field(default_factory=list)
 72  
 73      # Computed trajectory
 74      trajectory: Optional[AttentionTrajectory] = None
 75  
 76      # Active gravity wells being orbited
 77      active_wells: List[str] = field(default_factory=list)
 78  
 79      # Items that should surface (based on trajectory)
 80      should_surface: List[str] = field(default_factory=list)
 81  
 82      # Unresolved items that persist (nag list)
 83      unresolved: List[str] = field(default_factory=list)
 84  
 85      # Current cognitive altitude
 86      altitude: str = "tactical"
 87  
 88      # Last update timestamp
 89      updated_at: datetime = field(default_factory=datetime.now)
 90  
 91  
 92  class AttentionTracker:
 93      """
 94      Tracks operator attention and computes trajectories.
 95  
 96      The tracker maintains a sliding window of attention events,
 97      computes the trajectory through concept space, and predicts
 98      what should surface next.
 99  
100      Usage:
101          tracker = AttentionTracker()
102  
103          # Record attention events
104          tracker.record(AttentionEvent(
105              timestamp=datetime.now(),
106              target_id="abc123",
107              target_type="bullet",
108              modality="read"
109          ))
110  
111          # Get current state
112          state = tracker.get_state()
113          print(f"Focus: {state.focus}")
114          print(f"Trajectory confidence: {state.trajectory.confidence}")
115          print(f"Should surface: {state.should_surface}")
116      """
117  
118      def __init__(
119          self,
120          history_size: int = 50,
121          trajectory_window: int = 10,
122          concept_space_dims: int = 64  # embedding dimension
123      ):
124          self.history_size = history_size
125          self.trajectory_window = trajectory_window
126          self.concept_space_dims = concept_space_dims
127  
128          # Event history (circular buffer)
129          self._history: deque = deque(maxlen=history_size)
130  
131          # Current state
132          self._state = AttentionState()
133  
134          # Concept embeddings (would be loaded from model)
135          self._embeddings: Dict[str, Tuple[float, ...]] = {}
136  
137          # Unresolved items (persist until explicitly closed)
138          self._unresolved: Dict[str, datetime] = {}
139  
140          # Gravity well activations
141          self._well_activations: Dict[str, float] = {}
142  
143      def record(self, event: AttentionEvent) -> AttentionState:
144          """
145          Record an attention event and update state.
146  
147          Args:
148              event: The attention event to record
149  
150          Returns:
151              Updated attention state
152          """
153          # Add to history
154          self._history.append(event)
155  
156          # Update focus
157          self._state.focus = event
158          self._state.history = list(self._history)
159  
160          # Compute trajectory
161          self._state.trajectory = self._compute_trajectory()
162  
163          # Update gravity wells
164          self._update_wells(event)
165          self._state.active_wells = self._get_active_wells()
166  
167          # Compute what should surface
168          self._state.should_surface = self._compute_surfacing()
169  
170          # Update unresolved
171          self._state.unresolved = list(self._unresolved.keys())
172  
173          self._state.updated_at = datetime.now()
174  
175          return self._state
176  
177      def get_state(self) -> AttentionState:
178          """Get current attention state."""
179          return self._state
180  
181      def mark_unresolved(self, target_id: str) -> None:
182          """Mark an item as unresolved (will persist until closed)."""
183          self._unresolved[target_id] = datetime.now()
184  
185      def resolve(self, target_id: str) -> None:
186          """Mark an item as resolved (removes from nag list)."""
187          self._unresolved.pop(target_id, None)
188  
189      def set_embedding(self, target_id: str, embedding: Tuple[float, ...]) -> None:
190          """Set the concept space embedding for a target."""
191          self._embeddings[target_id] = embedding
192  
193      def _compute_trajectory(self) -> AttentionTrajectory:
194          """
195          Compute attention trajectory from recent history.
196  
197          Uses exponentially-weighted moving average of concept positions
198          to determine direction and velocity.
199          """
200          if len(self._history) < 2:
201              return AttentionTrajectory(
202                  position=tuple([0.0] * self.concept_space_dims),
203                  velocity=tuple([0.0] * self.concept_space_dims),
204                  speed=0.0,
205                  confidence=0.0
206              )
207  
208          # Get recent events with embeddings
209          recent = list(self._history)[-self.trajectory_window:]
210          embedded = [
211              (e, self._embeddings.get(e.target_id))
212              for e in recent
213              if e.target_id in self._embeddings
214          ]
215  
216          if len(embedded) < 2:
217              # Not enough embedded points for trajectory
218              return AttentionTrajectory(
219                  position=tuple([0.0] * self.concept_space_dims),
220                  velocity=tuple([0.0] * self.concept_space_dims),
221                  speed=0.0,
222                  confidence=0.0
223              )
224  
225          # Compute exponentially-weighted centroid (position)
226          # More recent events have higher weight
227          weights = [math.exp(-0.3 * i) for i in range(len(embedded))]
228          total_weight = sum(weights)
229  
230          position = [0.0] * self.concept_space_dims
231          for i, (event, emb) in enumerate(embedded):
232              w = weights[i] / total_weight
233              for d in range(len(emb)):
234                  position[d] += emb[d] * w
235  
236          # Compute velocity (difference between recent and older centroids)
237          if len(embedded) >= 4:
238              # Split into halves
239              mid = len(embedded) // 2
240              older = embedded[:mid]
241              newer = embedded[mid:]
242  
243              older_centroid = self._centroid([e for _, e in older])
244              newer_centroid = self._centroid([e for _, e in newer])
245  
246              # Time delta
247              time_span = (newer[-1][0].timestamp - older[0][0].timestamp).total_seconds()
248              if time_span > 0:
249                  velocity = tuple(
250                      (newer_centroid[d] - older_centroid[d]) / time_span
251                      for d in range(self.concept_space_dims)
252                  )
253              else:
254                  velocity = tuple([0.0] * self.concept_space_dims)
255          else:
256              velocity = tuple([0.0] * self.concept_space_dims)
257  
258          # Compute speed (magnitude of velocity)
259          speed = math.sqrt(sum(v**2 for v in velocity))
260  
261          # Compute confidence (consistency of direction)
262          # Higher if attention is moving consistently in one direction
263          confidence = min(1.0, len(embedded) / self.trajectory_window)
264  
265          # Predict next positions (extrapolate along velocity)
266          predictions = []
267          for t in [1, 2, 5]:  # seconds ahead
268              pred = tuple(
269                  position[d] + velocity[d] * t
270                  for d in range(self.concept_space_dims)
271              )
272              predictions.append(pred)
273  
274          return AttentionTrajectory(
275              position=tuple(position),
276              velocity=velocity,
277              speed=speed,
278              confidence=confidence,
279              predictions=predictions,
280              ahead=[],  # Would be filled by nearest neighbor lookup
281              peripheral=[]  # Would be filled by peripheral lookup
282          )
283  
284      def _centroid(self, embeddings: List[Tuple[float, ...]]) -> Tuple[float, ...]:
285          """Compute centroid of embeddings."""
286          if not embeddings:
287              return tuple([0.0] * self.concept_space_dims)
288  
289          result = [0.0] * len(embeddings[0])
290          for emb in embeddings:
291              for d, v in enumerate(emb):
292                  result[d] += v
293  
294          n = len(embeddings)
295          return tuple(v / n for v in result)
296  
297      def _update_wells(self, event: AttentionEvent) -> None:
298          """Update gravity well activations based on attention."""
299          # Decay existing activations
300          decay = 0.95
301          for well_id in list(self._well_activations.keys()):
302              self._well_activations[well_id] *= decay
303              if self._well_activations[well_id] < 0.01:
304                  del self._well_activations[well_id]
305  
306          # Would check if target is near any gravity wells
307          # and increase their activation
308          # For now, just track that we looked at the target
309          self._well_activations[event.target_id] = self._well_activations.get(
310              event.target_id, 0.0
311          ) + event.intensity
312  
313      def _get_active_wells(self) -> List[str]:
314          """Get currently active gravity wells (sorted by activation)."""
315          sorted_wells = sorted(
316              self._well_activations.items(),
317              key=lambda x: x[1],
318              reverse=True
319          )
320          return [well_id for well_id, activation in sorted_wells[:5]]
321  
322      def _compute_surfacing(self) -> List[str]:
323          """
324          Compute what should surface based on trajectory and wells.
325  
326          Items surface if they are:
327          1. In the direction of travel (ahead on trajectory)
328          2. Near active gravity wells
329          3. In the nag list (unresolved)
330          """
331          should_surface = set()
332  
333          # Add unresolved items
334          should_surface.update(self._unresolved.keys())
335  
336          # Add items ahead on trajectory
337          if self._state.trajectory:
338              should_surface.update(self._state.trajectory.ahead)
339  
340          # Would add items near gravity wells
341          # This requires concept space proximity queries
342  
343          return list(should_surface)
344  
345  
class DailyNoteIntegration:
    """
    Integrates attention tracking with the daily note.

    The daily note is the single pane of attention.
    This class handles:
    - Rendering high-resonance items as markdown for the daily note
    - Tracking which items were surfaced and when
    - Rendering a summary of the tracker's current attention state

    The methods only build markdown strings; this class never reads or
    writes the file at ``daily_note_path`` itself.
    """

    def __init__(self, tracker: "AttentionTracker", daily_note_path: str):
        """
        Args:
            tracker: Tracker whose state is summarised
            daily_note_path: Path of the daily note; stored for callers
        """
        self.tracker = tracker
        self.daily_note_path = daily_note_path
        # Item id -> time the item was last rendered by surface_to_daily
        self._surfaced: Dict[str, datetime] = {}

    @staticmethod
    def _resonance_of(item: Dict[str, Any]) -> float:
        """Return the item's resonance, treating a missing or None value as 0."""
        score = item.get('resonance')
        return 0 if score is None else score

    def surface_to_daily(self, items: List[Dict[str, Any]]) -> str:
        """
        Render items for the daily note's attention section.

        Items are listed by descending resonance. Content is flattened to a
        single line (so embedded newlines cannot break the markdown bullet)
        and truncated to 100 characters. Items with resonance above 0.8 are
        bold, above 0.5 italic. Every item that carries an 'id' is recorded
        as surfaced; items without one are rendered but not recorded.

        Args:
            items: List of items with 'id', 'content', 'resonance', 'source'.
                All keys are optional.

        Returns:
            Markdown to append to daily note, or "" when there are no items
        """
        if not items:
            return ""

        now = datetime.now()
        lines = [
            "\n## Surfaced by Attention System\n",
            f"*Updated: {now.strftime('%H:%M')}*\n",
        ]

        for item in sorted(items, key=self._resonance_of, reverse=True):
            resonance = self._resonance_of(item)
            source = item.get('source', 'unknown')
            # Collapse all whitespace runs before truncating
            content = " ".join(str(item.get('content') or '').split())[:100]

            # Use different markers based on resonance
            if resonance > 0.8:
                marker = "**"
            elif resonance > 0.5:
                marker = "*"
            else:
                marker = ""

            lines.append(f"- {marker}{content}{marker}")
            lines.append(f"  - resonance: {resonance:.2f}, source: {source}")

            item_id = item.get('id')
            if item_id is not None:
                self._surfaced[item_id] = now

        return "\n".join(lines)

    def get_attention_summary(self) -> str:
        """
        Get a summary of current attention state for the daily note.

        Returns:
            Markdown with a heading followed by one line for each of: the
            current focus, the trajectory (only when its confidence exceeds
            0.3), up to three active wells, and the count of unresolved
            items. Lines for absent information are omitted.
        """
        state = self.tracker.get_state()

        lines = ["\n## Current Attention State\n"]

        if state.focus:
            lines.append(f"**Focus:** {state.focus.target_type} ({state.focus.modality})")

        # Low-confidence trajectories are noise, so they are not reported
        if state.trajectory and state.trajectory.confidence > 0.3:
            lines.append(f"**Direction:** confidence {state.trajectory.confidence:.2f}, speed {state.trajectory.speed:.3f}")

        if state.active_wells:
            lines.append(f"**Active wells:** {', '.join(state.active_wells[:3])}")

        if state.unresolved:
            lines.append(f"**Nagging:** {len(state.unresolved)} unresolved items")

        return "\n".join(lines)
421  
422  
423  # Factory function for easy initialization
def create_attention_system(
    daily_note_path: str,
    history_size: int = 50
) -> Tuple[AttentionTracker, DailyNoteIntegration]:
    """
    Build a tracker and a daily note integration wired to it.

    The tracker is created with the given history size and otherwise
    default settings; the integration wraps that same tracker instance.

    Args:
        daily_note_path: Path to the daily note markdown file
        history_size: Number of attention events to retain

    Returns:
        Tuple of (tracker, daily_integration)
    """
    attention_tracker = AttentionTracker(history_size=history_size)
    return attention_tracker, DailyNoteIntegration(attention_tracker, daily_note_path)
441  
442  
if __name__ == "__main__":
    # Smoke demo: feed a few hand-made events through a tracker and print
    # what the resulting state looks like.
    print("=== Attention Tracker Test ===\n")

    tracker = AttentionTracker()

    def _minutes_ago(minutes):
        """Return the current time shifted back by the given minutes."""
        return datetime.now() - timedelta(minutes=minutes)

    # Keyword arguments for each simulated event, oldest first
    event_specs = [
        dict(
            timestamp=_minutes_ago(5),
            target_id="bullet_001",
            target_type="bullet",
            modality="read",
            source="conversation",
        ),
        dict(
            timestamp=_minutes_ago(3),
            target_id="episode_acquired",
            target_type="episode",
            modality="listen",
            duration_seconds=3600,
            source="podcast",
        ),
        dict(
            timestamp=_minutes_ago(1),
            target_id="concept_markov",
            target_type="concept",
            modality="highlight",
            intensity=0.9,
            source="conversation",
        ),
    ]
    events = [AttentionEvent(**spec) for spec in event_specs]

    for event in events:
        state = tracker.record(event)
        print(f"Recorded: {event.target_type} via {event.modality}")

    # Report the state returned by the last record() call
    print(f"\nCurrent focus: {state.focus.target_id}")
    print(f"History size: {len(state.history)}")
    print(f"Trajectory confidence: {state.trajectory.confidence:.2f}")
    print(f"Active wells: {state.active_wells}")

    # Flag an open question, then show the nag list the state reports
    tracker.mark_unresolved("open_question_001")
    state = tracker.get_state()
    print(f"Unresolved items: {state.unresolved}")

    print("\n=== Test Complete ===")
489  
490      print("\n=== Test Complete ===")