/ core / attention / unified_correlator.py
unified_correlator.py
  1  #!/usr/bin/env python3
  2  """
  3  Unified Correlator - Connects all data sources into a single correlation layer.
  4  
  5  Brings together:
  6  1. Claude sessions (from session transcripts)
  7  2. Monologue voice transcripts
  8  3. Comet browser history
  9  4. Semantic graph (atoms, edges from Sovereign_Estate)
 10  5. Claude.ai artifacts
 11  
 12  All data flows into a unified event stream that can be:
 13  - Correlated by time windows
 14  - Streamed via SSE
 15  - Queried for insights
 16  
 17  Usage:
 18      correlator = UnifiedCorrelator()
 19      correlator.start()  # Starts background watching
 20  
 21      # Get unified view
 22      report = correlator.get_unified_report(hours_back=24)
 23  
 24      # Subscribe to events
 25      correlator.subscribe('all', callback)
 26  """
 27  
 28  import json
 29  import threading
 30  from dataclasses import dataclass, field
 31  from datetime import datetime, timedelta
 32  from pathlib import Path
 33  from typing import Dict, List, Optional, Set, Any, Callable
 34  from collections import defaultdict
 35  from enum import Enum
 36  
 37  from .graph_integration import GraphIntegration, GraphEvent, GraphAtom
 38  from .multi_source_correlator import MultiSourceCorrelator, CorrelatedMoment, EventSource, SourceEvent
 39  from .artifact_analyzer import ArtifactAnalyzer, Artifact
 40  from .signal_words import detect_signals, SignalDetection
 41  
 42  
 43  class UnifiedEventType(Enum):
 44      """All event types across all sources."""
 45      # Original sources
 46      CLAUDE_SESSION = "claude_session"
 47      MONOLOGUE = "monologue"
 48      BROWSER = "browser"
 49  
 50      # Graph events
 51      GRAPH_ATOM = "graph_atom"
 52      GRAPH_EDGE = "graph_edge"
 53      GRAVITY_WELL = "gravity_well"
 54  
 55      # Artifact events
 56      ARTIFACT = "artifact"
 57  
 58      # Derived events
 59      AHA_MOMENT = "aha_moment"
 60      CORRELATION = "correlation"
 61      HIGH_RESONANCE = "high_resonance"
 62  
 63  
 64  @dataclass
 65  class UnifiedEvent:
 66      """A unified event from any source."""
 67      event_type: UnifiedEventType
 68      timestamp: datetime
 69      source_id: str  # UUID or source-specific ID
 70      content: str
 71  
 72      # Common attributes
 73      weight: float = 1.0
 74      tags: Set[str] = field(default_factory=set)
 75  
 76      # Source-specific metadata
 77      metadata: Dict[str, Any] = field(default_factory=dict)
 78  
 79      # Correlation data
 80      correlated_events: List[str] = field(default_factory=list)  # IDs of related events
 81  
 82      def to_dict(self) -> Dict[str, Any]:
 83          return {
 84              'event_type': self.event_type.value,
 85              'timestamp': self.timestamp.isoformat(),
 86              'source_id': self.source_id,
 87              'content': self.content[:500],
 88              'weight': self.weight,
 89              'tags': list(self.tags),
 90              'metadata': self.metadata,
 91              'correlated_count': len(self.correlated_events)
 92          }
 93  
 94  
 95  @dataclass
 96  class UnifiedReport:
 97      """Report from unified correlation analysis."""
 98      time_window_hours: float = 0.0
 99      total_events: int = 0
