/ core / attention / compaction.py
compaction.py
  1  """
  2  Attention-Based Compaction
  3  
  4  "Attention is all you need" - applied to memory and context management.
  5  
  6  The insight: What you attend to should survive. What you ignore should fade.
  7  This is how human memory works. This is how transformer attention works.
  8  This is how Sovereign OS should work.
  9  
 10  Compaction is not housekeeping - it's the core intelligence of the system.
 11  The compression decisions ARE the priorities.
 12  
 13  Principles:
 14  1. High attention → slow decay → stays in context
 15  2. Low attention → fast decay → gracefully fades
 16  3. Unresolved → never decays → persists until closed
 17  4. Cross-session attractors → boosted → clearly important
 18  
 19  The compaction algorithm runs during BIRTH phase (nightly) but
 20  the attention signals accumulate continuously.
 21  """
 22  
 23  from dataclasses import dataclass, field
 24  from datetime import datetime, timedelta
 25  from typing import Optional, List, Dict, Any, Tuple
 26  from enum import Enum
 27  import math
 28  
 29  
 30  class RetentionTier(Enum):
 31      """Tiers of retention based on attention."""
 32      CORE = "core"          # Never compacts - fundamental to operator
 33      HOT = "hot"            # High attention - full fidelity
 34      WARM = "warm"          # Medium attention - compressed but accessible
 35      COOL = "cool"          # Low attention - archived summary
 36      COLD = "cold"          # No attention - candidate for removal
 37  
 38  
 39  @dataclass
 40  class AttentionScore:
 41      """Attention score for an item."""
 42      item_id: str
 43      raw_score: float  # 0-1 based on attention events
 44      decay_rate: float  # How fast it fades (lower = slower)
 45      last_attended: datetime
 46      attend_count: int = 0
 47      cross_session_boost: float = 0.0  # Boost for cross-session attractors
 48      unresolved: bool = False  # If true, never decays
 49  
 50      @property
 51      def effective_score(self) -> float:
 52          """Compute effective score with decay and boosts."""
 53          if self.unresolved:
 54              return 1.0  # Unresolved items stay at max
 55  
 56          # Time decay
 57          elapsed = (datetime.now() - self.last_attended).total_seconds()
 58          decay_factor = math.exp(-self.decay_rate * elapsed / 86400)  # Per day
 59  
 60          # Cross-session boost
 61          boost = 1.0 + self.cross_session_boost
 62  
 63          return min(1.0, self.raw_score * decay_factor * boost)
 64  
 65      @property
 66      def tier(self) -> RetentionTier:
 67          """Determine retention tier from effective score."""
 68          score = self.effective_score
 69  
 70          if self.unresolved:
 71              return RetentionTier.CORE
 72          elif score > 0.8:
 73              return RetentionTier.HOT
 74          elif score > 0.5:
 75              return RetentionTier.WARM
 76          elif score > 0.2:
 77              return RetentionTier.COOL
 78          else:
 79              return RetentionTier.COLD
 80  
 81  
 82  @dataclass
 83  class CompactionDecision:
 84      """A decision about how to compact an item."""
 85      item_id: str
 86      tier: RetentionTier
 87      action: str  # 'keep', 'compress', 'archive', 'remove'
 88      reason: str
 89      attention_score: float
 90      content_summary: Optional[str] = None  # For compressed items
 91  
 92  
 93  @dataclass
 94  class CompactionResult:
 95      """Results of a compaction run."""
 96      timestamp: datetime
 97      items_processed: int
 98      decisions: List[CompactionDecision] = field(default_factory=list)
 99      bytes_before: int = 0
