#!/usr/bin/env python3
"""
Multi-Source Correlator

Correlates events across multiple data sources to triangulate high-confidence insights:
- Claude sessions (text transcripts)
- Monologue voice transcripts (with intonation context)
- Comet browser history (what was being researched)

Time-aligned correlation reveals:
- What you were researching when you had an insight
- Voice emphasis that corroborates text-based signals
- Validation-seeking moments with full context

Usage:
    correlator = MultiSourceCorrelator()
    report = correlator.correlate(hours_back=168)
"""

import bisect
import json
import re
import sqlite3
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

from .signal_words import SignalWordDetector, SignalDetection, detect_signals


class EventSource(Enum):
    """Sources of events."""
    CLAUDE_SESSION = "claude_session"
    MONOLOGUE = "monologue"
    COMET_BROWSER = "comet_browser"


@dataclass
class SourceEvent:
    """A single event from any source."""
    source: EventSource
    timestamp: datetime
    content: str

    # Source-specific metadata
    session_id: Optional[str] = None       # Claude session
    browser_url: Optional[str] = None      # Comet
    browser_title: Optional[str] = None    # Comet
    browser_context: Optional[str] = None  # Monologue's [Context: ...] tag
    speaker: Optional[str] = None

    # Signal analysis
    signals: Optional[SignalDetection] = None
    weight: float = 1.0
    tags: Set[str] = field(default_factory=set)

    def __post_init__(self):
        # Guard against a caller passing tags=None explicitly
        # (default_factory only covers the omitted-argument case).
        if self.tags is None:
            self.tags = set()


