# compaction.py
"""
Attention-Based Compaction

"Attention is all you need" - applied to memory and context management.

The insight: What you attend to should survive. What you ignore should fade.
This is how human memory works. This is how transformer attention works.
This is how Sovereign OS should work.

Compaction is not housekeeping - it's the core intelligence of the system.
The compression decisions ARE the priorities.

Principles:
1. High attention → slow decay → stays in context
2. Low attention → fast decay → gracefully fades
3. Unresolved → never decays → persists until closed
4. Cross-session attractors → boosted → clearly important

The compaction algorithm runs during BIRTH phase (nightly) but
the attention signals accumulate continuously.
"""

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import math


class RetentionTier(Enum):
    """Tiers of retention based on attention."""
    CORE = "core"  # Never compacts - fundamental to operator
    HOT = "hot"    # High attention - full fidelity
    WARM = "warm"  # Medium attention - compressed but accessible
    COOL = "cool"  # Low attention - archived summary
    COLD = "cold"  # No attention - candidate for removal


@dataclass
class AttentionScore:
    """Attention score for an item."""
    item_id: str
    raw_score: float         # 0-1, accumulated from attention events
    decay_rate: float        # How fast it fades (lower = slower)
    last_attended: datetime  # Naive local time; decay is measured from here
    attend_count: int = 0
    cross_session_boost: float = 0.0  # Multiplicative boost for attractors
    unresolved: bool = False          # If true, never decays

    @property
    def effective_score(self) -> float:
        """Compute effective score with exponential time decay and boosts.

        Returns:
            A value in [0, 1]. Unresolved items are pinned at 1.0.
        """
        if self.unresolved:
            return 1.0  # Unresolved items stay at max

        # Exponential time decay; decay_rate is expressed "per day" (86400 s).
        elapsed = (datetime.now() - self.last_attended).total_seconds()
        decay_factor = math.exp(-self.decay_rate * elapsed / 86400)

        # Cross-session attractors get a multiplicative boost.
        boost = 1.0 + self.cross_session_boost

        return min(1.0, self.raw_score * decay_factor * boost)

    @property
    def tier(self) -> RetentionTier:
        """Determine retention tier from effective score.

        Uses the default thresholds (0.8 / 0.5 / 0.2). An AttentionCompactor
        with custom ``tier_thresholds`` computes tiers itself instead.
        """
        if self.unresolved:
            return RetentionTier.CORE

        score = self.effective_score
        if score > 0.8:
            return RetentionTier.HOT
        elif score > 0.5:
            return RetentionTier.WARM
        elif score > 0.2:
            return RetentionTier.COOL
        else:
            return RetentionTier.COLD


@dataclass
class CompactionDecision:
    """A decision about how to compact an item."""
    item_id: str
    tier: RetentionTier
    action: str  # 'keep', 'compress', 'archive', 'remove'
    reason: str
    attention_score: float
    content_summary: Optional[str] = None  # For compressed/archived items


@dataclass
class CompactionResult:
    """Results of a compaction run."""
    timestamp: datetime
    items_processed: int
    decisions: List[CompactionDecision] = field(default_factory=list)
    bytes_before: int = 0
    bytes_after: int = 0

    @property
    def compression_ratio(self) -> float:
        """Ratio of retained bytes to original bytes (1.0 when nothing ran)."""
        if self.bytes_before == 0:
            return 1.0
        return self.bytes_after / self.bytes_before

    def summary(self) -> str:
        """Human-readable summary of the run, grouped by retention tier."""
        by_tier: Dict[str, int] = {}
        for d in self.decisions:
            tier = d.tier.value
            by_tier[tier] = by_tier.get(tier, 0) + 1

        lines = [
            f"Compaction at {self.timestamp.strftime('%Y-%m-%d %H:%M')}",
            f"  Processed: {self.items_processed} items",
            f"  Compression: {self.compression_ratio:.1%}",
            "  By tier:"
        ]
        for tier, count in sorted(by_tier.items()):
            lines.append(f"    {tier}: {count}")

        return "\n".join(lines)


