/ core / attention / loop.py
loop.py
  1  """
  2  Attention Loop - The Actuator
  3  
  4  This is what makes the attention system actually DO something.
  5  The loop continuously:
  6  1. Pulls attention signals from all sources
  7  2. Records them in the tracker
  8  3. Computes what should surface
  9  4. Updates the daily note
 10  5. Triggers follow-on actions (surfacing, notifications, etc.)
 11  
 12  The key insight: attention tracking without actuation is just logging.
 13  The loop is what closes the feedback cycle.
 14  
 15  Integration points:
 16  - God Database: persists attention events
 17  - Resonance Engine: computes scores for surfacing decisions
 18  - Daily Note: the single pane where everything surfaces
 19  - Flight Protocol: adapts behavior based on cognitive phase
 20  """
 21  
 22  from dataclasses import dataclass, field
 23  from datetime import datetime, timedelta
 24  from typing import Optional, List, Dict, Any, Callable
 25  from pathlib import Path
 26  import asyncio
 27  
 28  from .tracker import (
 29      AttentionTracker,
 30      AttentionEvent,
 31      AttentionState,
 32      DailyNoteIntegration,
 33  )
 34  from .podcast_pipeline import OvercastParser
 35  
 36  
 37  @dataclass
 38  class AttentionSource:
 39      """Configuration for an attention source."""
 40      name: str
 41      source_type: str  # 'podcast', 'browser', 'conversation', 'file'
 42      path: Optional[str] = None
 43      poll_interval_seconds: int = 300  # 5 minutes default
 44      last_poll: Optional[datetime] = None
 45      enabled: bool = True
 46  
 47  
 48  @dataclass
 49  class SurfacingAction:
 50      """An action to take based on attention."""
 51      action_type: str  # 'add_to_daily', 'notify', 'log', 'trigger_agent'
 52      target: str  # bullet ID, note path, etc.
 53      content: str
 54      priority: float  # 0-1
 55      reason: str  # why this is surfacing
 56  
 57  
 58  class AttentionLoop:
 59      """
 60      The main attention processing loop.
 61  
 62      Continuously:
 63      1. Polls attention sources for new events
 64      2. Records events in the tracker
 65      3. Computes surfacing actions
 66      4. Executes actions (update daily note, notify, etc.)
 67  
 68      Usage:
 69          loop = AttentionLoop(
 70              daily_note_path="/path/to/daily/2026-01-13.md"
 71          )
 72  
 73          # Add sources
 74          loop.add_source(AttentionSource(
 75              name="overcast",
 76              source_type="podcast",
 77              path="~/Downloads/overcast.opml"
 78          ))
 79  
 80          # Run the loop (blocks)
 81          await loop.run()
 82  
 83          # Or run one iteration
 84          actions = await loop.tick()
 85      """
 86  
 87      def __init__(
 88          self,
 89          daily_note_path: str,
 90          history_size: int = 100,
 91          min_resonance_to_surface: float = 0.5
 92      ):
 93          self.daily_note_path = Path(daily_note_path)
 94          self.min_resonance_to_surface = min_resonance_to_surface
 95  
 96          # Core components
 97          self.tracker = AttentionTracker(history_size=history_size)
 98          self.daily_integration = DailyNoteIntegration(
 99              self.tracker,
100              str(self.daily_note_path)
101          )
102  
103          # Attention sources
104          self.sources: Dict[str, AttentionSource] = {}
105  
106          # Action handlers
107          self._action_handlers: Dict[str, Callable] = {
108              'add_to_daily': self._handle_add_to_daily,
109              'log': self._handle_log,
110          }
111  
112          # State
113          self._running = False
114          self._last_tick: Optional[datetime] = None
115  
116          # Callbacks for external integration
117          self._on_attention: List[Callable[[AttentionEvent], None]] = []
118          self._on_surface: List[Callable[[SurfacingAction], None]] = []
119  
120      def add_source(self, source: AttentionSource) -> None:
121          """Add an attention source to poll."""
122          self.sources[source.name] = source
123  
124      def remove_source(self, name: str) -> None:
125          """Remove an attention source."""
126          self.sources.pop(name, None)
127  
128      def on_attention(self, callback: Callable[[AttentionEvent], None]) -> None:
129          """Register callback for attention events."""
130          self._on_attention.append(callback)
131  
132      def on_surface(self, callback: Callable[[SurfacingAction], None]) -> None:
133          """Register callback for surfacing actions."""
134          self._on_surface.append(callback)
135  
136      async def run(self, tick_interval_seconds: int = 60) -> None:
137          """
138          Run the attention loop continuously.
139  
140          Args:
141              tick_interval_seconds: How often to check for new events
142          """
143          self._running = True
144          print(f"[AttentionLoop] Starting loop, interval={tick_interval_seconds}s")
145  
146          while self._running:
147              try:
148                  actions = await self.tick()
149                  if actions:
150                      print(f"[AttentionLoop] Tick produced {len(actions)} actions")
151              except Exception as e:
152                  print(f"[AttentionLoop] Error in tick: {e}")
153  
154              await asyncio.sleep(tick_interval_seconds)
155  
156      def stop(self) -> None:
157          """Stop the attention loop."""
158          self._running = False
159  
160      async def tick(self) -> List[SurfacingAction]:
161          """
162          Run one iteration of the attention loop.
163  
164          Returns:
165              List of surfacing actions taken
166          """
167          self._last_tick = datetime.now()
168          all_actions = []
169  
170          # 1. Poll each source for new events
171          for source in self.sources.values():
172              if not source.enabled:
173                  continue
174  
175              # Check if it's time to poll
176              if source.last_poll:
177                  elapsed = (datetime.now() - source.last_poll).total_seconds()
178                  if elapsed < source.poll_interval_seconds:
179                      continue
180  
181              # Poll the source
182              events = await self._poll_source(source)
183              source.last_poll = datetime.now()
184  
185              # Record each event
186              for event in events:
187                  self.tracker.record(event)
188  
189                  # Notify callbacks
190                  for callback in self._on_attention:
191                      try:
192                          callback(event)
193                      except Exception as e:
194                          print(f"[AttentionLoop] Callback error: {e}")
195  
196          # 2. Compute surfacing actions
197          state = self.tracker.get_state()
198          actions = self._compute_actions(state)
199  
200          # 3. Execute actions
201          for action in actions:
202              await self._execute_action(action)
203              all_actions.append(action)
204  
205              # Notify callbacks
206              for callback in self._on_surface:
207                  try:
208                      callback(action)
209                  except Exception as e:
210                      print(f"[AttentionLoop] Surface callback error: {e}")
211  
212          return all_actions
213  
214      async def _poll_source(self, source: AttentionSource) -> List[AttentionEvent]:
215          """Poll a source for new attention events."""
216          events = []
217  
218          if source.source_type == 'podcast' and source.path:
219              events = await self._poll_podcast(source)
220          elif source.source_type == 'conversation':
221              # Would poll conversation history
222              pass
223          elif source.source_type == 'browser':
224              # Would poll browser history
225              pass
226  
227          return events
228  
229      async def _poll_podcast(self, source: AttentionSource) -> List[AttentionEvent]:
230          """Poll podcast source (Overcast OPML)."""
231          events = []
232          path = Path(source.path).expanduser()
233  
234          if not path.exists():
235              return events
236  
237          try:
238              parser = OvercastParser(str(path))
239              recent = parser.get_recently_played(days=7, limit=20)
240  
241              for episode in recent:
242                  # Only create event if not already recorded
243                  event = AttentionEvent(
244                      timestamp=episode.user_updated_date or datetime.now(),
245                      target_id=episode.overcast_id,
246                      target_type='episode',
247                      modality='listen',
248                      duration_seconds=episode.progress_seconds if episode.played else None,
249                      intensity=1.0 if episode.played else 0.5,
250                      source='podcast'
251                  )
252                  events.append(event)
253  
254          except Exception as e:
255              print(f"[AttentionLoop] Error polling podcast: {e}")
256  
257          return events
258  
259      def _compute_actions(self, state: AttentionState) -> List[SurfacingAction]:
260          """Compute what actions to take based on attention state."""
261          actions = []
262  
263          # Surface unresolved items
264          for target_id in state.unresolved:
265              actions.append(SurfacingAction(
266                  action_type='add_to_daily',
267                  target=target_id,
268                  content=f"Unresolved: {target_id}",
269                  priority=0.8,
270                  reason="On nag list - unresolved"
271              ))
272  
273          # Surface items ahead on trajectory
274          if state.trajectory and state.trajectory.ahead:
275              for target_id in state.trajectory.ahead[:3]:
276                  actions.append(SurfacingAction(
277                      action_type='add_to_daily',
278                      target=target_id,
279                      content=f"Ahead on trajectory: {target_id}",
280                      priority=0.6,
281                      reason="In direction of attention travel"
282                  ))
283  
284          # Log current focus
285          if state.focus:
286              actions.append(SurfacingAction(
287                  action_type='log',
288                  target=state.focus.target_id,
289                  content=f"Focus: {state.focus.target_type} via {state.focus.modality}",
290                  priority=0.3,
291                  reason="Current attention focus"
292              ))
293  
294          return actions
295  
296      async def _execute_action(self, action: SurfacingAction) -> None:
297          """Execute a surfacing action."""
298          handler = self._action_handlers.get(action.action_type)
299          if handler:
300              await handler(action)
301  
302      async def _handle_add_to_daily(self, action: SurfacingAction) -> None:
303          """Add an item to the daily note."""
304          # Read current daily note
305          if not self.daily_note_path.exists():
306              return
307  
308          content = self.daily_note_path.read_text()
309  
310          # Check if we have an attention section
311          if "## Attention Surface" not in content:
312              # Add section after Action Items
313              if "## Action Items" in content:
314                  content = content.replace(
315                      "## Action Items",
316                      "## Attention Surface\n\n*Auto-surfaced by attention system*\n\n---\n\n## Action Items"
317                  )
318              else:
319                  # Add at top
320                  lines = content.split('\n')
321                  lines.insert(2, "\n## Attention Surface\n\n*Auto-surfaced by attention system*\n")
322                  content = '\n'.join(lines)
323  
324          # Add the item to attention surface section
325          item_line = f"- [{action.priority:.1f}] {action.content} *(reason: {action.reason})*\n"
326  
327          # Insert after "## Attention Surface" header
328          marker = "## Attention Surface\n"
329          idx = content.find(marker)
330          if idx >= 0:
331              insert_point = idx + len(marker)
332              # Skip any existing content markers
333              while insert_point < len(content) and content[insert_point] == '\n':
334                  insert_point += 1
335              if content[insert_point:insert_point + 1] == '*':
336                  # Skip the italicized description line
337                  next_newline = content.find('\n', insert_point)
338                  if next_newline >= 0:
339                      insert_point = next_newline + 1
340  
341              content = content[:insert_point] + item_line + content[insert_point:]
342              self.daily_note_path.write_text(content)
343  
344      async def _handle_log(self, action: SurfacingAction) -> None:
345          """Log an attention event."""
346          print(f"[Attention] {action.content}")
347  
348  
def create_default_loop(daily_note_dir: str) -> AttentionLoop:
    """
    Build an AttentionLoop wired to today's daily note and the stock sources.

    Args:
        daily_note_dir: Directory containing daily notes (e.g., Sovereign_Estate/daily/)

    Returns:
        An AttentionLoop pointing at ``<daily_note_dir>/<YYYY-MM-DD>.md`` with
        the Overcast podcast source registered
    """
    # Daily notes are named after the local date, e.g. 2026-01-13.md
    note_file = Path(daily_note_dir, f"{datetime.now():%Y-%m-%d}.md")
    attention_loop = AttentionLoop(daily_note_path=str(note_file))

    # Stock sources; browser history, conversation logs and file access
    # logs would be registered here as well once they exist.
    overcast = AttentionSource(
        name="overcast",
        source_type="podcast",
        path="~/Downloads/overcast.opml",
        poll_interval_seconds=300,  # 5 minutes
    )
    attention_loop.add_source(overcast)

    return attention_loop
379  
380  
if __name__ == "__main__":
    import sys

    print("=== Attention Loop Test ===\n")

    # Daily note to run the smoke test against: first command-line argument
    # if given, otherwise the original developer-machine path so existing
    # invocations behave as before.
    default_daily = "/Users/rcerf/repos/Sovereign_Estate/daily/2026-01-13.md"
    test_daily = sys.argv[1] if len(sys.argv) > 1 else default_daily
    loop = AttentionLoop(daily_note_path=test_daily)

    # Add Overcast source
    loop.add_source(AttentionSource(
        name="overcast",
        source_type="podcast",
        path="~/Downloads/overcast.opml",
        poll_interval_seconds=60
    ))

    print(f"Daily note: {test_daily}")
    print(f"Sources: {list(loop.sources.keys())}")

    # Run one tick and print a one-line summary of each action it produced
    async def test_tick():
        actions = await loop.tick()
        print(f"\nTick produced {len(actions)} actions:")
        for action in actions:
            print(f"  - [{action.action_type}] {action.content[:50]}...")

    asyncio.run(test_tick())

    print("\n=== Test Complete ===")
409  
410      print("\n=== Test Complete ===")