100      bytes_after: int = 0
101  
102      @property
103      def compression_ratio(self) -> float:
104          """How much we compressed."""
105          if self.bytes_before == 0:
106              return 1.0
107          return self.bytes_after / self.bytes_before
108  
109      def summary(self) -> str:
110          """Human-readable summary."""
111          by_tier = {}
112          for d in self.decisions:
113              tier = d.tier.value
114              by_tier[tier] = by_tier.get(tier, 0) + 1
115  
116          lines = [
117              f"Compaction at {self.timestamp.strftime('%Y-%m-%d %H:%M')}",
118              f"  Processed: {self.items_processed} items",
119              f"  Compression: {self.compression_ratio:.1%}",
120              "  By tier:"
121          ]
122          for tier, count in sorted(by_tier.items()):
123              lines.append(f"    {tier}: {count}")
124  
125          return "\n".join(lines)
126  
127  
class AttentionCompactor:
    """
    Compacts items based on attention scores.

    The compactor:
    1. Accumulates attention signals through ``record_attention``
    2. Derives a retention tier for each item from its effective score
    3. Decides what to keep, compress, archive, or remove
    4. Produces a shortened summary for compressed and archived items

    Items marked unresolved are always CORE and always kept, whether or not
    any attention has been recorded for them.

    Run during BIRTH phase for daily compaction.
    """

    # Used when neither the item type nor a 'default' entry is configured.
    _FALLBACK_DECAY_RATE = 0.15

    # Boost given to items in the current attractor set.
    _ATTRACTOR_BOOST = 0.3

    # Cut-offs used for any key missing from ``tier_thresholds``.
    _FALLBACK_THRESHOLDS = {'hot': 0.8, 'warm': 0.5, 'cool': 0.2}

    def __init__(
        self,
        decay_rates: Optional[Dict[str, float]] = None,
        tier_thresholds: Optional[Dict[str, float]] = None
    ):
        """
        Args:
            decay_rates: Per-item-type decay rates; the 'default' entry is
                used for unknown types. Falls back to built-in rates.
            tier_thresholds: Score cut-offs keyed 'hot', 'warm', 'cool'.
                Falls back to 0.8 / 0.5 / 0.2.
        """
        # Default decay rates by item type
        self.decay_rates = decay_rates or {
            'bullet': 0.1,      # Slow decay for ideas
            'episode': 0.2,     # Medium decay for podcast episodes
            'link': 0.3,        # Faster decay for links
            'session': 0.05,    # Very slow decay for session context
            'default': 0.15
        }

        # Thresholds for tier assignment (can be tuned)
        self.tier_thresholds = tier_thresholds or dict(self._FALLBACK_THRESHOLDS)

        # Attention scores by item
        self._scores: Dict[str, AttentionScore] = {}

        # Items marked as unresolved (nag list)
        self._unresolved: set = set()

        # Cross-session attractors (get a boost)
        self._attractors: set = set()

    def record_attention(
        self,
        item_id: str,
        item_type: str = 'default',
        intensity: float = 1.0
    ) -> None:
        """
        Record an attention event for an item.

        A first event creates the item's score; later events blend the new
        intensity into the existing raw score (80% old, 20% new) and refresh
        the last-attended time.

        Args:
            item_id: The item that received attention
            item_type: Type of item (for decay rate selection)
            intensity: How intense the attention was (0-1); values outside
                that range are clamped
        """
        # A caller-supplied decay_rates dict may lack a 'default' entry.
        decay_rate = self.decay_rates.get(
            item_type,
            self.decay_rates.get('default', self._FALLBACK_DECAY_RATE)
        )
        intensity = max(0.0, min(1.0, intensity))
        now = datetime.now()

        record = self._scores.get(item_id)
        if record is None:
            record = AttentionScore(
                item_id=item_id,
                raw_score=intensity,
                decay_rate=decay_rate,
                last_attended=now,
                attend_count=1,
                # Honour mark_unresolved() calls made before the first event.
                unresolved=item_id in self._unresolved
            )
            self._scores[item_id] = record
        else:
            # Blend new attention with existing
            record.raw_score = min(1.0, record.raw_score * 0.8 + intensity * 0.2)
            record.last_attended = now
            record.attend_count += 1

        if item_id in self._attractors:
            record.cross_session_boost = self._ATTRACTOR_BOOST

    def mark_unresolved(self, item_id: str) -> None:
        """Mark an item as unresolved (will never decay)."""
        self._unresolved.add(item_id)
        if item_id in self._scores:
            self._scores[item_id].unresolved = True

    def resolve(self, item_id: str) -> None:
        """Mark an item as resolved (can now decay)."""
        self._unresolved.discard(item_id)
        if item_id in self._scores:
            self._scores[item_id].unresolved = False

    def set_attractors(self, attractor_ids: List[str]) -> None:
        """Replace the current set of cross-session attractors.

        Known items in the new set receive the boost; known items that are
        no longer attractors have their boost cleared.
        """
        self._attractors = set(attractor_ids)
        for item_id, record in self._scores.items():
            if item_id in self._attractors:
                record.cross_session_boost = self._ATTRACTOR_BOOST
            else:
                record.cross_session_boost = 0.0

    def get_tier(self, item_id: str) -> RetentionTier:
        """Get the retention tier for an item.

        Unresolved items are CORE; items with no attention record are COLD.
        """
        return self._classify(item_id)[0]

    def _classify(self, item_id: str) -> Tuple[RetentionTier, float]:
        """Return ``(tier, effective score)`` using the configured thresholds."""
        record = self._scores.get(item_id)

        # The unresolved set is authoritative even without a score record.
        if item_id in self._unresolved or (record is not None and record.unresolved):
            return RetentionTier.CORE, 1.0
        if record is None:
            return RetentionTier.COLD, 0.0

        score = record.effective_score
        fallback = self._FALLBACK_THRESHOLDS
        if score > self.tier_thresholds.get('hot', fallback['hot']):
            return RetentionTier.HOT, score
        if score > self.tier_thresholds.get('warm', fallback['warm']):
            return RetentionTier.WARM, score
        if score > self.tier_thresholds.get('cool', fallback['cool']):
            return RetentionTier.COOL, score
        return RetentionTier.COLD, score

    def run_compaction(
        self,
        items: List[Dict[str, Any]],
        dry_run: bool = False
    ) -> CompactionResult:
        """
        Run compaction on a list of items.

        This only computes decisions and size statistics; neither the items
        nor the recorded attention scores are modified.

        Args:
            items: List of dicts with 'id', and optionally 'type', 'content',
                'size' ('size' defaults to the content length)
            dry_run: Accepted for interface compatibility; it currently has
                no effect because this method never modifies anything

        Returns:
            CompactionResult with one decision per item, in input order
        """
        result = CompactionResult(
            timestamp=datetime.now(),
            items_processed=len(items)
        )

        for item in items:
            item_id = item['id']
            content = item.get('content', '')
            size = item.get('size', len(content))
            result.bytes_before += size

            tier, score = self._classify(item_id)
            decision = self._make_decision(item_id, tier, content, score)
            result.decisions.append(decision)

            # Track compressed size
            if decision.action == 'keep':
                # Same measure as bytes_before, so an untouched item
                # contributes equally to both totals.
                result.bytes_after += size
            elif decision.action == 'compress':
                result.bytes_after += len(decision.content_summary or '')
            # 'archive' and 'remove' don't count toward active size

        return result

    def _make_decision(
        self,
        item_id: str,
        tier: RetentionTier,
        content: str,
        score: Optional[float] = None
    ) -> CompactionDecision:
        """Make a compaction decision for an item.

        Args:
            item_id: The item being decided on
            tier: The retention tier already assigned to the item
            content: The item's full content (summarised for WARM and COOL)
            score: The item's effective score; looked up when omitted, with
                0.0 used for items that have no attention record
        """
        if score is None:
            record = self._scores.get(item_id)
            score = record.effective_score if record is not None else 0.0

        if tier == RetentionTier.CORE:
            return CompactionDecision(
                item_id=item_id,
                tier=tier,
                action='keep',
                reason='Unresolved - must persist',
                attention_score=1.0
            )

        if tier == RetentionTier.HOT:
            return CompactionDecision(
                item_id=item_id,
                tier=tier,
                action='keep',
                reason='High attention - full fidelity',
                attention_score=score
            )

        if tier == RetentionTier.WARM:
            return CompactionDecision(
                item_id=item_id,
                tier=tier,
                action='compress',
                reason='Medium attention - compressed',
                attention_score=score,
                content_summary=self._compress_content(content)
            )

        if tier == RetentionTier.COOL:
            return CompactionDecision(
                item_id=item_id,
                tier=tier,
                action='archive',
                reason='Low attention - archived',
                attention_score=score,
                content_summary=self._minimal_summary(content)
            )

        # COLD
        return CompactionDecision(
            item_id=item_id,
            tier=tier,
            action='remove',
            reason='No attention - candidate for removal',
            attention_score=0.0
        )

    def _compress_content(self, content: str) -> str:
        """Shorten content to its first 200 and last 100 characters.

        Content of 300 characters or fewer is returned unchanged, as is any
        content the length marker would make longer rather than shorter.
        """
        # Simple strategy: first 200 chars + last 100 chars
        # Real implementation would use LLM summarization
        if len(content) <= 300:
            return content

        summary = f"{content[:200]}... [{len(content)} chars] ...{content[-100:]}"
        return summary if len(summary) < len(content) else content

    def _minimal_summary(self, content: str) -> str:
        """Create a minimal summary for archival: the first 100 characters.

        Content is returned unchanged when truncating plus the ellipsis
        would not make it shorter.
        """
        if len(content) <= 100:
            return content

        summary = f"{content[:100]}..."
        return summary if len(summary) < len(content) else content