class AttentionCompactor:
    """
    Compacts items based on attention scores.

    The compactor:
    1. Takes attention signals from the tracker
    2. Computes retention scores for all items
    3. Decides what to keep, compress, archive, or remove
    4. Applies compression strategies per tier

    Run during BIRTH phase for daily compaction.
    """

    def __init__(
        self,
        decay_rates: Optional[Dict[str, float]] = None,
        tier_thresholds: Optional[Dict[str, float]] = None
    ):
        """
        Args:
            decay_rates: Per-item-type decay rates; must include a 'default'
                key if provided. Lower = slower decay.
            tier_thresholds: Effective-score cutoffs with keys
                'hot', 'warm', 'cool'. Scores above 'hot' are HOT, etc.
        """
        # Default decay rates by item type
        self.decay_rates = decay_rates or {
            'bullet': 0.1,    # Slow decay for ideas
            'episode': 0.2,   # Medium decay for podcast episodes
            'link': 0.3,      # Faster decay for links
            'session': 0.05,  # Very slow decay for session context
            'default': 0.15
        }

        # Thresholds for tier assignment (can be tuned)
        self.tier_thresholds = tier_thresholds or {
            'hot': 0.8,
            'warm': 0.5,
            'cool': 0.2
        }

        # Attention scores by item
        self._scores: Dict[str, AttentionScore] = {}

        # Items marked as unresolved (nag list)
        self._unresolved: set = set()

        # Cross-session attractors (get a boost)
        self._attractors: set = set()

    def record_attention(
        self,
        item_id: str,
        item_type: str = 'default',
        intensity: float = 1.0
    ) -> None:
        """
        Record an attention event for an item.

        Args:
            item_id: The item that received attention
            item_type: Type of item (for decay rate selection)
            intensity: How intense the attention was (0-1)
        """
        decay_rate = self.decay_rates.get(item_type, self.decay_rates['default'])

        if item_id in self._scores:
            score = self._scores[item_id]
            # Blend new attention with existing (80/20 exponential blend)
            score.raw_score = min(1.0, score.raw_score * 0.8 + intensity * 0.2)
            score.last_attended = datetime.now()
            score.attend_count += 1
        else:
            self._scores[item_id] = AttentionScore(
                item_id=item_id,
                raw_score=intensity,
                decay_rate=decay_rate,
                last_attended=datetime.now(),
                attend_count=1,
                # Fix: an item may be marked unresolved before its first
                # attention event; honor that here or it would wrongly decay.
                unresolved=item_id in self._unresolved
            )

        # Check if it's an attractor
        if item_id in self._attractors:
            self._scores[item_id].cross_session_boost = 0.3

    def mark_unresolved(self, item_id: str) -> None:
        """Mark an item as unresolved (will never decay)."""
        self._unresolved.add(item_id)
        if item_id in self._scores:
            self._scores[item_id].unresolved = True

    def resolve(self, item_id: str) -> None:
        """Mark an item as resolved (can now decay)."""
        self._unresolved.discard(item_id)
        if item_id in self._scores:
            self._scores[item_id].unresolved = False

    def set_attractors(self, attractor_ids: List[str]) -> None:
        """Set the current cross-session attractors.

        Items that were attractors but no longer appear in the list lose
        their boost (previously the stale boost persisted forever).
        """
        new_attractors = set(attractor_ids)

        # Clear the boost on items that dropped off the attractor list.
        for item_id in self._attractors - new_attractors:
            if item_id in self._scores:
                self._scores[item_id].cross_session_boost = 0.0

        self._attractors = new_attractors
        for item_id in new_attractors:
            if item_id in self._scores:
                self._scores[item_id].cross_session_boost = 0.3

    def get_tier(self, item_id: str) -> RetentionTier:
        """Get the retention tier for an item (COLD if never attended)."""
        if item_id not in self._scores:
            return RetentionTier.COLD

        return self._tier_for_score(self._scores[item_id])

    def _tier_for_score(self, score: AttentionScore) -> RetentionTier:
        """Map an attention score to a tier using this compactor's thresholds.

        Fix: previously ``tier_thresholds`` was accepted in __init__ but never
        consulted; tiering silently used hardcoded cutoffs. Defaults match the
        old hardcoded values, so default-config behavior is unchanged.
        """
        if score.unresolved:
            return RetentionTier.CORE

        effective = score.effective_score
        if effective > self.tier_thresholds['hot']:
            return RetentionTier.HOT
        elif effective > self.tier_thresholds['warm']:
            return RetentionTier.WARM
        elif effective > self.tier_thresholds['cool']:
            return RetentionTier.COOL
        else:
            return RetentionTier.COLD

    def run_compaction(
        self,
        items: List[Dict[str, Any]],
        dry_run: bool = False
    ) -> CompactionResult:
        """
        Run compaction on a list of items.

        Args:
            items: List of dicts with 'id', 'type', 'content', 'size'
            dry_run: If True, don't actually modify anything
                (reserved; decisions are computed but nothing is mutated
                by this method either way)

        Returns:
            CompactionResult with decisions and stats
        """
        result = CompactionResult(
            timestamp=datetime.now(),
            items_processed=len(items)
        )

        # 'size' is trusted when provided; otherwise fall back to content length.
        result.bytes_before = sum(item.get('size', len(item.get('content', '')))
                                  for item in items)

        for item in items:
            item_id = item['id']
            content = item.get('content', '')

            # Items with no attention record are cold by definition.
            record = self._scores.get(item_id)
            if record is None:
                effective = 0.0
                tier = RetentionTier.COLD
            else:
                effective = record.effective_score
                tier = self._tier_for_score(record)

            decision = self._make_decision(item_id, tier, content, effective)
            result.decisions.append(decision)

            # Track retained size; 'archive' and 'remove' contribute nothing.
            if decision.action == 'keep':
                result.bytes_after += len(content)
            elif decision.action == 'compress':
                result.bytes_after += len(decision.content_summary or '')

        return result

    def _make_decision(
        self,
        item_id: str,
        tier: RetentionTier,
        content: str,
        effective_score: float
    ) -> CompactionDecision:
        """Make a compaction decision for an item.

        Args:
            item_id: The item being decided on
            tier: Its retention tier (already computed by the caller)
            content: The item's content (used for compression)
            effective_score: Its effective attention score (0.0 if unknown)
        """
        if tier == RetentionTier.CORE:
            return CompactionDecision(
                item_id=item_id,
                tier=tier,
                action='keep',
                reason='Unresolved - must persist',
                attention_score=1.0
            )

        elif tier == RetentionTier.HOT:
            return CompactionDecision(
                item_id=item_id,
                tier=tier,
                action='keep',
                reason='High attention - full fidelity',
                attention_score=effective_score
            )

        elif tier == RetentionTier.WARM:
            # Compress to summary
            return CompactionDecision(
                item_id=item_id,
                tier=tier,
                action='compress',
                reason='Medium attention - compressed',
                attention_score=effective_score,
                content_summary=self._compress_content(content)
            )

        elif tier == RetentionTier.COOL:
            # Archive with minimal summary
            return CompactionDecision(
                item_id=item_id,
                tier=tier,
                action='archive',
                reason='Low attention - archived',
                attention_score=effective_score,
                content_summary=self._minimal_summary(content)
            )

        else:  # COLD
            return CompactionDecision(
                item_id=item_id,
                tier=tier,
                action='remove',
                reason='No attention - candidate for removal',
                attention_score=0.0
            )

    def _compress_content(self, content: str) -> str:
        """Compress content while preserving essence.

        Simple strategy: first 200 chars + last 100 chars.
        Real implementation would use LLM summarization.
        """
        if len(content) <= 300:
            return content

        return f"{content[:200]}... [{len(content)} chars] ...{content[-100:]}"

    def _minimal_summary(self, content: str) -> str:
        """Create minimal summary for archival (first 100 chars)."""
        if len(content) <= 100:
            return content
        return f"{content[:100]}..."


