"""
Transcript Miner - Extract insights from historical conversations.

Your 7 days of transcripts are rich with:
1. Signal words showing what you value
2. Decision patterns
3. Topic trajectories
4. Validation-seeking moments (potential insights to mine)
5. Language patterns for personalization

This module:
- Parses session transcripts
- Extracts and categorizes signal words
- Identifies unmined insights (high-weight items not yet integrated)
- Builds topic maps showing conceptual relationships
- Learns your personal vocabulary/patterns
"""

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, List, Dict, Tuple, Set, Any
from collections import defaultdict
import re
import json

from .signal_words import SignalWordDetector, SignalDetection, detect_signals


@dataclass
class TranscriptAtom:
    """A single message/atom from a transcript."""
    content: str
    speaker: str  # 'Rick', 'Claude', 'System'
    timestamp: Optional[datetime] = None
    session_id: str = ""

    # Signal analysis
    signals: Optional[SignalDetection] = None
    weight: float = 1.0
    tags: Set[str] = field(default_factory=set)

    # Derived
    is_insight: bool = False  # High weight + validation-seeking
    is_decision: bool = False
    is_principle: bool = False
    needs_integration: bool = False  # Not yet mined


@dataclass
class TopicCluster:
    """A cluster of related content around a topic."""
    topic: str
    atoms: List[TranscriptAtom] = field(default_factory=list)
    total_weight: float = 0.0
    decision_count: int = 0
    insight_count: int = 0
    principle_count: int = 0

    # Related topics (co-occurrence counts: topic name -> times seen together)
    related_topics: Dict[str, int] = field(default_factory=dict)


@dataclass
class MiningReport:
    """Results from mining transcripts."""
    sessions_analyzed: int = 0
    atoms_processed: int = 0
    time_window_hours: float = 0.0

    # High-value items
    unmined_insights: List[TranscriptAtom] = field(default_factory=list)
    principles: List[TranscriptAtom] = field(default_factory=list)
    decisions: List[TranscriptAtom] = field(default_factory=list)

    # Patterns
    signal_word_counts: Dict[str, int] = field(default_factory=dict)
    topic_clusters: Dict[str, TopicCluster] = field(default_factory=dict)

    # Operator-specific
    your_common_signals: List[Tuple[str, int]] = field(default_factory=list)
    validation_seeking_rate: float = 0.0  # How often you ask "what do you think?"

    def to_markdown(self) -> str:
        """Generate a human-readable markdown report.

        Sections are emitted only when they have content; each list is
        truncated to keep the report skimmable (top 10 items, top 15 signals).
        """
        lines = [
            "# Transcript Mining Report",
            f"*{self.sessions_analyzed} sessions, {self.atoms_processed} atoms, {self.time_window_hours:.1f} hours*",
            "",
        ]

        # Unmined insights
        if self.unmined_insights:
            lines.append("## Unmined Insights")
            lines.append("*High-weight items not yet integrated:*")
            lines.append("")
            for atom in self.unmined_insights[:10]:
                tags = ', '.join(atom.tags) if atom.tags else 'none'
                lines.append(f"- [{atom.weight:.2f}] {atom.content[:100]}... (tags: {tags})")
            lines.append("")

        # Principles
        if self.principles:
            lines.append("## Principles Identified")
            for atom in self.principles[:10]:
                lines.append(f"- {atom.content[:150]}...")
            lines.append("")

        # Decisions
        if self.decisions:
            lines.append("## Decisions Made")
            for atom in self.decisions[:10]:
                lines.append(f"- {atom.content[:100]}...")
            lines.append("")

        # Topic clusters, heaviest first
        if self.topic_clusters:
            lines.append("## Topic Clusters")
            sorted_clusters = sorted(
                self.topic_clusters.values(),
                key=lambda c: c.total_weight,
                reverse=True
            )
            for cluster in sorted_clusters[:10]:
                lines.append(f"### {cluster.topic}")
                lines.append(f"Weight: {cluster.total_weight:.1f}, Insights: {cluster.insight_count}, Decisions: {cluster.decision_count}")
                if cluster.related_topics:
                    # Top 5 co-occurring topics by count (descending)
                    related = sorted(cluster.related_topics.items(), key=lambda x: -x[1])[:5]
                    lines.append(f"Related: {', '.join(t for t, _ in related)}")
                lines.append("")

        # Signal patterns
        if self.your_common_signals:
            lines.append("## Your Signal Word Patterns")
            for signal, count in self.your_common_signals[:15]:
                lines.append(f"- {signal}: {count} times")
            lines.append("")
            lines.append(f"Validation-seeking rate: {self.validation_seeking_rate:.1%}")

        return '\n'.join(lines)