100  
101      # By source
102      events_by_source: Dict[str, int] = field(default_factory=dict)
103  
104      # High-value findings
105      aha_moments: List[UnifiedEvent] = field(default_factory=list)
106      high_resonance: List[UnifiedEvent] = field(default_factory=list)
107      gravity_wells: List[Dict[str, Any]] = field(default_factory=list)
108  
109      # Correlations
110      multi_source_correlations: List[CorrelatedMoment] = field(default_factory=list)
111  
112      # Timeline
113      event_timeline: List[UnifiedEvent] = field(default_factory=list)
114  
115      # Topics
116      unified_topic_map: Dict[str, List[str]] = field(default_factory=dict)
117  
118      def to_dict(self) -> Dict[str, Any]:
119          return {
120              'time_window_hours': self.time_window_hours,
121              'total_events': self.total_events,
122              'events_by_source': self.events_by_source,
123              'aha_count': len(self.aha_moments),
124              'high_resonance_count': len(self.high_resonance),
125              'gravity_wells': len(self.gravity_wells),
126              'correlations': len(self.multi_source_correlations),
127              'topics': list(self.unified_topic_map.keys())[:20]
128          }
129  
130      def to_markdown(self) -> str:
131          lines = [
132              "# Unified Correlation Report",
133              f"*{self.total_events} events, {self.time_window_hours:.0f} hours*",
134              "",
135          ]
136  
137          # Event distribution
138          lines.append("## Event Sources")
139          for source, count in sorted(self.events_by_source.items(), key=lambda x: -x[1]):
140              lines.append(f"- {source}: {count}")
141          lines.append("")
142  
143          # Aha moments
144          if self.aha_moments:
145              lines.append("## Aha Moments")
146              for event in self.aha_moments[:10]:
147                  lines.append(f"\n### {event.timestamp.strftime('%Y-%m-%d %H:%M')}")
148                  lines.append(f"Weight: {event.weight:.2f} | Tags: {', '.join(list(event.tags)[:5])}")
149                  lines.append(f"> {event.content[:200]}...")
150          lines.append("")
151  
152          # High resonance
153          if self.high_resonance:
154              lines.append("## High Resonance Content")
155              for event in self.high_resonance[:10]:
156                  lines.append(f"- [{event.event_type.value}] {event.content[:100]}...")
157          lines.append("")
158  
159          # Gravity wells
160          if self.gravity_wells:
161              lines.append("## Gravity Wells")
162              for well in self.gravity_wells[:10]:
163                  lines.append(f"- **{well.get('content', '')[:60]}...** ({well.get('total_edges', 0)} edges)")
164  
165          return '\n'.join(lines)
166  
167  
class UnifiedCorrelator:
    """
    Unifies all data sources into a single correlation layer.

    Owns a GraphIntegration, a MultiSourceCorrelator and an ArtifactAnalyzer.
    Live graph events are converted to UnifiedEvent objects, cached by
    ``source_id`` and pushed to subscribers; reports and timelines are
    computed on demand from the sub-systems.
    """

    def __init__(self):
        # Sub-systems
        self.graph = GraphIntegration()
        self.multi_source = MultiSourceCorrelator()
        self.artifact_analyzer = ArtifactAnalyzer()

        # Unified event storage, keyed by UnifiedEvent.source_id
        self._events: Dict[str, UnifiedEvent] = {}

        # Subscribers: event-type value (or 'all') -> list of callbacks
        self._subscribers: Dict[str, List[Callable]] = defaultdict(list)

        # State
        self._running = False
        self._watch_thread: Optional[threading.Thread] = None

    def start(self):
        """Start watching all sources. Does nothing if already running."""
        if self._running:
            return

        self._running = True

        # Subscribe to graph events
        # NOTE(review): a stop()/start() cycle subscribes again; whether that
        # duplicates deliveries depends on GraphIntegration - confirm.
        self.graph.subscribe('all', self._on_graph_event)
        self.graph.start_watching()

        print("[UnifiedCorrelator] Started - watching all sources")

    def stop(self):
        """Stop watching."""
        self._running = False
        self.graph.stop_watching()
        print("[UnifiedCorrelator] Stopped")

    def subscribe(self, event_type: str, callback: Callable[[UnifiedEvent], None]):
        """Register ``callback`` for one event-type value, or for 'all' events."""
        self._subscribers[event_type].append(callback)

    def _emit(self, event: UnifiedEvent):
        """Deliver ``event`` to its type-specific subscribers, then to 'all' subscribers.

        A failing callback is reported and skipped so one bad subscriber
        cannot block the others. Both channels report errors the same way.
        """
        for channel in (event.event_type.value, 'all'):
            # Iterate a snapshot so a callback may subscribe during delivery.
            for callback in tuple(self._subscribers.get(channel, ())):
                try:
                    callback(event)
                except Exception as e:
                    print(f"[UnifiedCorrelator] Subscriber error: {e}")

    @staticmethod
    def _parse_timestamp(raw: Any) -> datetime:
        """Parse an ISO-8601 string into a naive datetime.

        Missing, non-string or malformed values fall back to the current
        time. Values with an explicit UTC offset are converted to naive
        local time so they can be compared and sorted with ``datetime.now()``.
        """
        try:
            # NOTE(review): a trailing 'Z' is dropped rather than treated as
            # UTC, so such values are read as local time - confirm intent.
            parsed = datetime.fromisoformat(raw.replace('Z', ''))
        except (AttributeError, TypeError, ValueError):
            return datetime.now()
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone().replace(tzinfo=None)
        return parsed

    def _on_graph_event(self, graph_event: GraphEvent):
        """Convert a graph event to a UnifiedEvent, cache it and emit it.

        Handles 'atom', 'edge' and 'gravity_well' events; any other event
        type is ignored. The cache is keyed by ``source_id``, so a later
        event with the same id replaces the earlier one.
        """
        data = graph_event.data

        if graph_event.event_type == 'atom':
            unified = UnifiedEvent(
                event_type=UnifiedEventType.GRAPH_ATOM,
                timestamp=graph_event.timestamp,
                source_id=data.get('uuid', ''),
                content=data.get('content', ''),
                weight=data.get('resonance', 0.5),
                tags=set(data.get('tags', [])),
                metadata={
                    'altitude': data.get('altitude'),
                    'source_type': data.get('source_type')
                }
            )
        elif graph_event.event_type == 'edge':
            unified = UnifiedEvent(
                event_type=UnifiedEventType.GRAPH_EDGE,
                timestamp=graph_event.timestamp,
                source_id=f"{data.get('source')}->{data.get('target')}",
                content=f"Edge: {data.get('type', 'relates_to')}",
                weight=data.get('strength', 0.5),
                # Copy so later changes to the graph event's dict do not leak in.
                metadata=dict(data)
            )
        elif graph_event.event_type == 'gravity_well':
            unified = UnifiedEvent(
                event_type=UnifiedEventType.GRAVITY_WELL,
                timestamp=graph_event.timestamp,
                source_id=data.get('uuid', ''),
                content=data.get('content', ''),
                weight=2.0,  # High weight for gravity wells
                metadata={'edge_count': data.get('edge_count', 0)}
            )
        else:
            return

        self._events[unified.source_id] = unified
        self._emit(unified)

    def get_unified_report(self, hours_back: float = 24) -> UnifiedReport:
        """
        Generate a unified correlation report across all sources.

        Loads graph data covering the window (in whole days), runs the
        multi-source correlator and the artifact analyzer for ``hours_back``,
        then combines counts, high-resonance atoms, gravity wells, aha
        moments and topics into one report. The high-resonance atoms and
        gravity wells are taken as the graph returns them; no additional
        ``hours_back`` filter is applied to them here.
        """
        report = UnifiedReport(time_window_hours=hours_back)

        # Load graph data
        self.graph.load_range(days_back=max(1, int(hours_back / 24) + 1))

        # Load multi-source correlation
        multi_report = self.multi_source.correlate(hours_back=hours_back)

        # Load artifacts
        artifact_report = self.artifact_analyzer.analyze(hours_back=hours_back)

        # Count events by source
        graph_state = self.graph._state
        report.events_by_source = {
            'graph_atoms': graph_state.atom_count,
            'graph_edges': graph_state.edge_count,
            'multi_source': multi_report.total_events,
            'artifacts': artifact_report.artifacts_analyzed
        }
        report.total_events = sum(report.events_by_source.values())

        # High resonance from graph
        for atom_data in self.graph.get_high_resonance_atoms(20):
            report.high_resonance.append(UnifiedEvent(
                event_type=UnifiedEventType.HIGH_RESONANCE,
                # Shared parser: a bad timestamp no longer aborts the report.
                timestamp=self._parse_timestamp(atom_data.get('timestamp')),
                source_id=atom_data.get('uuid', ''),
                content=atom_data.get('content', ''),
                weight=atom_data.get('resonance', 0.5),
                tags=set(atom_data.get('tags', []))
            ))

        # Gravity wells
        report.gravity_wells = self.graph.get_gravity_wells()

        # Aha moments from multi-source
        for moment in multi_report.aha_moments:
            report.aha_moments.append(UnifiedEvent(
                event_type=UnifiedEventType.AHA_MOMENT,
                timestamp=moment.timestamp,
                source_id=f"aha-{moment.timestamp.isoformat()}",
                content=self._summarize_moment(moment),
                weight=moment.combined_weight,
                tags=self._extract_topics(moment),
                metadata={
                    'claude_count': len(moment.claude_events),
                    'monologue_count': len(moment.monologue_events),
                    'browser_count': len(moment.browser_events),
                    'confidence': moment.confidence
                }
            ))

        # Correlations
        report.multi_source_correlations = multi_report.aha_moments + multi_report.validation_seeking

        # Build unified topic map
        for topic, events in multi_report.topic_research_map.items():
            report.unified_topic_map.setdefault(topic, []).extend(events)

        # Add graph topics
        for alt, count in graph_state.atoms_by_altitude.items():
            report.unified_topic_map[f"altitude:{alt}"] = [f"{count} atoms"]

        # Highest weight first (lists are not trimmed here)
        report.aha_moments.sort(key=lambda e: e.weight, reverse=True)
        report.high_resonance.sort(key=lambda e: e.weight, reverse=True)

        return report

    def _summarize_moment(self, moment: CorrelatedMoment) -> str:
        """Build a one-line summary from the first event of each source in ``moment``."""
        parts = []
        if moment.claude_events:
            parts.append(f"Claude: {moment.claude_events[0].content[:100]}")
        if moment.monologue_events:
            parts.append(f"Voice: {moment.monologue_events[0].content[:100]}")
        if moment.browser_events and moment.browser_events[0].browser_title:
            parts.append(f"Researching: {moment.browser_events[0].browser_title}")
        return " | ".join(parts) if parts else "Correlated moment"

    def _extract_topics(self, moment: CorrelatedMoment) -> Set[str]:
        """Return the union of tags on the moment's Claude and monologue events."""
        topics = set()
        for event in moment.claude_events + moment.monologue_events:
            topics.update(event.tags)
        return topics

    def get_timeline(
        self,
        hours_back: float = 24,
        event_types: Optional[List[str]] = None
    ) -> List[UnifiedEvent]:
        """
        Get a newest-first timeline of events, optionally filtered by type.

        Only graph atoms (the 200 most recent) are included at present.
        Atoms older than ``hours_back`` are dropped; atoms whose timestamp
        cannot be parsed are stamped with the current time. ``event_types``
        holds UnifiedEventType values; unknown values are ignored.
        """
        cutoff = datetime.now() - timedelta(hours=hours_back)

        events = []

        # Add graph atoms
        for atom_data in self.graph.get_recent_atoms(limit=200):
            ts = self._parse_timestamp(atom_data.get('timestamp', ''))
            if ts < cutoff:
                continue

            events.append(UnifiedEvent(
                event_type=UnifiedEventType.GRAPH_ATOM,
                timestamp=ts,
                source_id=atom_data.get('uuid', ''),
                content=atom_data.get('content', ''),
                weight=atom_data.get('weight', 1.0),
                tags=set(atom_data.get('tags', []))
            ))

        # Filter by type if specified
        if event_types:
            known_values = {member.value for member in UnifiedEventType}
            type_set = {UnifiedEventType(t) for t in event_types if t in known_values}
            events = [e for e in events if e.event_type in type_set]

        # Sort by timestamp, newest first
        events.sort(key=lambda e: e.timestamp, reverse=True)

        return events
403  
404  
# Module-wide shared instance; stays None until first requested.
_unified_correlator: Optional[UnifiedCorrelator] = None


def get_unified_correlator() -> UnifiedCorrelator:
    """Return the shared UnifiedCorrelator, creating and starting it on first call."""
    global _unified_correlator
    if _unified_correlator is not None:
        return _unified_correlator

    # Publish the instance before starting it, then hand it back.
    _unified_correlator = UnifiedCorrelator()
    _unified_correlator.start()
    return _unified_correlator
416  
417  
if __name__ == '__main__':
    # Manual smoke run: build a 24-hour report and print it as markdown.
    # The correlator is not start()ed here, so no background watching begins.
    # This file uses relative imports, so it must be run in package context.
    print("=== Unified Correlator ===\n")
    print(UnifiedCorrelator().get_unified_report(hours_back=24).to_markdown())
423  
424      print(report.to_markdown())