359  
360  
class DailyNoteCompactor:
    """
    Applies compaction to the daily note.

    During BIRTH phase, this:
    1. Reads the day's note, ``<daily_dir>/YYYY-MM-DD.md``
    2. Splits it into ``## `` sections and applies attention-based compaction
    3. Archives the full original as ``<archive_dir>/YYYY-MM-DD-full.md``
    4. Writes the compacted version as ``<daily_dir>/YYYY-MM-DD-compacted.md``
    """

    def __init__(
        self,
        compactor: AttentionCompactor,
        daily_dir: str,
        archive_dir: str
    ):
        """Store the collaborators and create the archive directory if missing."""
        from pathlib import Path

        self.compactor = compactor
        self.daily_dir = Path(daily_dir)
        self.archive_dir = Path(archive_dir)
        self.archive_dir.mkdir(parents=True, exist_ok=True)

    def compact_day(self, date: datetime) -> CompactionResult:
        """
        Compact a day's daily note.

        Args:
            date: The date to compact

        Returns:
            CompactionResult; an empty result (0 items) when the day has no
            note, in which case nothing is written
        """
        date_str = date.strftime('%Y-%m-%d')
        daily_note = self.daily_dir / f"{date_str}.md"

        if not daily_note.exists():
            return CompactionResult(
                timestamp=datetime.now(),
                items_processed=0
            )

        # Explicit encoding so notes read and write the same on every platform.
        content = daily_note.read_text(encoding='utf-8')

        # Parse into items (sections)
        items = self._parse_sections(content, date_str)

        # Run compaction
        result = self.compactor.run_compaction(items)

        # Archive original
        archive_path = self.archive_dir / f"{date_str}-full.md"
        archive_path.write_text(content, encoding='utf-8')

        # Write compacted version (for context carryover). The parsed items
        # are passed along because decisions do not carry kept content.
        compacted_content = self._rebuild_note(result, items)
        compacted_path = self.daily_dir / f"{date_str}-compacted.md"
        compacted_path.write_text(compacted_content, encoding='utf-8')

        return result

    @staticmethod
    def _section_item(
        date_str: str,
        name: str,
        body_lines: List[str]
    ) -> Dict[str, Any]:
        """Build the item dict for one parsed section."""
        body = '\n'.join(body_lines)
        return {
            'id': f"{date_str}:{name}",
            'type': 'section',
            'name': name,
            'content': body,
            'size': len(body)
        }

    def _parse_sections(
        self,
        content: str,
        date_str: str
    ) -> List[Dict[str, Any]]:
        """Parse daily note into sections.

        A section starts at each line beginning with ``## `` and runs to the
        next such line. Text before the first heading belongs to no section
        and is not returned.
        """
        items = []
        current_section: Optional[str] = None
        current_content: List[str] = []

        for line in content.split('\n'):
            if line.startswith('## '):
                # Compare against None so a section with an empty heading
                # is still emitted rather than silently dropped.
                if current_section is not None:
                    items.append(
                        self._section_item(date_str, current_section, current_content)
                    )
                current_section = line[3:].strip()
                current_content = []
            else:
                current_content.append(line)

        # Last section
        if current_section is not None:
            items.append(
                self._section_item(date_str, current_section, current_content)
            )

        return items

    def _rebuild_note(
        self,
        result: CompactionResult,
        items: Optional[List[Dict[str, Any]]] = None
    ) -> str:
        """Rebuild note from compaction decisions.

        Only 'keep' and 'compress' decisions produce a section; archived and
        removed items are left out.

        Args:
            result: The compaction result to render
            items: The items the result was computed from, in the same
                order as ``result.decisions``. Supplies the full content of
                kept sections; when omitted or mismatched, kept sections are
                rendered with heading and label only
        """
        lines = ["# Compacted Daily Note\n"]
        lines.append(f"*Compacted at {result.timestamp.strftime('%Y-%m-%d %H:%M')}*")
        lines.append(f"*Compression ratio: {result.compression_ratio:.1%}*\n")

        decisions = result.decisions
        if items is not None and len(items) == len(decisions):
            sources = list(items)
        else:
            sources = [None] * len(decisions)

        for decision, source in zip(decisions, sources):
            if decision.action not in ('keep', 'compress'):
                continue

            # Only trust the positional pairing when the ids agree.
            if source is not None and source.get('id') != decision.item_id:
                source = None

            # Ids built by compact_day are "<YYYY-MM-DD>:<heading>", so split
            # on the first colon only; the heading may contain colons.
            _, separator, heading = decision.item_id.partition(':')
            section_name = heading if separator else decision.item_id
            lines.append(f"## {section_name}")

            # Label with the real tier: kept items may be CORE, not just HOT.
            label = decision.tier.value.upper()
            if decision.action == 'keep':
                lines.append(f"*[{label} - full fidelity]*\n")
                if source is not None:
                    body = source.get('content', '').strip('\n')
                    if body:
                        lines.append(body)
            else:
                lines.append(f"*[{label} - compressed]*")
                lines.append(decision.content_summary or "")
            lines.append("")

        return '\n'.join(lines)
