# unified_correlator.py
#!/usr/bin/env python3
"""
Unified Correlator - Connects all data sources into a single correlation layer.

Brings together:
1. Claude sessions (from session transcripts)
2. Monologue voice transcripts
3. Comet browser history
4. Semantic graph (atoms, edges from Sovereign_Estate)
5. Claude.ai artifacts

All data flows into a unified event stream that can be:
- Correlated by time windows
- Streamed via SSE
- Queried for insights

Usage:
    correlator = UnifiedCorrelator()
    correlator.start()  # Starts background watching

    # Get unified view
    report = correlator.get_unified_report(hours_back=24)

    # Subscribe to events
    correlator.subscribe('all', callback)
"""

import json
import threading
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set

from .graph_integration import GraphIntegration, GraphEvent, GraphAtom
from .multi_source_correlator import MultiSourceCorrelator, CorrelatedMoment, EventSource, SourceEvent
from .artifact_analyzer import ArtifactAnalyzer, Artifact
from .signal_words import detect_signals, SignalDetection


class UnifiedEventType(Enum):
    """All event types across all sources."""
    # Original sources
    CLAUDE_SESSION = "claude_session"
    MONOLOGUE = "monologue"
    BROWSER = "browser"

    # Graph events
    GRAPH_ATOM = "graph_atom"
    GRAPH_EDGE = "graph_edge"
    GRAVITY_WELL = "gravity_well"

    # Artifact events
    ARTIFACT = "artifact"

    # Derived events
    AHA_MOMENT = "aha_moment"
    CORRELATION = "correlation"
    HIGH_RESONANCE = "high_resonance"


@dataclass
class UnifiedEvent:
    """A single event normalized from any source into the unified stream."""
    event_type: UnifiedEventType
    timestamp: datetime
    source_id: str  # UUID or source-specific ID
    content: str

    # Common attributes
    weight: float = 1.0
    tags: Set[str] = field(default_factory=set)

    # Source-specific metadata
    metadata: Dict[str, Any] = field(default_factory=dict)

    # Correlation data
    correlated_events: List[str] = field(default_factory=list)  # IDs of related events

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict; content is truncated to 500 chars."""
        return {
            'event_type': self.event_type.value,
            'timestamp': self.timestamp.isoformat(),
            'source_id': self.source_id,
            'content': self.content[:500],
            'weight': self.weight,
            'tags': list(self.tags),
            'metadata': self.metadata,
            'correlated_count': len(self.correlated_events)
        }


@dataclass
class UnifiedReport:
    """Report from unified correlation analysis."""
    time_window_hours: float = 0.0
    total_events: int = 0

    # By source
    events_by_source: Dict[str, int] = field(default_factory=dict)

    # High-value findings
    aha_moments: List[UnifiedEvent] = field(default_factory=list)
    high_resonance: List[UnifiedEvent] = field(default_factory=list)
    gravity_wells: List[Dict[str, Any]] = field(default_factory=list)

    # Correlations
    multi_source_correlations: List[CorrelatedMoment] = field(default_factory=list)

    # Timeline
    event_timeline: List[UnifiedEvent] = field(default_factory=list)

    # Topics
    unified_topic_map: Dict[str, List[str]] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Summarize the report as counts plus the top 20 topic keys."""
        return {
            'time_window_hours': self.time_window_hours,
            'total_events': self.total_events,
            'events_by_source': self.events_by_source,
            'aha_count': len(self.aha_moments),
            'high_resonance_count': len(self.high_resonance),
            'gravity_wells': len(self.gravity_wells),
            'correlations': len(self.multi_source_correlations),
            'topics': list(self.unified_topic_map.keys())[:20]
        }

    def to_markdown(self) -> str:
        """Render a human-readable Markdown summary of the report.

        Sections are emitted only when they have content; list sections are
        capped at 10 entries and content snippets are truncated.
        """
        lines = [
            "# Unified Correlation Report",
            f"*{self.total_events} events, {self.time_window_hours:.0f} hours*",
            "",
        ]

        # Event distribution, busiest source first
        lines.append("## Event Sources")
        for source, count in sorted(self.events_by_source.items(), key=lambda x: -x[1]):
            lines.append(f"- {source}: {count}")
        lines.append("")

        # Aha moments
        if self.aha_moments:
            lines.append("## Aha Moments")
            for event in self.aha_moments[:10]:
                lines.append(f"\n### {event.timestamp.strftime('%Y-%m-%d %H:%M')}")
                lines.append(f"Weight: {event.weight:.2f} | Tags: {', '.join(list(event.tags)[:5])}")
                lines.append(f"> {event.content[:200]}...")
            lines.append("")

        # High resonance
        if self.high_resonance:
            lines.append("## High Resonance Content")
            for event in self.high_resonance[:10]:
                lines.append(f"- [{event.event_type.value}] {event.content[:100]}...")
            lines.append("")

        # Gravity wells
        if self.gravity_wells:
            lines.append("## Gravity Wells")
            for well in self.gravity_wells[:10]:
                lines.append(f"- **{well.get('content', '')[:60]}...** ({well.get('total_edges', 0)} edges)")

        return '\n'.join(lines)