@dataclass
class CorrelatedMoment:
    """A moment where multiple sources align."""
    timestamp: datetime
    window_seconds: float  # How wide the correlation window was

    # Events from each source within the window
    claude_events: List[SourceEvent] = field(default_factory=list)
    monologue_events: List[SourceEvent] = field(default_factory=list)
    browser_events: List[SourceEvent] = field(default_factory=list)

    # Computed properties
    combined_weight: float = 1.0
    confidence: float = 0.0  # 0-1 based on source count and signal alignment
    topics: Set[str] = field(default_factory=set)

    # Flags
    is_aha_moment: bool = False
    is_validation_seeking: bool = False
    is_research_corroborated: bool = False

    @property
    def source_count(self) -> int:
        """Number of sources contributing to this moment."""
        count = 0
        if self.claude_events:
            count += 1
        if self.monologue_events:
            count += 1
        if self.browser_events:
            count += 1
        return count

    def get_browser_context(self) -> Optional[str]:
        """Get what was being researched during this moment.

        Returns up to three page titles joined with ' | ', or None when
        no browser events (or no titled events) fall in the window.
        """
        if self.browser_events:
            titles = [e.browser_title for e in self.browser_events if e.browser_title]
            if titles:
                return " | ".join(titles[:3])
        return None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize a summary of this moment (content truncated for size)."""
        return {
            'timestamp': self.timestamp.isoformat(),
            'source_count': self.source_count,
            'combined_weight': self.combined_weight,
            'confidence': self.confidence,
            'topics': list(self.topics),
            'is_aha_moment': self.is_aha_moment,
            'is_validation_seeking': self.is_validation_seeking,
            'browser_context': self.get_browser_context(),
            'claude_content': [e.content[:200] for e in self.claude_events[:2]],
            'monologue_content': [e.content[:200] for e in self.monologue_events[:2]]
        }


@dataclass
class CorrelationReport:
    """Full report from multi-source correlation."""
    time_window_hours: float
    sources_found: Dict[str, int]  # source -> event count

    # High-value discoveries
    aha_moments: List[CorrelatedMoment] = field(default_factory=list)
    validation_seeking: List[CorrelatedMoment] = field(default_factory=list)
    research_corroborated: List[CorrelatedMoment] = field(default_factory=list)

    # All correlated moments above threshold
    high_confidence_moments: List[CorrelatedMoment] = field(default_factory=list)

    # Topic analysis
    topic_research_map: Dict[str, List[str]] = field(default_factory=dict)  # topic -> URLs researched

    # Statistics
    total_events: int = 0
    correlated_events: int = 0
    multi_source_moments: int = 0

    def to_markdown(self) -> str:
        """Render the report as a human-readable Markdown document."""
        lines = [
            "# Multi-Source Correlation Report",
            f"*{self.time_window_hours:.0f} hours, {self.total_events} events, {self.multi_source_moments} correlated moments*",
            "",
        ]

        # Sources summary
        lines.append("## Sources")
        for source, count in self.sources_found.items():
            lines.append(f"- **{source}**: {count} events")
        lines.append("")

        # Aha moments
        if self.aha_moments:
            lines.append("## Aha Moments (Multi-Source Confirmed)")
            for moment in self.aha_moments[:10]:
                lines.append(f"\n### {moment.timestamp.strftime('%Y-%m-%d %H:%M')}")
                lines.append(f"*Confidence: {moment.confidence:.0%}, Sources: {moment.source_count}*")
                if moment.get_browser_context():
                    lines.append(f"**Researching:** {moment.get_browser_context()}")
                for event in moment.claude_events[:1]:
                    lines.append(f"\n> {event.content[:300]}...")
                for event in moment.monologue_events[:1]:
                    lines.append(f"\n*Voice:* {event.content[:200]}...")
            lines.append("")

        # Validation seeking
        if self.validation_seeking:
            lines.append("## Validation-Seeking Moments")
            lines.append("*Times you asked 'what do you think?' with full context*")
            for moment in self.validation_seeking[:10]:
                lines.append(f"\n### {moment.timestamp.strftime('%Y-%m-%d %H:%M')}")
                if moment.get_browser_context():
                    lines.append(f"**Context:** {moment.get_browser_context()}")
                for event in moment.claude_events[:1]:
                    lines.append(f"> {event.content[:200]}...")
            lines.append("")

        # Research-corroborated insights
        if self.research_corroborated:
            lines.append("## Research-Corroborated Insights")
            lines.append("*Insights where browser history shows supporting research*")
            for moment in self.research_corroborated[:10]:
                lines.append(f"\n### {moment.timestamp.strftime('%Y-%m-%d %H:%M')}")
                lines.append(f"**Researched:** {moment.get_browser_context()}")
                for event in (moment.claude_events or moment.monologue_events)[:1]:
                    lines.append(f"> {event.content[:200]}...")
            lines.append("")

        # Topic-research map
        if self.topic_research_map:
            lines.append("## Topics and Related Research")
            for topic, urls in sorted(self.topic_research_map.items(),
                                      key=lambda x: -len(x[1]))[:15]:
                lines.append(f"\n### {topic}")
                for url in urls[:5]:
                    lines.append(f"- {url}")

        return "\n".join(lines)


class MultiSourceCorrelator:
    """
    Correlates events across Claude sessions, Monologue, and Comet browser.

    The key insight: when you have an "aha" moment that appears in Claude text,
    is emphasized in your voice (Monologue), AND was preceded by specific research
    (Comet), that's a high-confidence validated insight.
    """

    # Default paths
    DEFAULT_SESSIONS_DIR = Path.home() / 'repos/Sovereign_Estate/daily/sessions'
    DEFAULT_MONOLOGUE_DIR = Path.home() / 'Library/Mobile Documents/iCloud~md~obsidian/Documents/Sovereign_Estate/research/cognitive-capture/monologue-transcripts'
    DEFAULT_COMET_HISTORY = Path.home() / 'Library/Application Support/Comet/Default/History'

    # Correlation window (seconds before/after to consider related)
    CORRELATION_WINDOW = 300  # 5 minutes

    def __init__(
        self,
        sessions_dir: Optional[str] = None,
        monologue_dir: Optional[str] = None,
        comet_history_path: Optional[str] = None
    ):
        self.sessions_dir = Path(sessions_dir) if sessions_dir else self.DEFAULT_SESSIONS_DIR
        self.monologue_dir = Path(monologue_dir) if monologue_dir else self.DEFAULT_MONOLOGUE_DIR
        self.comet_history = Path(comet_history_path) if comet_history_path else self.DEFAULT_COMET_HISTORY

        self.signal_detector = SignalWordDetector()

        # Internal state
        self._events: List[SourceEvent] = []
        self._events_by_time: Dict[datetime, List[SourceEvent]] = {}

    def correlate(
        self,
        hours_back: float = 168,
        correlation_window: Optional[float] = None,
        min_weight: float = 1.0
    ) -> CorrelationReport:
        """
        Run multi-source correlation.

        Args:
            hours_back: How far back to look
            correlation_window: Seconds to consider events related (default 300)
            min_weight: Minimum signal weight to include

        Returns:
            CorrelationReport with findings
        """
        # `is None` check (not `or`) so an explicit window of 0 is honored.
        window = self.CORRELATION_WINDOW if correlation_window is None else correlation_window
        cutoff = datetime.now() - timedelta(hours=hours_back)

        # Collect events from all sources
        self._events = []
        sources_found = {}

        # Claude sessions
        claude_events = self._load_claude_sessions(cutoff)
        self._events.extend(claude_events)
        sources_found['claude_sessions'] = len(claude_events)

        # Monologue transcripts
        monologue_events = self._load_monologue_transcripts(cutoff)
        self._events.extend(monologue_events)
        sources_found['monologue'] = len(monologue_events)

        # Comet browser history
        browser_events = self._load_comet_history(cutoff)
        self._events.extend(browser_events)
        sources_found['comet_browser'] = len(browser_events)

        # Sort by timestamp
        self._events.sort(key=lambda e: e.timestamp)

        # Build time index
        self._build_time_index()

        # Find correlated moments
        moments = self._find_correlated_moments(window, min_weight)

        # Classify moments
        aha_moments = []
        validation_seeking = []
        research_corroborated = []
        high_confidence = []

        for moment in moments:
            self._classify_moment(moment)

            if moment.is_aha_moment:
                aha_moments.append(moment)
            if moment.is_validation_seeking:
                validation_seeking.append(moment)
            if moment.is_research_corroborated:
                research_corroborated.append(moment)
            if moment.confidence >= 0.5:
                high_confidence.append(moment)

        # Build topic-research map
        topic_research_map = self._build_topic_research_map(moments)

        # Both stats count moments with more than one source; compute once.
        multi_source = [m for m in moments if m.source_count > 1]

        # Build report
        return CorrelationReport(
            time_window_hours=hours_back,
            sources_found=sources_found,
            aha_moments=sorted(aha_moments, key=lambda m: -m.combined_weight)[:20],
            validation_seeking=sorted(validation_seeking, key=lambda m: -m.combined_weight)[:20],
            research_corroborated=sorted(research_corroborated, key=lambda m: -m.combined_weight)[:20],
            high_confidence_moments=sorted(high_confidence, key=lambda m: -m.confidence)[:50],
            topic_research_map=topic_research_map,
            total_events=len(self._events),
            correlated_events=len(multi_source),
            multi_source_moments=len(multi_source)
        )

    def _make_session_event(
        self,
        text: str,
        base_date: datetime,
        current_time,
        session_id: str,
        speaker: str
    ) -> Optional[SourceEvent]:
        """Build a CLAUDE_SESSION event from an accumulated transcript atom.

        Returns None when the text is too short (<= 10 chars) or no time
        header was seen yet.
        """
        text = text.strip()
        if not text or len(text) <= 10 or current_time is None:
            return None
        # Combine the filename date with the HH:MM:SS time from the header.
        timestamp = base_date.replace(
            hour=current_time.hour,
            minute=current_time.minute,
            second=current_time.second
        )
        signals = detect_signals(text)
        return SourceEvent(
            source=EventSource.CLAUDE_SESSION,
            timestamp=timestamp,
            content=text,
            session_id=session_id,
            speaker=speaker,
            signals=signals,
            weight=signals.weight_modifier,
            tags=signals.suggested_tags
        )

    def _load_claude_sessions(self, cutoff: datetime) -> List[SourceEvent]:
        """Load events from Claude session transcripts (markdown format)."""
        events = []

        if not self.sessions_dir.exists():
            return events

        # Session files are markdown: 2026-01-13-7be54298-live.md
        for session_file in self.sessions_dir.glob('*-live.md'):
            try:
                # Check file modification time first (cheap skip of old files)
                mtime = datetime.fromtimestamp(session_file.stat().st_mtime)
                if mtime < cutoff:
                    continue

                # Extract date from filename: 2026-01-13-xxxxx-live.md
                filename = session_file.stem
                date_match = re.match(r'(\d{4}-\d{2}-\d{2})', filename)
                if not date_match:
                    continue
                base_date = datetime.strptime(date_match.group(1), '%Y-%m-%d')

                content = session_file.read_text()
                session_id = filename

                # Parse markdown format: **[HH:MM:SS] Speaker**
                lines = content.split('\n')
                current_speaker = None
                current_content = []
                current_time = None

                for line in lines:
                    # Check for speaker header pattern
                    speaker_match = re.match(
                        r'^\s*-?\s*\*\*\[(\d{2}:\d{2}:\d{2})\]\s+(Rick|Claude)\*\*',
                        line
                    )

                    if speaker_match:
                        # Save previous atom
                        if current_speaker and current_content:
                            event = self._make_session_event(
                                '\n'.join(current_content), base_date,
                                current_time, session_id, current_speaker
                            )
                            if event:
                                events.append(event)

                        # Start new atom
                        time_str = speaker_match.group(1)
                        current_time = datetime.strptime(time_str, '%H:%M:%S').time()
                        current_speaker = speaker_match.group(2)
                        current_content = []

                    elif current_speaker:
                        # Accumulate content, stripping leading list markers
                        clean_line = re.sub(r'^\s*-\s*', '', line)
                        if clean_line.strip():
                            current_content.append(clean_line)

                # Don't forget last atom
                if current_speaker and current_content:
                    event = self._make_session_event(
                        '\n'.join(current_content), base_date,
                        current_time, session_id, current_speaker
                    )
                    if event:
                        events.append(event)

            except Exception:
                # Best-effort: skip unreadable/malformed session files.
                continue

        return events

    def _load_monologue_transcripts(self, cutoff: datetime) -> List[SourceEvent]:
        """Load events from Monologue voice transcripts."""
        events = []

        if not self.monologue_dir.exists():
            return events

        # Pattern to match timestamp headers
        timestamp_pattern = re.compile(r'###\s+(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})')
        context_pattern = re.compile(r'\[Context:\s*([^\]]+)\]')

        for md_file in self.monologue_dir.glob('*.md'):
            try:
                content = md_file.read_text()

                # Split by timestamp headers (headers are kept as list items)
                sections = re.split(r'(###\s+\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})', content)

                current_timestamp = None
                for section in sections:
                    # Check if this is a timestamp header
                    ts_match = timestamp_pattern.match(section)
                    if ts_match:
                        try:
                            current_timestamp = datetime.fromisoformat(ts_match.group(1))
                        except ValueError:
                            current_timestamp = None
                        continue

                    if current_timestamp is None or current_timestamp < cutoff:
                        continue

                    # This is content after a timestamp
                    text = section.strip()
                    if not text or len(text) < 10:
                        continue

                    # Extract browser context if present
                    browser_context = None
                    ctx_match = context_pattern.search(text)
                    if ctx_match:
                        browser_context = ctx_match.group(1).strip()
                        # Remove context tag from content
                        text = context_pattern.sub('', text).strip()

                    if not text:
                        continue

                    # Analyze signals
                    signals = detect_signals(text)

                    event = SourceEvent(
                        source=EventSource.MONOLOGUE,
                        timestamp=current_timestamp,
                        content=text,
                        browser_context=browser_context,
                        speaker='Rick',  # Monologue is always user
                        signals=signals,
                        weight=signals.weight_modifier,
                        tags=signals.suggested_tags
                    )
                    events.append(event)

            except Exception:
                # Best-effort: skip unreadable transcript files.
                continue

        return events

    def _load_comet_history(self, cutoff: datetime) -> List[SourceEvent]:
        """Load browser history from Comet."""
        events = []

        if not self.comet_history.exists():
            return events

        try:
            # Connect to SQLite (read-only)
            conn = sqlite3.connect(f'file:{self.comet_history}?mode=ro', uri=True)
        except sqlite3.Error:
            return events

        try:
            cursor = conn.cursor()

            # Chrome/Chromium stores time as microseconds since 1601-01-01
            # Convert cutoff to Chrome time
            chrome_epoch = datetime(1601, 1, 1)
            cutoff_chrome = int((cutoff - chrome_epoch).total_seconds() * 1000000)

            # Query history
            cursor.execute("""
                SELECT
                    last_visit_time,
                    url,
                    title
                FROM urls
                WHERE last_visit_time > ?
                ORDER BY last_visit_time
            """, (cutoff_chrome,))

            for row in cursor.fetchall():
                chrome_time, url, title = row

                # Convert Chrome time to datetime (NULL or out-of-range
                # values are skipped).
                try:
                    timestamp = chrome_epoch + timedelta(microseconds=chrome_time)
                except (TypeError, OverflowError, OSError):
                    continue

                # Skip internal URLs
                if url.startswith(('chrome://', 'chrome-extension://')):
                    continue

                # Create event (browser events have lower base weight)
                event = SourceEvent(
                    source=EventSource.COMET_BROWSER,
                    timestamp=timestamp,
                    content=title or url,
                    browser_url=url,
                    browser_title=title,
                    weight=0.5  # Base weight for browser events
                )

                # Boost weight for research-heavy sites
                if any(domain in url for domain in [
                    'arxiv.org', 'scholar.google', 'wikipedia.org',
                    'github.com', 'stackoverflow.com', 'medium.com',
                    'nature.com', 'sciencedirect.com'
                ]):
                    event.weight = 1.0
                    event.tags.add('research')

                events.append(event)

        except Exception:
            # Best-effort: a locked or malformed History DB yields no events.
            pass
        finally:
            # Always release the connection, even on query failure.
            conn.close()

        return events

    def _build_time_index(self):
        """Index events by time (minute granularity) for efficient lookup."""
        self._events_by_time = {}
        for event in self._events:
            # Round to minute for indexing
            key = event.timestamp.replace(second=0, microsecond=0)
            self._events_by_time.setdefault(key, []).append(event)

    def _find_correlated_moments(
        self,
        window: float,
        min_weight: float
    ) -> List[CorrelatedMoment]:
        """Find moments where multiple sources correlate.

        Uses high-weight events as anchors and gathers every event within
        +/- `window` seconds. Events are already sorted by timestamp, so the
        window slice is found with bisect instead of a full scan per anchor.
        """
        moments = []
        processed_times = set()

        # Parallel sorted timestamp list for O(log n) window lookups.
        timestamps = [e.timestamp for e in self._events]

        # Look for high-weight events as anchors
        for event in self._events:
            if event.weight < min_weight:
                continue

            # Skip if we've already processed this time window (per minute)
            window_key = event.timestamp.replace(second=0, microsecond=0)
            if window_key in processed_times:
                continue
            processed_times.add(window_key)

            # Find all events within the correlation window
            window_start = event.timestamp - timedelta(seconds=window)
            window_end = event.timestamp + timedelta(seconds=window)

            lo = bisect.bisect_left(timestamps, window_start)
            hi = bisect.bisect_right(timestamps, window_end)
            related_events = self._events[lo:hi]

            if not related_events:
                continue

            # Group by source
            moment = CorrelatedMoment(
                timestamp=event.timestamp,
                window_seconds=window
            )

            for e in related_events:
                if e.source == EventSource.CLAUDE_SESSION:
                    moment.claude_events.append(e)
                elif e.source == EventSource.MONOLOGUE:
                    moment.monologue_events.append(e)
                elif e.source == EventSource.COMET_BROWSER:
                    moment.browser_events.append(e)

                # Collect topics
                moment.topics.update(e.tags)

            # Calculate combined weight
            all_events = moment.claude_events + moment.monologue_events + moment.browser_events
            if all_events:
                moment.combined_weight = max(e.weight for e in all_events)

                # Boost for multi-source
                moment.combined_weight *= (1 + 0.3 * (moment.source_count - 1))

            # Calculate confidence based on source diversity and signal alignment
            moment.confidence = self._calculate_confidence(moment)

            moments.append(moment)

        return moments

    def _calculate_confidence(self, moment: CorrelatedMoment) -> float:
        """
        Calculate confidence score for a correlated moment.

        Factors:
        - Number of sources (more = higher confidence)
        - Signal word alignment across sources
        - Research relevance
        """
        confidence = 0.0

        # Base confidence from source count
        confidence += moment.source_count * 0.25

        # Check for signal alignment across sources
        all_tags = set()
        for e in moment.claude_events:
            if e.signals:
                all_tags.update(e.signals.suggested_tags)
        for e in moment.monologue_events:
            if e.signals:
                all_tags.update(e.signals.suggested_tags)

        # High-value tags boost confidence
        if 'aha' in all_tags or 'insight' in all_tags:
            confidence += 0.15
        if 'principle' in all_tags:
            confidence += 0.1
        if 'decision' in all_tags:
            confidence += 0.1
        if 'needs_validation' in all_tags:
            confidence += 0.1

        # Research correlation boosts confidence
        if moment.browser_events and (moment.claude_events or moment.monologue_events):
            # Check if browser content relates to topic
            browser_text = ' '.join(e.content for e in moment.browser_events).lower()
            topic_text = ' '.join(moment.topics).lower()

            # Simple overlap check
            topic_words = set(topic_text.split())
            browser_words = set(browser_text.split())
            if topic_words & browser_words:
                confidence += 0.15

        return min(confidence, 1.0)

    def _classify_moment(self, moment: CorrelatedMoment):
        """Classify a moment into categories (sets the is_* flags in place)."""
        # Check for aha moment
        for e in moment.claude_events + moment.monologue_events:
            if e.signals and ('aha' in e.signals.suggested_tags or 'insight' in e.signals.suggested_tags):
                if moment.source_count > 1:  # Requires multi-source confirmation
                    moment.is_aha_moment = True
                    break

        # Check for validation seeking
        for e in moment.claude_events + moment.monologue_events:
            if e.signals and 'needs_validation' in e.signals.suggested_tags:
                moment.is_validation_seeking = True
                break

        # Check for research corroboration
        if moment.browser_events and moment.combined_weight > 1.2:
            # High-weight moment with browser context
            moment.is_research_corroborated = True

    def _build_topic_research_map(
        self,
        moments: List[CorrelatedMoment]
    ) -> Dict[str, List[str]]:
        """Build a map of topics to related research URLs (max 10 per topic)."""
        topic_urls: Dict[str, List[str]] = {}

        for moment in moments:
            if not moment.browser_events:
                continue

            urls = [e.browser_url for e in moment.browser_events if e.browser_url]

            for topic in moment.topics:
                topic_urls.setdefault(topic, []).extend(urls)

        # Deduplicate while preserving first-seen order
        for topic in topic_urls:
            topic_urls[topic] = list(dict.fromkeys(topic_urls[topic]))[:10]

        return topic_urls


# Convenience function
def correlate_sources(hours_back: float = 168) -> CorrelationReport:
    """Quick correlation across all sources."""
    correlator = MultiSourceCorrelator()
    return correlator.correlate(hours_back=hours_back)


if __name__ == '__main__':
    print("=== Multi-Source Correlator ===\n")

    correlator = MultiSourceCorrelator()
    report = correlator.correlate(hours_back=168)

    print(report.to_markdown())