loop.py
1 """ 2 Attention Loop - The Actuator 3 4 This is what makes the attention system actually DO something. 5 The loop continuously: 6 1. Pulls attention signals from all sources 7 2. Records them in the tracker 8 3. Computes what should surface 9 4. Updates the daily note 10 5. Triggers follow-on actions (surfacing, notifications, etc.) 11 12 The key insight: attention tracking without actuation is just logging. 13 The loop is what closes the feedback cycle. 14 15 Integration points: 16 - God Database: persists attention events 17 - Resonance Engine: computes scores for surfacing decisions 18 - Daily Note: the single pane where everything surfaces 19 - Flight Protocol: adapts behavior based on cognitive phase 20 """ 21 22 from dataclasses import dataclass, field 23 from datetime import datetime, timedelta 24 from typing import Optional, List, Dict, Any, Callable 25 from pathlib import Path 26 import asyncio 27 28 from .tracker import ( 29 AttentionTracker, 30 AttentionEvent, 31 AttentionState, 32 DailyNoteIntegration, 33 ) 34 from .podcast_pipeline import OvercastParser 35 36 37 @dataclass 38 class AttentionSource: 39 """Configuration for an attention source.""" 40 name: str 41 source_type: str # 'podcast', 'browser', 'conversation', 'file' 42 path: Optional[str] = None 43 poll_interval_seconds: int = 300 # 5 minutes default 44 last_poll: Optional[datetime] = None 45 enabled: bool = True 46 47 48 @dataclass 49 class SurfacingAction: 50 """An action to take based on attention.""" 51 action_type: str # 'add_to_daily', 'notify', 'log', 'trigger_agent' 52 target: str # bullet ID, note path, etc. 53 content: str 54 priority: float # 0-1 55 reason: str # why this is surfacing 56 57 58 class AttentionLoop: 59 """ 60 The main attention processing loop. 61 62 Continuously: 63 1. Polls attention sources for new events 64 2. Records events in the tracker 65 3. Computes surfacing actions 66 4. Executes actions (update daily note, notify, etc.) 67 68 Usage: 69 loop = AttentionLoop( 70 daily_note_path="/path/to/daily/2026-01-13.md" 71 ) 72 73 # Add sources 74 loop.add_source(AttentionSource( 75 name="overcast", 76 source_type="podcast", 77 path="~/Downloads/overcast.opml" 78 )) 79 80 # Run the loop (blocks) 81 await loop.run() 82 83 # Or run one iteration 84 actions = await loop.tick() 85 """ 86 87 def __init__( 88 self, 89 daily_note_path: str, 90 history_size: int = 100, 91 min_resonance_to_surface: float = 0.5 92 ): 93 self.daily_note_path = Path(daily_note_path) 94 self.min_resonance_to_surface = min_resonance_to_surface 95 96 # Core components 97 self.tracker = AttentionTracker(history_size=history_size) 98 self.daily_integration = DailyNoteIntegration( 99 self.tracker, 100 str(self.daily_note_path) 101 ) 102 103 # Attention sources 104 self.sources: Dict[str, AttentionSource] = {} 105 106 # Action handlers 107 self._action_handlers: Dict[str, Callable] = { 108 'add_to_daily': self._handle_add_to_daily, 109 'log': self._handle_log, 110 } 111 112 # State 113 self._running = False 114 self._last_tick: Optional[datetime] = None 115 116 # Callbacks for external integration 117 self._on_attention: List[Callable[[AttentionEvent], None]] = [] 118 self._on_surface: List[Callable[[SurfacingAction], None]] = [] 119 120 def add_source(self, source: AttentionSource) -> None: 121 """Add an attention source to poll.""" 122 self.sources[source.name] = source 123 124 def remove_source(self, name: str) -> None: 125 """Remove an attention source.""" 126 self.sources.pop(name, None) 127 128 def on_attention(self, callback: Callable[[AttentionEvent], None]) -> None: 129 """Register callback for attention events.""" 130 self._on_attention.append(callback) 131 132 def on_surface(self, callback: Callable[[SurfacingAction], None]) -> None: 133 """Register callback for surfacing actions.""" 134 self._on_surface.append(callback) 135 136 async def run(self, tick_interval_seconds: int = 60) -> None: 137 """ 138 Run the attention loop continuously. 139 140 Args: 141 tick_interval_seconds: How often to check for new events 142 """ 143 self._running = True 144 print(f"[AttentionLoop] Starting loop, interval={tick_interval_seconds}s") 145 146 while self._running: 147 try: 148 actions = await self.tick() 149 if actions: 150 print(f"[AttentionLoop] Tick produced {len(actions)} actions") 151 except Exception as e: 152 print(f"[AttentionLoop] Error in tick: {e}") 153 154 await asyncio.sleep(tick_interval_seconds) 155 156 def stop(self) -> None: 157 """Stop the attention loop.""" 158 self._running = False 159 160 async def tick(self) -> List[SurfacingAction]: 161 """ 162 Run one iteration of the attention loop. 163 164 Returns: 165 List of surfacing actions taken 166 """ 167 self._last_tick = datetime.now() 168 all_actions = [] 169 170 # 1. Poll each source for new events 171 for source in self.sources.values(): 172 if not source.enabled: 173 continue 174 175 # Check if it's time to poll 176 if source.last_poll: 177 elapsed = (datetime.now() - source.last_poll).total_seconds() 178 if elapsed < source.poll_interval_seconds: 179 continue 180 181 # Poll the source 182 events = await self._poll_source(source) 183 source.last_poll = datetime.now() 184 185 # Record each event 186 for event in events: 187 self.tracker.record(event) 188 189 # Notify callbacks 190 for callback in self._on_attention: 191 try: 192 callback(event) 193 except Exception as e: 194 print(f"[AttentionLoop] Callback error: {e}") 195 196 # 2. Compute surfacing actions 197 state = self.tracker.get_state() 198 actions = self._compute_actions(state) 199 200 # 3. Execute actions 201 for action in actions: 202 await self._execute_action(action) 203 all_actions.append(action) 204 205 # Notify callbacks 206 for callback in self._on_surface: 207 try: 208 callback(action) 209 except Exception as e: 210 print(f"[AttentionLoop] Surface callback error: {e}") 211 212 return all_actions 213 214 async def _poll_source(self, source: AttentionSource) -> List[AttentionEvent]: 215 """Poll a source for new attention events.""" 216 events = [] 217 218 if source.source_type == 'podcast' and source.path: 219 events = await self._poll_podcast(source) 220 elif source.source_type == 'conversation': 221 # Would poll conversation history 222 pass 223 elif source.source_type == 'browser': 224 # Would poll browser history 225 pass 226 227 return events 228 229 async def _poll_podcast(self, source: AttentionSource) -> List[AttentionEvent]: 230 """Poll podcast source (Overcast OPML).""" 231 events = [] 232 path = Path(source.path).expanduser() 233 234 if not path.exists(): 235 return events 236 237 try: 238 parser = OvercastParser(str(path)) 239 recent = parser.get_recently_played(days=7, limit=20) 240 241 for episode in recent: 242 # Only create event if not already recorded 243 event = AttentionEvent( 244 timestamp=episode.user_updated_date or datetime.now(), 245 target_id=episode.overcast_id, 246 target_type='episode', 247 modality='listen', 248 duration_seconds=episode.progress_seconds if episode.played else None, 249 intensity=1.0 if episode.played else 0.5, 250 source='podcast' 251 ) 252 events.append(event) 253 254 except Exception as e: 255 print(f"[AttentionLoop] Error polling podcast: {e}") 256 257 return events 258 259 def _compute_actions(self, state: AttentionState) -> List[SurfacingAction]: 260 """Compute what actions to take based on attention state.""" 261 actions = [] 262 263 # Surface unresolved items 264 for target_id in state.unresolved: 265 actions.append(SurfacingAction( 266 action_type='add_to_daily', 267 target=target_id, 268 content=f"Unresolved: {target_id}", 269 priority=0.8, 270 reason="On nag list - unresolved" 271 )) 272 273 # Surface items ahead on trajectory 274 if state.trajectory and state.trajectory.ahead: 275 for target_id in state.trajectory.ahead[:3]: 276 actions.append(SurfacingAction( 277 action_type='add_to_daily', 278 target=target_id, 279 content=f"Ahead on trajectory: {target_id}", 280 priority=0.6, 281 reason="In direction of attention travel" 282 )) 283 284 # Log current focus 285 if state.focus: 286 actions.append(SurfacingAction( 287 action_type='log', 288 target=state.focus.target_id, 289 content=f"Focus: {state.focus.target_type} via {state.focus.modality}", 290 priority=0.3, 291 reason="Current attention focus" 292 )) 293 294 return actions 295 296 async def _execute_action(self, action: SurfacingAction) -> None: 297 """Execute a surfacing action.""" 298 handler = self._action_handlers.get(action.action_type) 299 if handler: 300 await handler(action) 301 302 async def _handle_add_to_daily(self, action: SurfacingAction) -> None: 303 """Add an item to the daily note.""" 304 # Read current daily note 305 if not self.daily_note_path.exists(): 306 return 307 308 content = self.daily_note_path.read_text() 309 310 # Check if we have an attention section 311 if "## Attention Surface" not in content: 312 # Add section after Action Items 313 if "## Action Items" in content: 314 content = content.replace( 315 "## Action Items", 316 "## Attention Surface\n\n*Auto-surfaced by attention system*\n\n---\n\n## Action Items" 317 ) 318 else: 319 # Add at top 320 lines = content.split('\n') 321 lines.insert(2, "\n## Attention Surface\n\n*Auto-surfaced by attention system*\n") 322 content = '\n'.join(lines) 323 324 # Add the item to attention surface section 325 item_line = f"- [{action.priority:.1f}] {action.content} *(reason: {action.reason})*\n" 326 327 # Insert after "## Attention Surface" header 328 marker = "## Attention Surface\n" 329 idx = content.find(marker) 330 if idx >= 0: 331 insert_point = idx + len(marker) 332 # Skip any existing content markers 333 while insert_point < len(content) and content[insert_point] == '\n': 334 insert_point += 1 335 if content[insert_point:insert_point + 1] == '*': 336 # Skip the italicized description line 337 next_newline = content.find('\n', insert_point) 338 if next_newline >= 0: 339 insert_point = next_newline + 1 340 341 content = content[:insert_point] + item_line + content[insert_point:] 342 self.daily_note_path.write_text(content) 343 344 async def _handle_log(self, action: SurfacingAction) -> None: 345 """Log an attention event.""" 346 print(f"[Attention] {action.content}") 347 348 349 def create_default_loop(daily_note_dir: str) -> AttentionLoop: 350 """ 351 Create an attention loop with default sources. 352 353 Args: 354 daily_note_dir: Directory containing daily notes (e.g., Sovereign_Estate/daily/) 355 356 Returns: 357 Configured AttentionLoop 358 """ 359 # Get today's date for daily note 360 today = datetime.now().strftime('%Y-%m-%d') 361 daily_note_path = Path(daily_note_dir) / f"{today}.md" 362 363 loop = AttentionLoop(daily_note_path=str(daily_note_path)) 364 365 # Add default sources 366 loop.add_source(AttentionSource( 367 name="overcast", 368 source_type="podcast", 369 path="~/Downloads/overcast.opml", 370 poll_interval_seconds=300 # 5 minutes 371 )) 372 373 # Would add more sources here: 374 # - Browser history 375 # - Conversation logs 376 # - File access logs 377 378 return loop 379 380 381 if __name__ == "__main__": 382 import sys 383 384 print("=== Attention Loop Test ===\n") 385 386 # Create loop with test daily note 387 test_daily = "/Users/rcerf/repos/Sovereign_Estate/daily/2026-01-13.md" 388 loop = AttentionLoop(daily_note_path=test_daily) 389 390 # Add Overcast source 391 loop.add_source(AttentionSource( 392 name="overcast", 393 source_type="podcast", 394 path="~/Downloads/overcast.opml", 395 poll_interval_seconds=60 396 )) 397 398 print(f"Daily note: {test_daily}") 399 print(f"Sources: {list(loop.sources.keys())}") 400 401 # Run one tick 402 async def test_tick(): 403 actions = await loop.tick() 404 print(f"\nTick produced {len(actions)} actions:") 405 for action in actions: 406 print(f" - [{action.action_type}] {action.content[:50]}...") 407 408 asyncio.run(test_tick()) 409 410 print("\n=== Test Complete ===")