class UnifiedCorrelator:
    """
    Unifies all data sources into a single correlation layer.

    Watches multiple sources and correlates events by time, content, and topics.
    """

    def __init__(self):
        # Sub-systems
        self.graph = GraphIntegration()
        self.multi_source = MultiSourceCorrelator()
        self.artifact_analyzer = ArtifactAnalyzer()

        # Unified event storage, keyed by source_id
        self._events: Dict[str, UnifiedEvent] = {}

        # Subscribers: event-type value (or 'all') -> callbacks
        self._subscribers: Dict[str, List[Callable]] = defaultdict(list)

        # State
        self._running = False
        self._watch_thread: Optional[threading.Thread] = None

    def start(self):
        """Start watching all sources. Idempotent: a second call is a no-op."""
        if self._running:
            return

        self._running = True

        # Subscribe to graph events
        self.graph.subscribe('all', self._on_graph_event)
        self.graph.start_watching()

        print("[UnifiedCorrelator] Started - watching all sources")

    def stop(self):
        """Stop watching all sources."""
        self._running = False
        self.graph.stop_watching()
        print("[UnifiedCorrelator] Stopped")

    def subscribe(self, event_type: str, callback: Callable[[UnifiedEvent], None]):
        """Register a callback for an event-type value, or 'all' for every event."""
        self._subscribers[event_type].append(callback)

    def _emit(self, event: UnifiedEvent):
        """Deliver an event to its type-specific subscribers, then to 'all'.

        Subscriber exceptions are logged and isolated so a single failing
        callback cannot block delivery to the remaining subscribers.
        (A callback registered for both channels is invoked twice.)
        """
        for channel in (event.event_type.value, 'all'):
            for callback in self._subscribers.get(channel, []):
                try:
                    callback(event)
                except Exception as e:
                    # Previously the 'all' channel swallowed errors silently;
                    # log both channels consistently.
                    print(f"[UnifiedCorrelator] Subscriber error: {e}")

    @staticmethod
    def _parse_timestamp(ts_str: str) -> datetime:
        """Parse an ISO-8601 timestamp, tolerating a trailing 'Z'.

        Returns the current time when the value is missing or malformed
        (best-effort, matching the rest of this module). The result is
        naive: the 'Z' is stripped rather than converted to an offset, so
        comparisons against datetime.now() stay valid.
        """
        try:
            return datetime.fromisoformat(ts_str.replace('Z', ''))
        except (ValueError, TypeError, AttributeError):
            return datetime.now()

    def _on_graph_event(self, graph_event: GraphEvent):
        """Convert a GraphEvent ('atom'/'edge'/'gravity_well') to a UnifiedEvent.

        Unknown graph event types are ignored. The converted event is stored
        (keyed by source_id, so re-emission overwrites) and emitted.
        """
        if graph_event.event_type == 'atom':
            unified = UnifiedEvent(
                event_type=UnifiedEventType.GRAPH_ATOM,
                timestamp=graph_event.timestamp,
                source_id=graph_event.data.get('uuid', ''),
                content=graph_event.data.get('content', ''),
                weight=graph_event.data.get('resonance', 0.5),
                tags=set(graph_event.data.get('tags', [])),
                metadata={
                    'altitude': graph_event.data.get('altitude'),
                    'source_type': graph_event.data.get('source_type')
                }
            )
        elif graph_event.event_type == 'edge':
            unified = UnifiedEvent(
                event_type=UnifiedEventType.GRAPH_EDGE,
                timestamp=graph_event.timestamp,
                source_id=f"{graph_event.data.get('source')}->{graph_event.data.get('target')}",
                content=f"Edge: {graph_event.data.get('type', 'relates_to')}",
                weight=graph_event.data.get('strength', 0.5),
                metadata=graph_event.data
            )
        elif graph_event.event_type == 'gravity_well':
            unified = UnifiedEvent(
                event_type=UnifiedEventType.GRAVITY_WELL,
                timestamp=graph_event.timestamp,
                source_id=graph_event.data.get('uuid', ''),
                content=graph_event.data.get('content', ''),
                weight=2.0,  # High weight for gravity wells
                metadata={'edge_count': graph_event.data.get('edge_count', 0)}
            )
        else:
            return

        self._events[unified.source_id] = unified
        self._emit(unified)

    def get_unified_report(self, hours_back: float = 24) -> UnifiedReport:
        """
        Generate a unified correlation report across all sources.

        Loads graph data, multi-source correlations and artifact analysis for
        the window, then merges counts, high-resonance atoms, gravity wells,
        aha moments and topics into a single UnifiedReport.
        """
        report = UnifiedReport(time_window_hours=hours_back)

        # Load graph data (at least one day, rounded up from the hour window)
        self.graph.load_range(days_back=max(1, int(hours_back / 24) + 1))

        # Load multi-source correlation
        multi_report = self.multi_source.correlate(hours_back=hours_back)

        # Load artifacts
        artifact_report = self.artifact_analyzer.analyze(hours_back=hours_back)

        # Count events by source
        # NOTE(review): reads GraphIntegration's private _state - presumably a
        # stable internal contract; confirm a public accessor isn't available.
        report.events_by_source = {
            'graph_atoms': self.graph._state.atom_count,
            'graph_edges': self.graph._state.edge_count,
            'multi_source': multi_report.total_events,
            'artifacts': artifact_report.artifacts_analyzed
        }
        report.total_events = sum(report.events_by_source.values())

        # High resonance from graph (timestamp parse is now best-effort
        # instead of raising on malformed values)
        for atom_data in self.graph.get_high_resonance_atoms(20):
            report.high_resonance.append(UnifiedEvent(
                event_type=UnifiedEventType.HIGH_RESONANCE,
                timestamp=self._parse_timestamp(atom_data.get('timestamp', '')),
                source_id=atom_data.get('uuid', ''),
                content=atom_data.get('content', ''),
                weight=atom_data.get('resonance', 0.5),
                tags=set(atom_data.get('tags', []))
            ))

        # Gravity wells
        report.gravity_wells = self.graph.get_gravity_wells()

        # Aha moments from multi-source
        for moment in multi_report.aha_moments:
            event = UnifiedEvent(
                event_type=UnifiedEventType.AHA_MOMENT,
                timestamp=moment.timestamp,
                source_id=f"aha-{moment.timestamp.isoformat()}",
                content=self._summarize_moment(moment),
                weight=moment.combined_weight,
                tags=self._extract_topics(moment),
                metadata={
                    'claude_count': len(moment.claude_events),
                    'monologue_count': len(moment.monologue_events),
                    'browser_count': len(moment.browser_events),
                    'confidence': moment.confidence
                }
            )
            report.aha_moments.append(event)

        # Correlations
        report.multi_source_correlations = multi_report.aha_moments + multi_report.validation_seeking

        # Build unified topic map
        for topic, events in multi_report.topic_research_map.items():
            report.unified_topic_map.setdefault(topic, []).extend(events)

        # Add graph topics (one synthetic "altitude:N" topic per altitude band)
        for alt, count in self.graph._state.atoms_by_altitude.items():
            report.unified_topic_map[f"altitude:{alt}"] = [f"{count} atoms"]

        # Sort highest-weight findings first
        report.aha_moments.sort(key=lambda e: e.weight, reverse=True)
        report.high_resonance.sort(key=lambda e: e.weight, reverse=True)

        return report

    def _summarize_moment(self, moment: CorrelatedMoment) -> str:
        """Build a one-line summary from the first event of each source."""
        parts = []
        if moment.claude_events:
            parts.append(f"Claude: {moment.claude_events[0].content[:100]}")
        if moment.monologue_events:
            parts.append(f"Voice: {moment.monologue_events[0].content[:100]}")
        if moment.browser_events and moment.browser_events[0].browser_title:
            parts.append(f"Researching: {moment.browser_events[0].browser_title}")
        return " | ".join(parts) if parts else "Correlated moment"

    def _extract_topics(self, moment: CorrelatedMoment) -> Set[str]:
        """Union the tags of a moment's Claude and monologue events."""
        topics = set()
        for event in moment.claude_events + moment.monologue_events:
            topics.update(event.tags)
        return topics

    def get_timeline(
        self,
        hours_back: float = 24,
        event_types: Optional[List[str]] = None
    ) -> List[UnifiedEvent]:
        """
        Get a timeline of all events, optionally filtered by type.

        Currently sources graph atoms only (up to 200 most recent). Events
        with unparseable timestamps default to now. Returned newest-first.
        """
        cutoff = datetime.now() - timedelta(hours=hours_back)

        events = []

        # Add graph atoms within the window
        for atom_data in self.graph.get_recent_atoms(limit=200):
            ts = self._parse_timestamp(atom_data.get('timestamp', ''))
            if ts < cutoff:
                continue

            events.append(UnifiedEvent(
                event_type=UnifiedEventType.GRAPH_ATOM,
                timestamp=ts,
                source_id=atom_data.get('uuid', ''),
                content=atom_data.get('content', ''),
                weight=atom_data.get('weight', 1.0),
                tags=set(atom_data.get('tags', []))
            ))

        # Filter by type if specified; unknown type strings are ignored.
        if event_types:
            valid_values = {e.value for e in UnifiedEventType}  # hoisted: O(1) membership
            type_set = {UnifiedEventType(t) for t in event_types if t in valid_values}
            events = [e for e in events if e.event_type in type_set]

        # Newest first
        events.sort(key=lambda e: e.timestamp, reverse=True)

        return events


# Singleton
_unified_correlator: Optional[UnifiedCorrelator] = None
_unified_correlator_lock = threading.Lock()


def get_unified_correlator() -> UnifiedCorrelator:
    """Get or create (and start) the unified correlator singleton.

    Creation is guarded by a lock since this module runs background threads
    and the accessor may be called from several of them.
    """
    global _unified_correlator
    if _unified_correlator is None:
        with _unified_correlator_lock:
            if _unified_correlator is None:
                correlator = UnifiedCorrelator()
                correlator.start()
                _unified_correlator = correlator
    return _unified_correlator


if __name__ == '__main__':
    print("=== Unified Correlator ===\n")

    correlator = UnifiedCorrelator()
    report = correlator.get_unified_report(hours_back=24)

    print(report.to_markdown())