480  
481  
482  # Factory function
def create_compaction_system(
    daily_dir: str,
    archive_dir: str = None
) -> Tuple[AttentionCompactor, DailyNoteCompactor]:
    """
    Build a compactor together with the daily-note compactor that uses it.

    Args:
        daily_dir: Directory holding the daily notes
        archive_dir: Directory for archived originals; when omitted, the
            'archive' subdirectory of daily_dir is used

    Returns:
        (AttentionCompactor, DailyNoteCompactor) sharing the same compactor
    """
    from pathlib import Path

    archive_location = (
        archive_dir if archive_dir is not None
        else str(Path(daily_dir) / 'archive')
    )

    attention = AttentionCompactor()
    return attention, DailyNoteCompactor(attention, daily_dir, archive_location)
506  
507  
if __name__ == "__main__":
    # Demo: feed attention events to a compactor, then print the resulting
    # tiers and compaction decisions.
    print("=== Attention-Based Compaction Test ===\n")

    compactor = AttentionCompactor()

    # Attention events in order; idea_004 deliberately receives none.
    for attended_id, strength in (
        ("idea_001", 0.9),
        ("idea_001", 0.8),
        ("idea_002", 0.5),
        ("idea_003", 0.2),
    ):
        compactor.record_attention(attended_id, item_type="bullet", intensity=strength)

    # idea_005 is flagged unresolved before its first attention event.
    compactor.mark_unresolved("idea_005")
    compactor.record_attention("idea_005", item_type="bullet", intensity=0.3)

    # idea_002 is the only cross-session attractor.
    compactor.set_attractors(["idea_002"])

    print("Retention tiers:")
    for item_id in ("idea_001", "idea_002", "idea_003", "idea_004", "idea_005"):
        tier = compactor.get_tier(item_id)
        print(f"  {item_id}: {tier.value}")

    # Sample payloads, one per idea, all of type 'bullet'.
    samples = (
        ("idea_001", "High attention idea with lots of detail " * 20),
        ("idea_002", "Medium attention idea " * 10),
        ("idea_003", "Low attention idea " * 5),
        ("idea_004", "No attention idea"),
        ("idea_005", "Unresolved - must keep"),
    )
    items = [
        {"id": sample_id, "type": "bullet", "content": text}
        for sample_id, text in samples
    ]

    result = compactor.run_compaction(items)

    print(f"\n{result.summary()}")

    print("\nDecisions:")
    for d in result.decisions:
        print(f"  {d.item_id}: {d.action} ({d.reason})")

    print("\n'Attention is all you need'")