class DailyNoteCompactor:
    """
    Applies compaction to the daily note.

    During BIRTH phase, this:
    1. Reads the day's accumulated content
    2. Applies attention-based compaction
    3. Writes compressed version for tomorrow's context
    4. Archives full version for retrieval if needed
    """

    def __init__(
        self,
        compactor: AttentionCompactor,
        daily_dir: str,
        archive_dir: str
    ):
        """
        Args:
            compactor: The AttentionCompactor holding attention state
            daily_dir: Directory containing <YYYY-MM-DD>.md daily notes
            archive_dir: Directory for full-fidelity archives (created if absent)
        """
        self.compactor = compactor
        self.daily_dir = Path(daily_dir)
        self.archive_dir = Path(archive_dir)
        self.archive_dir.mkdir(parents=True, exist_ok=True)

    def compact_day(self, date: datetime) -> CompactionResult:
        """
        Compact a day's daily note.

        Args:
            date: The date to compact

        Returns:
            CompactionResult (empty result if the daily note does not exist)
        """
        date_str = date.strftime('%Y-%m-%d')
        daily_note = self.daily_dir / f"{date_str}.md"

        if not daily_note.exists():
            return CompactionResult(
                timestamp=datetime.now(),
                items_processed=0
            )

        content = daily_note.read_text()

        # Parse into items (sections)
        items = self._parse_sections(content, date_str)

        # Run compaction
        result = self.compactor.run_compaction(items)

        # Archive original (full fidelity, for later retrieval)
        archive_path = self.archive_dir / f"{date_str}-full.md"
        archive_path.write_text(content)

        # Write compacted version (for context carryover)
        compacted_content = self._rebuild_note(result)
        compacted_path = self.daily_dir / f"{date_str}-compacted.md"
        compacted_path.write_text(compacted_content)

        return result

    def _parse_sections(
        self,
        content: str,
        date_str: str
    ) -> List[Dict[str, Any]]:
        """Parse daily note into '## '-delimited sections.

        NOTE(review): content before the first '## ' heading is discarded,
        and duplicate section names produce colliding item ids — confirm
        both are acceptable for the daily-note format.
        """
        items: List[Dict[str, Any]] = []
        current_section: Optional[str] = None
        current_content: List[str] = []

        def flush() -> None:
            """Append the accumulated section, if any, to items."""
            if current_section:
                body = '\n'.join(current_content)
                items.append({
                    'id': f"{date_str}:{current_section}",
                    'type': 'section',
                    'name': current_section,
                    'content': body,
                    'size': len(body)
                })

        for line in content.split('\n'):
            if line.startswith('## '):
                flush()
                current_section = line[3:].strip()
                current_content = []
            else:
                current_content.append(line)

        # Last section
        flush()

        return items

    def _rebuild_note(self, result: CompactionResult) -> str:
        """Rebuild note from compaction decisions (kept/compressed only)."""
        lines = ["# Compacted Daily Note\n"]
        lines.append(f"*Compacted at {result.timestamp.strftime('%Y-%m-%d %H:%M')}*")
        lines.append(f"*Compression ratio: {result.compression_ratio:.1%}*\n")

        for decision in result.decisions:
            if decision.action in ('keep', 'compress'):
                section_name = decision.item_id.split(':')[-1]
                lines.append(f"## {section_name}")

                if decision.action == 'keep':
                    lines.append("*[HOT - full fidelity]*\n")
                else:
                    lines.append("*[WARM - compressed]*")
                    lines.append(decision.content_summary or "")
                lines.append("")

        return '\n'.join(lines)