class TranscriptMiner:
    """
    Mines session transcripts for insights, patterns, and unmined content.

    Usage:
        miner = TranscriptMiner(sessions_dir)
        report = miner.mine(hours_back=168)  # 7 days

        # Get unmined insights
        for insight in report.unmined_insights:
            print(f"[{insight.weight}] {insight.content}")

        # Export topic map
        miner.export_topic_map("topic_map.json")
    """

    # Patterns to extract atoms from markdown session files.
    # NOTE(review): currently unused -- _extract_atoms uses a simpler
    # line-based parser instead; kept in case external code references it.
    ATOM_PATTERNS = [
        # Standard speaker pattern: **[HH:MM:SS] Speaker**
        (r'\*\*\[(\d{2}:\d{2}:\d{2})\]\s+(Rick|Claude)\*\*\s*\n\s*-\s+(.+?)(?=\n\n|\n\*\*\[|\Z)',
         lambda m: (m.group(2), m.group(3), m.group(1))),
    ]

    # Topic extraction patterns (applied to lowercased content)
    TOPIC_PATTERNS = [
        r'#(\w+)',  # Hashtags
        r'\b([a-z]+_[a-z_]+)\b',  # snake_case (code terms)
        r'\b(attention|context|stream|session|coherence|membrane|aha)\b',  # Domain terms
    ]

    def __init__(self, sessions_dir: str):
        """Create a miner rooted at a directory of `*-live.md` session files."""
        self.sessions_dir = Path(sessions_dir)
        self.signal_detector = SignalWordDetector()

        # Accumulated state, (re)built by mine().
        # A plain dict is used deliberately: every access site guards with
        # `in`, and a defaultdict factory would silently create clusters
        # with the wrong topic name on an unguarded lookup.
        self._atoms: List[TranscriptAtom] = []
        self._topic_clusters: Dict[str, TopicCluster] = {}
        self._signal_counts: Dict[str, int] = defaultdict(int)

    def mine(
        self,
        hours_back: float = 168,  # 7 days
        speaker_filter: Optional[str] = None,  # 'Rick', 'Claude', or None for all
        min_weight: float = 0.0
    ) -> MiningReport:
        """
        Mine transcripts from the specified time window.

        Args:
            hours_back: How far back to look
            speaker_filter: Only analyze this speaker's messages
            min_weight: Only include atoms above this weight

        Returns:
            MiningReport with insights, patterns, and topic clusters
        """
        # Reset accumulated state so repeated mine() calls don't double-count.
        self._atoms = []
        self._topic_clusters = {}
        self._signal_counts = defaultdict(int)

        cutoff = datetime.now() - timedelta(hours=hours_back)

        # Find session files; use file mtime as a cheap recency filter.
        session_files = list(self.sessions_dir.glob("*-live.md"))
        sessions_analyzed = 0

        for session_file in session_files:
            mtime = datetime.fromtimestamp(session_file.stat().st_mtime)
            if mtime < cutoff:
                continue

            sessions_analyzed += 1
            session_id = session_file.stem

            # Extract atoms from this session's markdown
            content = session_file.read_text(encoding="utf-8")
            atoms = self._extract_atoms(content, session_id)

            for atom in atoms:
                if speaker_filter and atom.speaker != speaker_filter:
                    continue
                if atom.weight < min_weight:
                    continue

                self._atoms.append(atom)
                self._process_atom_for_clusters(atom)

        return self._build_report(sessions_analyzed, hours_back)

    def _extract_atoms(self, content: str, session_id: str) -> List[TranscriptAtom]:
        """Extract atoms from session markdown content.

        Accumulates content lines under each `- **[HH:MM:SS] Speaker**`
        header; an atom is flushed when the next header (or EOF) is reached.
        """
        atoms = []

        # Simple line-based extraction (more robust than complex regex)
        lines = content.split('\n')
        current_speaker = None
        current_content = []
        current_timestamp = None

        for line in lines:
            # Check for speaker header
            speaker_match = re.match(r'^\s*-\s*\*\*\[(\d{2}:\d{2}:\d{2})\]\s+(Rick|Claude)\*\*', line)
            if speaker_match:
                # Save previous atom
                if current_speaker and current_content:
                    atom = self._create_atom(
                        '\n'.join(current_content),
                        current_speaker,
                        current_timestamp,
                        session_id
                    )
                    if atom:
                        atoms.append(atom)

                current_timestamp = speaker_match.group(1)
                current_speaker = speaker_match.group(2)
                current_content = []
            elif current_speaker:
                # Content line - skip metadata/tool results
                stripped = line.strip()
                if stripped.startswith('-') and not stripped.startswith('- {'):
                    # Actual content bullet
                    text = re.sub(r'^-\s*', '', stripped)
                    # Skip metadata comments
                    if not text.startswith('<!--') and not text.startswith('{\'type\''):
                        current_content.append(text)

        # Don't forget last atom
        if current_speaker and current_content:
            atom = self._create_atom(
                '\n'.join(current_content),
                current_speaker,
                current_timestamp,
                session_id
            )
            if atom:
                atoms.append(atom)

        return atoms

    def _create_atom(
        self,
        content: str,
        speaker: str,
        timestamp_str: Optional[str],
        session_id: str
    ) -> Optional[TranscriptAtom]:
        """Create a TranscriptAtom with signal analysis.

        Returns None for content that is empty/too short after cleaning.
        NOTE(review): timestamp_str (HH:MM:SS, no date) is currently unused;
        atom.timestamp stays None -- confirm whether it should be parsed.
        """
        if not content or len(content) < 10:
            return None

        # Clean content
        content = re.sub(r'\^[a-f0-9]+', '', content)  # Remove block refs
        content = re.sub(r'<!--.*?-->', '', content, flags=re.DOTALL)  # Remove comments
        content = content.strip()

        if not content:
            return None

        # Analyze signals
        signals = self.signal_detector.detect(content)

        # Track signal word usage
        for signal, match in signals.signals_found:
            self._signal_counts[signal.pattern] += 1

        atom = TranscriptAtom(
            content=content,
            speaker=speaker,
            session_id=session_id,
            signals=signals,
            weight=signals.weight_modifier,
            tags=signals.suggested_tags.copy()
        )

        # Categorize (thresholds 1.3 / 1.2 chosen by the original author)
        atom.is_insight = signals.weight_modifier > 1.3 and "needs_validation" in signals.suggested_tags
        atom.is_decision = signals.is_decision
        atom.is_principle = "principle" in signals.suggested_tags
        atom.needs_integration = signals.weight_modifier > 1.2 and speaker == "Rick"

        return atom

    def _process_atom_for_clusters(self, atom: TranscriptAtom) -> None:
        """Add atom to relevant topic clusters and record co-occurrences."""
        topics = self._extract_topics(atom.content)

        for topic in topics:
            if topic not in self._topic_clusters:
                self._topic_clusters[topic] = TopicCluster(topic=topic)

            cluster = self._topic_clusters[topic]
            cluster.atoms.append(atom)
            cluster.total_weight += atom.weight

            if atom.is_decision:
                cluster.decision_count += 1
            if atom.is_insight:
                cluster.insight_count += 1
            if atom.is_principle:
                cluster.principle_count += 1

            # Track co-occurring topics (every other topic in the same atom)
            for other_topic in topics:
                if other_topic != topic:
                    cluster.related_topics[other_topic] = cluster.related_topics.get(other_topic, 0) + 1

    def _extract_topics(self, content: str) -> Set[str]:
        """Extract candidate topics from content via TOPIC_PATTERNS."""
        topics = set()
        content_lower = content.lower()

        for pattern in self.TOPIC_PATTERNS:
            matches = re.findall(pattern, content_lower)
            topics.update(matches)

        # Filter out common words and very short tokens
        stop_words = {'the', 'and', 'for', 'this', 'that', 'with', 'from', 'have', 'been'}
        topics = {t for t in topics if t not in stop_words and len(t) > 2}

        return topics

    def _build_report(self, sessions_analyzed: int, hours_back: float) -> MiningReport:
        """Assemble a MiningReport from the accumulated state."""
        report = MiningReport(
            sessions_analyzed=sessions_analyzed,
            atoms_processed=len(self._atoms),
            time_window_hours=hours_back
        )

        # Collect high-value items
        for atom in self._atoms:
            if atom.needs_integration and atom.weight > 1.2:
                report.unmined_insights.append(atom)
            if atom.is_principle:
                report.principles.append(atom)
            if atom.is_decision:
                report.decisions.append(atom)

        # Sort by weight
        report.unmined_insights.sort(key=lambda a: a.weight, reverse=True)
        report.principles.sort(key=lambda a: a.weight, reverse=True)
        report.decisions.sort(key=lambda a: a.weight, reverse=True)

        # Topic clusters
        report.topic_clusters = dict(self._topic_clusters)

        # Signal patterns
        report.signal_word_counts = dict(self._signal_counts)
        report.your_common_signals = sorted(
            self._signal_counts.items(),
            key=lambda x: x[1],
            reverse=True
        )

        # Validation-seeking rate (for Rick's messages)
        rick_atoms = [a for a in self._atoms if a.speaker == "Rick"]
        validation_seeking = [
            a for a in rick_atoms
            if a.signals and "needs_validation" in a.signals.suggested_tags
        ]
        if rick_atoms:
            report.validation_seeking_rate = len(validation_seeking) / len(rick_atoms)

        return report

    def export_topic_map(self, output_path: str) -> None:
        """Export topic map as JSON for visualization.

        Emits {'nodes': [...], 'edges': [...]}. Edges with count <= 1 are
        dropped; note that each co-occurrence pair produces one edge per
        direction (A->B and B->A), matching the original behavior.
        """
        nodes = []
        edges = []

        for topic, cluster in self._topic_clusters.items():
            nodes.append({
                'id': topic,
                'weight': cluster.total_weight,
                'insights': cluster.insight_count,
                'decisions': cluster.decision_count,
                'principles': cluster.principle_count
            })

            for related, count in cluster.related_topics.items():
                if count > 1:  # Only significant connections
                    edges.append({
                        'source': topic,
                        'target': related,
                        'weight': count
                    })

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump({'nodes': nodes, 'edges': edges}, f, indent=2)

    def get_atoms_for_topic(self, topic: str) -> List[TranscriptAtom]:
        """Get all atoms related to a topic (empty list if unknown)."""
        if topic in self._topic_clusters:
            return self._topic_clusters[topic].atoms
        return []


def create_transcript_miner(sessions_dir: str) -> TranscriptMiner:
    """Create a transcript miner."""
    return TranscriptMiner(sessions_dir)


if __name__ == "__main__":
    import sys

    print("=== Transcript Miner ===\n")

    sessions_dir = "/Users/rcerf/repos/Sovereign_Estate/daily/sessions/"
    miner = TranscriptMiner(sessions_dir)

    hours = 168  # 7 days
    if len(sys.argv) > 1:
        hours = float(sys.argv[1])

    print(f"Mining last {hours} hours...")
    report = miner.mine(hours_back=hours)

    print(report.to_markdown())