# conversation_parser.py
1 """ 2 Conversation Parser - Extract patterns from JSONL conversation logs. 3 4 Parses Claude Code conversation logs and extracts: 5 - Topics discussed 6 - Decisions made 7 - Key artifacts created 8 - Patterns and insights 9 - Flow state moments (via signal word detection) 10 """ 11 12 import json 13 import re 14 import sys 15 from pathlib import Path 16 from datetime import datetime 17 from typing import List, Dict, Any, Optional, Tuple 18 from dataclasses import dataclass, field 19 20 # Import signal word detector if available 21 try: 22 from core.attention.signal_words import SignalWordDetector, SignalDetection 23 HAS_SIGNAL_WORDS = True 24 except ImportError: 25 HAS_SIGNAL_WORDS = False 26 SignalWordDetector = None 27 SignalDetection = None 28 29 # Import resonance engine for typed axiom detection 30 try: 31 from core.metacog.resonance import ( 32 calculate_axiom_resonance, 33 get_dominant_axiom, 34 AXIOM_FIELDS 35 ) 36 HAS_RESONANCE_ENGINE = True 37 except ImportError: 38 HAS_RESONANCE_ENGINE = False 39 AXIOM_FIELDS = {} 40 41 def calculate_axiom_resonance(terms, axiom_id=None): 42 return {} 43 44 def get_dominant_axiom(terms): 45 return None 46 47 # Resonance thresholds for chat sessions (tuned for insight discovery) 48 CHAT_RESONANCE_THRESHOLD = 0.10 # Lower than code sessions - more exploratory 49 CHAT_HIGH_VALUE_THRESHOLD = 0.25 # Surface to attention 50 51 52 @dataclass 53 class ConversationMessage: 54 """A single message in a conversation.""" 55 role: str # 'human' or 'assistant' 56 content: str 57 timestamp: Optional[datetime] = None 58 tool_calls: List[str] = field(default_factory=list) 59 files_touched: List[str] = field(default_factory=list) 60 61 62 @dataclass 63 class FlowMoment: 64 """A detected flow state moment in the conversation.""" 65 timestamp: Optional[datetime] 66 text: str 67 weight_modifier: float 68 signals: List[str] 69 tags: List[str] 70 71 72 @dataclass 73 class ConversationThread: 74 """A parsed conversation thread.""" 75 
thread_id: str 76 start_time: Optional[datetime] = None 77 end_time: Optional[datetime] = None 78 messages: List[ConversationMessage] = field(default_factory=list) 79 topics: List[str] = field(default_factory=list) 80 concepts: List[str] = field(default_factory=list) # Key concepts for resonance 81 decisions: List[str] = field(default_factory=list) 82 artifacts: List[str] = field(default_factory=list) 83 key_insights: List[str] = field(default_factory=list) 84 flow_moments: List[FlowMoment] = field(default_factory=list) # High-engagement moments 85 peak_engagement: float = 0.0 # Highest weight modifier seen 86 87 # Resonance metadata - typed axiom detection 88 axiom_resonance: Dict[str, float] = field(default_factory=dict) # A0-A4 resonance scores 89 dominant_axiom: Optional[Tuple[str, float]] = None # (axiom_id, score) if one dominates 90 high_resonance_segments: List[Dict[str, Any]] = field(default_factory=list) # Signal, not noise 91 is_signal_rich: bool = False # Does this chat have significant axiom content? 
92 93 @property 94 def duration_minutes(self) -> Optional[float]: 95 if self.start_time and self.end_time: 96 return (self.end_time - self.start_time).total_seconds() / 60 97 return None 98 99 @property 100 def exchange_count(self) -> int: 101 """Count human-assistant exchange pairs.""" 102 human_count = sum(1 for m in self.messages if m.role == 'human') 103 return human_count 104 105 106 class ConversationParser: 107 """Parse JSONL conversation logs.""" 108 109 def __init__(self): 110 # Initialize signal word detector if available 111 self.signal_detector = SignalWordDetector() if HAS_SIGNAL_WORDS else None 112 113 self.topic_patterns = [ 114 r'(?:working on|building|implementing|creating)\s+(.+?)(?:\.|$)', 115 r'(?:Let\'s|let me)\s+(.+?)(?:\.|$)', 116 r'(?:need to|want to|should)\s+(.+?)(?:\.|$)', 117 ] 118 119 # Key concept keywords for resonance detection 120 self.concept_keywords = [ 121 # Cognitive/consciousness 122 'consciousness', 'cognitive', 'attention', 'awareness', 'metacognition', 123 'thinking', 'reasoning', 'memory', 'perception', 124 # Biometrics 125 'eeg', 'biometric', 'brainwave', 'neural', 'physiological', 126 'heart rate', 'gsr', 'eye tracking', 'tobii', 'mindmonitor', 127 # Architecture 128 'architecture', 'pipeline', 'daemon', 'adapter', 'interface', 129 'protocol', 'pattern', 'engine', 'system', 'framework', 130 # Data/Processing 131 'transcription', 'audio', 'streaming', 'real-time', 'sync', 132 'database', 'sqlite', 'storage', 'ingestion', 133 # Sovereign OS specific 134 'phoenix', 'compression', 'resonance', 'sovereign', 'cockpit', 135 'tribal', 'first officer', 'mission control', 'handoff', 136 # Graph/topology 137 'graph', 'topology', 'node', 'edge', 'cluster', 'routing', 138 'navigation', 'temporal', 'position', 'vector', 139 # Flow state indicators 140 'jamming', 'buzzing', 'clicking', 'landing', 'vibing', 141 'resonating', 'alignment', 'crystallizing', 'breakthrough', 142 'flow', 'cooking', 'rolling', 'fire', 143 ] 144 
self.decision_patterns = [ 145 r'(?:decided to|will use|going with|chose)\s+(.+?)(?:\.|$)', 146 r'(?:the approach is|solution is)\s+(.+?)(?:\.|$)', 147 ] 148 self.artifact_patterns = [ 149 r'(?:created|wrote|generated|built)\s+[`"]?([^`"]+\.(?:py|md|yaml|json|ts|js))[`"]?', 150 r'(?:file|path):\s*[`"]?([^`"\s]+\.(?:py|md|yaml|json|ts|js))[`"]?', 151 ] 152 153 def parse_jsonl(self, path: Path) -> ConversationThread: 154 """Parse a JSONL conversation log file.""" 155 thread = ConversationThread(thread_id=path.stem) 156 157 with open(path, 'r') as f: 158 for line in f: 159 line = line.strip() 160 if not line: 161 continue 162 try: 163 entry = json.loads(line) 164 self._process_entry(entry, thread) 165 except json.JSONDecodeError: 166 continue 167 168 # Extract high-level patterns after parsing 169 self._extract_patterns(thread) 170 171 # Detect flow moments using signal word detector 172 self._detect_flow_moments(thread) 173 174 return thread 175 176 def _detect_flow_moments(self, thread: ConversationThread) -> None: 177 """Detect flow state moments using signal word detection.""" 178 if not self.signal_detector: 179 return 180 181 flow_moments = [] 182 peak_engagement = 1.0 183 184 for msg in thread.messages: 185 if msg.role != 'human': 186 continue 187 188 detection = self.signal_detector.detect(msg.content) 189 190 # Track peak engagement 191 if detection.weight_modifier > peak_engagement: 192 peak_engagement = detection.weight_modifier 193 194 # If high engagement detected (weight > 1.3), record as flow moment 195 if detection.weight_modifier > 1.3: 196 flow_moments.append(FlowMoment( 197 timestamp=msg.timestamp, 198 text=msg.content[:200], # Truncate 199 weight_modifier=detection.weight_modifier, 200 signals=[s.pattern for s, m in detection.signals_found], 201 tags=list(detection.suggested_tags) 202 )) 203 204 thread.flow_moments = flow_moments 205 thread.peak_engagement = peak_engagement 206 207 def _process_entry(self, entry: Dict[str, Any], thread: 
ConversationThread) -> None: 208 """Process a single JSONL entry.""" 209 # Claude Code format: type field with nested message object 210 entry_type = entry.get('type', '') 211 212 # Skip non-message entries 213 if entry_type in ('queue-operation', 'tool_result', 'summary'): 214 return 215 216 # Handle Claude Code format: type="user" or "assistant" with nested message 217 if entry_type in ('user', 'assistant') and 'message' in entry: 218 self._process_claude_code_entry(entry, thread) 219 elif 'type' in entry: 220 self._process_typed_entry(entry, thread) 221 elif 'role' in entry: 222 self._process_message_entry(entry, thread) 223 elif 'message' in entry: 224 self._process_nested_entry(entry, thread) 225 226 def _process_claude_code_entry(self, entry: Dict[str, Any], thread: ConversationThread) -> None: 227 """Process Claude Code specific format entries.""" 228 entry_type = entry.get('type', '') 229 message = entry.get('message', {}) 230 231 if not isinstance(message, dict): 232 return 233 234 role = 'human' if entry_type == 'user' else 'assistant' 235 236 # Extract content 237 content = message.get('content', '') 238 if isinstance(content, list): 239 # Assistant content blocks format 240 texts = [] 241 for block in content: 242 if isinstance(block, dict) and block.get('type') == 'text': 243 texts.append(block.get('text', '')) 244 content = '\n'.join(texts) 245 246 if not content: 247 return 248 249 msg = ConversationMessage( 250 role=role, 251 content=content[:5000] 252 ) 253 254 # Extract timestamp from entry 255 ts_str = entry.get('timestamp') 256 if ts_str: 257 try: 258 ts = datetime.fromisoformat(ts_str.replace('Z', '+00:00')) 259 msg.timestamp = ts 260 if thread.start_time is None or ts < thread.start_time: 261 thread.start_time = ts 262 if thread.end_time is None or ts > thread.end_time: 263 thread.end_time = ts 264 except (ValueError, TypeError): 265 pass 266 267 # Extract tool calls and files for assistant messages 268 if role == 'assistant': 269 
msg.tool_calls = self._extract_tool_calls(message) 270 msg.files_touched = self._extract_files(entry) 271 272 thread.messages.append(msg) 273 274 def _process_typed_entry(self, entry: Dict[str, Any], thread: ConversationThread) -> None: 275 """Process an entry with explicit type field.""" 276 entry_type = entry.get('type', '') 277 278 if entry_type == 'human': 279 content = self._extract_content(entry) 280 if content: 281 msg = ConversationMessage(role='human', content=content) 282 self._extract_timestamp(entry, msg, thread) 283 thread.messages.append(msg) 284 285 elif entry_type == 'assistant': 286 content = self._extract_content(entry) 287 tool_calls = self._extract_tool_calls(entry) 288 files = self._extract_files(entry) 289 if content or tool_calls: 290 msg = ConversationMessage( 291 role='assistant', 292 content=content, 293 tool_calls=tool_calls, 294 files_touched=files 295 ) 296 self._extract_timestamp(entry, msg, thread) 297 thread.messages.append(msg) 298 299 def _process_message_entry(self, entry: Dict[str, Any], thread: ConversationThread) -> None: 300 """Process an entry with role field.""" 301 role = entry.get('role', '') 302 if role not in ('human', 'assistant', 'user'): 303 return 304 305 role = 'human' if role == 'user' else role 306 content = self._extract_content(entry) 307 308 if content: 309 msg = ConversationMessage(role=role, content=content) 310 self._extract_timestamp(entry, msg, thread) 311 312 if role == 'assistant': 313 msg.tool_calls = self._extract_tool_calls(entry) 314 msg.files_touched = self._extract_files(entry) 315 316 thread.messages.append(msg) 317 318 def _process_nested_entry(self, entry: Dict[str, Any], thread: ConversationThread) -> None: 319 """Process an entry with nested message field.""" 320 message = entry.get('message', {}) 321 if isinstance(message, dict): 322 self._process_message_entry(message, thread) 323 324 def _extract_content(self, entry: Dict[str, Any]) -> str: 325 """Extract text content from an entry.""" 326 
# Try different content locations 327 content = entry.get('content', '') 328 if isinstance(content, str): 329 return content[:5000] # Truncate long content 330 331 if isinstance(content, list): 332 # Content blocks format 333 texts = [] 334 for block in content: 335 if isinstance(block, dict): 336 if block.get('type') == 'text': 337 texts.append(block.get('text', '')) 338 elif isinstance(block, str): 339 texts.append(block) 340 return ' '.join(texts)[:5000] 341 342 # Try message field 343 message = entry.get('message', '') 344 if isinstance(message, str): 345 return message[:5000] 346 347 return '' 348 349 def _extract_tool_calls(self, entry: Dict[str, Any]) -> List[str]: 350 """Extract tool call names from an entry.""" 351 tools = [] 352 353 # Check content blocks for tool_use 354 content = entry.get('content', []) 355 if isinstance(content, list): 356 for block in content: 357 if isinstance(block, dict) and block.get('type') == 'tool_use': 358 tools.append(block.get('name', 'unknown')) 359 360 # Check tool_calls field 361 tool_calls = entry.get('tool_calls', []) 362 if isinstance(tool_calls, list): 363 for tc in tool_calls: 364 if isinstance(tc, dict): 365 tools.append(tc.get('name', tc.get('function', {}).get('name', 'unknown'))) 366 367 return tools 368 369 def _extract_files(self, entry: Dict[str, Any]) -> List[str]: 370 """Extract file paths from an entry.""" 371 files = [] 372 content = self._extract_content(entry) 373 374 for pattern in self.artifact_patterns: 375 matches = re.findall(pattern, content, re.IGNORECASE) 376 files.extend(matches) 377 378 # Also check tool call inputs 379 content_blocks = entry.get('content', []) 380 if isinstance(content_blocks, list): 381 for block in content_blocks: 382 if isinstance(block, dict) and block.get('type') == 'tool_use': 383 input_data = block.get('input', {}) 384 if isinstance(input_data, dict): 385 for key in ('file_path', 'path', 'file'): 386 if key in input_data: 387 files.append(input_data[key]) 388 389 
return list(set(files)) 390 391 def _extract_timestamp(self, entry: Dict[str, Any], msg: ConversationMessage, thread: ConversationThread) -> None: 392 """Extract and set timestamp from entry.""" 393 ts_str = entry.get('timestamp') or entry.get('created_at') or entry.get('time') 394 if ts_str: 395 try: 396 if isinstance(ts_str, (int, float)): 397 ts = datetime.fromtimestamp(ts_str) 398 else: 399 # Try ISO format 400 ts = datetime.fromisoformat(ts_str.replace('Z', '+00:00')) 401 msg.timestamp = ts 402 403 # Update thread bounds 404 if thread.start_time is None or ts < thread.start_time: 405 thread.start_time = ts 406 if thread.end_time is None or ts > thread.end_time: 407 thread.end_time = ts 408 except (ValueError, TypeError): 409 pass 410 411 def _extract_patterns(self, thread: ConversationThread) -> None: 412 """Extract high-level patterns from parsed messages.""" 413 all_text = ' '.join(m.content for m in thread.messages) 414 all_text_lower = all_text.lower() 415 416 # Extract key concepts (for resonance detection) 417 concepts = set() 418 for keyword in self.concept_keywords: 419 if keyword.lower() in all_text_lower: 420 concepts.add(keyword.lower()) 421 thread.concepts = list(concepts) 422 423 # Extract topics (verbose descriptions) 424 topics = set() 425 for pattern in self.topic_patterns: 426 matches = re.findall(pattern, all_text, re.IGNORECASE) 427 for match in matches[:10]: # Limit per pattern 428 topic = match.strip() 429 if len(topic) > 5 and len(topic) < 100: 430 topics.add(topic) 431 thread.topics = list(topics)[:15] 432 433 # Extract decisions 434 decisions = set() 435 for pattern in self.decision_patterns: 436 matches = re.findall(pattern, all_text, re.IGNORECASE) 437 for match in matches[:5]: 438 decision = match.strip() 439 if len(decision) > 5 and len(decision) < 150: 440 decisions.add(decision) 441 thread.decisions = list(decisions)[:10] 442 443 # Artifacts from files touched 444 artifacts = set() 445 for msg in thread.messages: 446 
artifacts.update(msg.files_touched) 447 thread.artifacts = list(artifacts)[:20] 448 449 # Key insights from assistant messages (look for insight markers) 450 insights = [] 451 insight_markers = ['important:', 'key insight:', 'note:', 'critical:', 'the key is'] 452 for msg in thread.messages: 453 if msg.role == 'assistant': 454 content_lower = msg.content.lower() 455 for marker in insight_markers: 456 if marker in content_lower: 457 # Extract sentence containing marker 458 idx = content_lower.find(marker) 459 end = msg.content.find('.', idx) 460 if end > idx: 461 insight = msg.content[idx:end+1].strip() 462 if len(insight) < 200: 463 insights.append(insight) 464 thread.key_insights = insights[:10] 465 466 # === RESONANCE-BASED AXIOM DETECTION === 467 # Calculate typed resonance across all axioms (find signal, reduce noise) 468 self._calculate_resonance(thread, all_text) 469 470 def _calculate_resonance(self, thread: ConversationThread, all_text: str) -> None: 471 """ 472 Calculate axiom resonance for the conversation thread. 473 474 Uses typed resonance to determine which axioms this chat explores. 475 High-resonance segments are extracted for further analysis. 
476 """ 477 if not HAS_RESONANCE_ENGINE: 478 return 479 480 # Extract terms from full conversation 481 words = set(re.findall(r'\b[a-zA-Z]{3,}\b', all_text.lower())) 482 axiom_scores = calculate_axiom_resonance(words) 483 dominant = get_dominant_axiom(words) 484 485 thread.axiom_resonance = axiom_scores 486 thread.dominant_axiom = dominant 487 488 # Determine if this is a signal-rich conversation 489 peak_resonance = max(axiom_scores.values()) if axiom_scores else 0.0 490 thread.is_signal_rich = peak_resonance >= CHAT_RESONANCE_THRESHOLD 491 492 # Extract high-resonance segments (individual messages with strong axiom signal) 493 high_res_segments = [] 494 for i, msg in enumerate(thread.messages): 495 msg_words = set(re.findall(r'\b[a-zA-Z]{3,}\b', msg.content.lower())) 496 msg_scores = calculate_axiom_resonance(msg_words) 497 msg_dominant = get_dominant_axiom(msg_words) 498 499 msg_peak = max(msg_scores.values()) if msg_scores else 0.0 500 501 if msg_peak >= CHAT_HIGH_VALUE_THRESHOLD: 502 high_res_segments.append({ 503 "index": i, 504 "role": msg.role, 505 "preview": msg.content[:200], 506 "resonance": msg_peak, 507 "scores": msg_scores, 508 "dominant": msg_dominant, 509 "is_insight": msg_dominant[0] in ("A0", "A1") if msg_dominant else False, 510 "is_operational": msg_dominant[0] in ("A3", "A4") if msg_dominant else False, 511 }) 512 513 thread.high_resonance_segments = high_res_segments 514 515 516 def summarize_thread(thread: ConversationThread) -> str: 517 """Generate a summary of a conversation thread.""" 518 lines = [ 519 f"Thread: {thread.thread_id}", 520 f"Exchanges: {thread.exchange_count}", 521 ] 522 523 if thread.start_time: 524 lines.append(f"Start: {thread.start_time.strftime('%Y-%m-%d %H:%M')}") 525 if thread.duration_minutes: 526 lines.append(f"Duration: {thread.duration_minutes:.0f} minutes") 527 528 if thread.topics: 529 lines.append(f"\nTopics ({len(thread.topics)}):") 530 for t in thread.topics[:5]: 531 lines.append(f" - {t}") 532 533 if 
thread.decisions: 534 lines.append(f"\nDecisions ({len(thread.decisions)}):") 535 for d in thread.decisions[:5]: 536 lines.append(f" - {d}") 537 538 if thread.artifacts: 539 lines.append(f"\nArtifacts ({len(thread.artifacts)}):") 540 for a in thread.artifacts[:10]: 541 lines.append(f" - {a}") 542 543 # Resonance summary 544 if thread.axiom_resonance: 545 lines.append(f"\nAxiom Resonance (signal/noise detection):") 546 signal_status = "SIGNAL-RICH" if thread.is_signal_rich else "LOW-SIGNAL" 547 lines.append(f" Status: {signal_status}") 548 if thread.dominant_axiom: 549 axiom_id, score = thread.dominant_axiom 550 axiom_name = AXIOM_FIELDS.get(axiom_id, {}).get("name", axiom_id) 551 lines.append(f" Dominant: {axiom_id} ({axiom_name}) @ {score:.2f}") 552 for aid, score in sorted(thread.axiom_resonance.items(), key=lambda x: -x[1]): 553 if score >= CHAT_RESONANCE_THRESHOLD: 554 lines.append(f" {aid}: {score:.2f}") 555 556 if thread.high_resonance_segments: 557 lines.append(f"\nHigh-Value Segments ({len(thread.high_resonance_segments)}):") 558 for seg in thread.high_resonance_segments[:5]: 559 marker = "💡" if seg.get("is_insight") else "⚙️" if seg.get("is_operational") else "📍" 560 lines.append(f" {marker} [{seg['role']}] {seg['preview'][:60]}... (r={seg['resonance']:.2f})") 561 562 return '\n'.join(lines)