# Factory function
def create_compaction_system(
    daily_dir: str,
    archive_dir: Optional[str] = None
) -> Tuple[AttentionCompactor, DailyNoteCompactor]:
    """
    Create the compaction system.

    Args:
        daily_dir: Path to daily notes
        archive_dir: Path for archives (defaults to daily_dir/archive)

    Returns:
        (AttentionCompactor, DailyNoteCompactor)
    """
    if archive_dir is None:
        archive_dir = str(Path(daily_dir) / 'archive')

    compactor = AttentionCompactor()
    daily_compactor = DailyNoteCompactor(compactor, daily_dir, archive_dir)

    return compactor, daily_compactor


if __name__ == "__main__":
    print("=== Attention-Based Compaction Test ===\n")

    compactor = AttentionCompactor()

    # Simulate attention events
    compactor.record_attention("idea_001", item_type="bullet", intensity=0.9)
    compactor.record_attention("idea_001", item_type="bullet", intensity=0.8)
    compactor.record_attention("idea_002", item_type="bullet", intensity=0.5)
    compactor.record_attention("idea_003", item_type="bullet", intensity=0.2)
    # idea_004 has no attention

    # Mark one as unresolved
    compactor.mark_unresolved("idea_005")
    compactor.record_attention("idea_005", item_type="bullet", intensity=0.3)

    # Set cross-session attractors
    compactor.set_attractors(["idea_002"])

    # Check tiers
    print("Retention tiers:")
    for item_id in ["idea_001", "idea_002", "idea_003", "idea_004", "idea_005"]:
        tier = compactor.get_tier(item_id)
        print(f"  {item_id}: {tier.value}")

    # Run compaction
    items = [
        {"id": "idea_001", "type": "bullet", "content": "High attention idea with lots of detail " * 20},
        {"id": "idea_002", "type": "bullet", "content": "Medium attention idea " * 10},
        {"id": "idea_003", "type": "bullet", "content": "Low attention idea " * 5},
        {"id": "idea_004", "type": "bullet", "content": "No attention idea"},
        {"id": "idea_005", "type": "bullet", "content": "Unresolved - must keep"},
    ]

    result = compactor.run_compaction(items)

    print(f"\n{result.summary()}")

    print("\nDecisions:")
    for d in result.decisions:
        print(f"  {d.item_id}: {d.action} ({d.reason})")

    print("\n'Attention is all you need'")