# core/replay/conversation_parser.py
  1  """
  2  Conversation Parser - Extract patterns from JSONL conversation logs.
  3  
  4  Parses Claude Code conversation logs and extracts:
  5  - Topics discussed
  6  - Decisions made
  7  - Key artifacts created
  8  - Patterns and insights
  9  - Flow state moments (via signal word detection)
 10  """
 11  
 12  import json
 13  import re
 14  import sys
 15  from pathlib import Path
 16  from datetime import datetime
 17  from typing import List, Dict, Any, Optional, Tuple
 18  from dataclasses import dataclass, field
 19  
 20  # Import signal word detector if available
 21  try:
 22      from core.attention.signal_words import SignalWordDetector, SignalDetection
 23      HAS_SIGNAL_WORDS = True
 24  except ImportError:
 25      HAS_SIGNAL_WORDS = False
 26      SignalWordDetector = None
 27      SignalDetection = None
 28  
 29  # Import resonance engine for typed axiom detection
 30  try:
 31      from core.metacog.resonance import (
 32          calculate_axiom_resonance,
 33          get_dominant_axiom,
 34          AXIOM_FIELDS
 35      )
 36      HAS_RESONANCE_ENGINE = True
 37  except ImportError:
 38      HAS_RESONANCE_ENGINE = False
 39      AXIOM_FIELDS = {}
 40  
 41      def calculate_axiom_resonance(terms, axiom_id=None):
 42          return {}
 43  
 44      def get_dominant_axiom(terms):
 45          return None
 46  
# Resonance thresholds for chat sessions (tuned for insight discovery).
# Chat sessions are more exploratory than code sessions, so the signal-rich
# bar is deliberately lower; the high-value bar flags individual messages
# worth surfacing to attention.
CHAT_RESONANCE_THRESHOLD = 0.10  # Lower than code sessions - more exploratory
CHAT_HIGH_VALUE_THRESHOLD = 0.25  # Surface to attention
 50  
 51  
@dataclass
class ConversationMessage:
    """A single message in a conversation."""
    role: str  # 'human' or 'assistant'
    content: str  # Message text; the parser truncates stored content to 5000 chars
    timestamp: Optional[datetime] = None  # Parsed from the log entry when present
    tool_calls: List[str] = field(default_factory=list)  # Tool names invoked (assistant messages only)
    files_touched: List[str] = field(default_factory=list)  # File paths mentioned or passed to tools (assistant only)
 60  
 61  
@dataclass
class FlowMoment:
    """A detected flow state moment in the conversation."""
    timestamp: Optional[datetime]  # Timestamp of the triggering human message, if known
    text: str  # First 200 chars of the triggering message
    weight_modifier: float  # Engagement weight from the signal detector (> 1.3 qualifies)
    signals: List[str]  # Patterns of the signal words that matched
    tags: List[str]  # Tags suggested by the detector for this moment
 70  
 71  
@dataclass
class ConversationThread:
    """A parsed conversation thread plus the patterns extracted from it."""
    thread_id: str  # Derived from the log file's stem
    start_time: Optional[datetime] = None  # Earliest message timestamp seen
    end_time: Optional[datetime] = None  # Latest message timestamp seen
    messages: List[ConversationMessage] = field(default_factory=list)
    topics: List[str] = field(default_factory=list)  # Verbose topic phrases (regex-extracted)
    concepts: List[str] = field(default_factory=list)  # Key concepts for resonance
    decisions: List[str] = field(default_factory=list)  # Decision phrases (regex-extracted)
    artifacts: List[str] = field(default_factory=list)  # File paths touched/created
    key_insights: List[str] = field(default_factory=list)  # Marker-tagged assistant sentences
    flow_moments: List[FlowMoment] = field(default_factory=list)  # High-engagement moments
    peak_engagement: float = 0.0  # Highest weight modifier seen

    # Resonance metadata - typed axiom detection
    axiom_resonance: Dict[str, float] = field(default_factory=dict)  # A0-A4 resonance scores
    dominant_axiom: Optional[Tuple[str, float]] = None  # (axiom_id, score) if one dominates
    high_resonance_segments: List[Dict[str, Any]] = field(default_factory=list)  # Signal, not noise
    is_signal_rich: bool = False  # Does this chat have significant axiom content?

    @property
    def duration_minutes(self) -> Optional[float]:
        """Wall-clock span in minutes, or None when either bound is missing."""
        if self.start_time and self.end_time:
            return (self.end_time - self.start_time).total_seconds() / 60
        return None

    @property
    def exchange_count(self) -> int:
        """Count human-assistant exchange pairs (approximated by human messages)."""
        human_count = sum(1 for m in self.messages if m.role == 'human')
        return human_count
104  
105  
class ConversationParser:
    """Parse JSONL conversation logs into ConversationThread objects.

    Recognizes several entry layouts seen in practice:

    * Claude Code format: ``type`` of 'user'/'assistant' with a nested
      'message' object
    * typed entries: ``type`` of 'human'/'assistant' with inline content
    * role entries: a top-level ``role`` field
    * nested entries: a bare ``message`` dict

    After the raw messages are collected, high-level patterns (topics,
    concepts, decisions, artifacts, key insights), flow moments, and axiom
    resonance are derived from the conversation text.
    """

    def __init__(self):
        # Flow-moment detection is optional; None disables it gracefully.
        self.signal_detector = SignalWordDetector() if HAS_SIGNAL_WORDS else None

        # Phrases that introduce work topics ("working on X", "let's X", ...).
        self.topic_patterns = [
            r'(?:working on|building|implementing|creating)\s+(.+?)(?:\.|$)',
            r'(?:Let\'s|let me)\s+(.+?)(?:\.|$)',
            r'(?:need to|want to|should)\s+(.+?)(?:\.|$)',
        ]

        # Key concept keywords for resonance detection; matched as
        # case-insensitive substrings of the full conversation text.
        self.concept_keywords = [
            # Cognitive/consciousness
            'consciousness', 'cognitive', 'attention', 'awareness', 'metacognition',
            'thinking', 'reasoning', 'memory', 'perception',
            # Biometrics
            'eeg', 'biometric', 'brainwave', 'neural', 'physiological',
            'heart rate', 'gsr', 'eye tracking', 'tobii', 'mindmonitor',
            # Architecture
            'architecture', 'pipeline', 'daemon', 'adapter', 'interface',
            'protocol', 'pattern', 'engine', 'system', 'framework',
            # Data/Processing
            'transcription', 'audio', 'streaming', 'real-time', 'sync',
            'database', 'sqlite', 'storage', 'ingestion',
            # Sovereign OS specific
            'phoenix', 'compression', 'resonance', 'sovereign', 'cockpit',
            'tribal', 'first officer', 'mission control', 'handoff',
            # Graph/topology
            'graph', 'topology', 'node', 'edge', 'cluster', 'routing',
            'navigation', 'temporal', 'position', 'vector',
            # Flow state indicators
            'jamming', 'buzzing', 'clicking', 'landing', 'vibing',
            'resonating', 'alignment', 'crystallizing', 'breakthrough',
            'flow', 'cooking', 'rolling', 'fire',
        ]
        # Phrases that signal a decision was made.
        self.decision_patterns = [
            r'(?:decided to|will use|going with|chose)\s+(.+?)(?:\.|$)',
            r'(?:the approach is|solution is)\s+(.+?)(?:\.|$)',
        ]
        # Mentions of created or referenced source/doc files by extension.
        self.artifact_patterns = [
            r'(?:created|wrote|generated|built)\s+[`"]?([^`"]+\.(?:py|md|yaml|json|ts|js))[`"]?',
            r'(?:file|path):\s*[`"]?([^`"\s]+\.(?:py|md|yaml|json|ts|js))[`"]?',
        ]

    def parse_jsonl(self, path: Path) -> ConversationThread:
        """Parse a JSONL conversation log file.

        Blank lines and lines that are not valid JSON are skipped, so a
        partially written log still parses.
        """
        thread = ConversationThread(thread_id=path.stem)

        # JSONL logs are UTF-8; state it explicitly so parsing does not
        # depend on the platform's locale default encoding.
        with open(path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                # Keep the try minimal: only json.loads can raise here.
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue  # tolerate truncated/corrupt lines
                self._process_entry(entry, thread)

        # Extract high-level patterns after parsing
        self._extract_patterns(thread)

        # Detect flow moments using signal word detector
        self._detect_flow_moments(thread)

        return thread

    def _detect_flow_moments(self, thread: ConversationThread) -> None:
        """Detect flow state moments in human messages via signal words.

        No-op when the signal word detector is unavailable.
        """
        if not self.signal_detector:
            return

        flow_moments = []
        peak_engagement = 1.0  # 1.0 is the neutral (no-signal) baseline

        for msg in thread.messages:
            if msg.role != 'human':
                continue

            detection = self.signal_detector.detect(msg.content)

            # Track peak engagement
            if detection.weight_modifier > peak_engagement:
                peak_engagement = detection.weight_modifier

            # If high engagement detected (weight > 1.3), record as flow moment
            if detection.weight_modifier > 1.3:
                flow_moments.append(FlowMoment(
                    timestamp=msg.timestamp,
                    text=msg.content[:200],  # Truncate for storage
                    weight_modifier=detection.weight_modifier,
                    signals=[s.pattern for s, m in detection.signals_found],
                    tags=list(detection.suggested_tags)
                ))

        thread.flow_moments = flow_moments
        thread.peak_engagement = peak_engagement

    def _process_entry(self, entry: Dict[str, Any], thread: ConversationThread) -> None:
        """Dispatch a single JSONL entry to the handler for its layout."""
        # Claude Code format: type field with nested message object
        entry_type = entry.get('type', '')

        # Skip non-message entries
        if entry_type in ('queue-operation', 'tool_result', 'summary'):
            return

        # Handle Claude Code format: type="user" or "assistant" with nested message
        if entry_type in ('user', 'assistant') and 'message' in entry:
            self._process_claude_code_entry(entry, thread)
        elif 'type' in entry:
            self._process_typed_entry(entry, thread)
        elif 'role' in entry:
            self._process_message_entry(entry, thread)
        elif 'message' in entry:
            self._process_nested_entry(entry, thread)

    def _process_claude_code_entry(self, entry: Dict[str, Any], thread: ConversationThread) -> None:
        """Process Claude Code specific format entries (nested 'message')."""
        entry_type = entry.get('type', '')
        message = entry.get('message', {})

        if not isinstance(message, dict):
            return

        role = 'human' if entry_type == 'user' else 'assistant'

        # Extract content; assistant content may be a list of typed blocks.
        content = message.get('content', '')
        if isinstance(content, list):
            texts = []
            for block in content:
                if isinstance(block, dict) and block.get('type') == 'text':
                    texts.append(block.get('text', ''))
            content = '\n'.join(texts)

        if not content:
            return

        msg = ConversationMessage(
            role=role,
            content=content[:5000]  # cap stored content, same as _extract_content
        )

        # Timestamp parsing and thread-bounds tracking are shared with the
        # other entry formats (previously duplicated inline here).
        self._extract_timestamp(entry, msg, thread)

        # Extract tool calls and files for assistant messages
        if role == 'assistant':
            msg.tool_calls = self._extract_tool_calls(message)
            msg.files_touched = self._extract_files(entry)

        thread.messages.append(msg)

    def _process_typed_entry(self, entry: Dict[str, Any], thread: ConversationThread) -> None:
        """Process an entry with an explicit type field ('human'/'assistant')."""
        entry_type = entry.get('type', '')

        if entry_type == 'human':
            content = self._extract_content(entry)
            if content:
                msg = ConversationMessage(role='human', content=content)
                self._extract_timestamp(entry, msg, thread)
                thread.messages.append(msg)

        elif entry_type == 'assistant':
            content = self._extract_content(entry)
            tool_calls = self._extract_tool_calls(entry)
            files = self._extract_files(entry)
            # A tool-only assistant turn (no text) is still worth recording.
            if content or tool_calls:
                msg = ConversationMessage(
                    role='assistant',
                    content=content,
                    tool_calls=tool_calls,
                    files_touched=files
                )
                self._extract_timestamp(entry, msg, thread)
                thread.messages.append(msg)

    def _process_message_entry(self, entry: Dict[str, Any], thread: ConversationThread) -> None:
        """Process an entry with a role field; 'user' is normalized to 'human'."""
        role = entry.get('role', '')
        if role not in ('human', 'assistant', 'user'):
            return

        role = 'human' if role == 'user' else role
        content = self._extract_content(entry)

        if content:
            msg = ConversationMessage(role=role, content=content)
            self._extract_timestamp(entry, msg, thread)

            if role == 'assistant':
                msg.tool_calls = self._extract_tool_calls(entry)
                msg.files_touched = self._extract_files(entry)

            thread.messages.append(msg)

    def _process_nested_entry(self, entry: Dict[str, Any], thread: ConversationThread) -> None:
        """Process an entry whose payload is a nested 'message' dict."""
        message = entry.get('message', {})
        if isinstance(message, dict):
            self._process_message_entry(message, thread)

    def _extract_content(self, entry: Dict[str, Any]) -> str:
        """Extract text content from an entry, truncated to 5000 chars.

        Tries, in order: a string 'content', a list of content blocks
        (text blocks and bare strings), then a string 'message' field.
        Returns '' when nothing textual is found.
        """
        content = entry.get('content', '')
        if isinstance(content, str):
            return content[:5000]  # Truncate long content

        if isinstance(content, list):
            # Content blocks format
            texts = []
            for block in content:
                if isinstance(block, dict):
                    if block.get('type') == 'text':
                        texts.append(block.get('text', ''))
                elif isinstance(block, str):
                    texts.append(block)
            return ' '.join(texts)[:5000]

        # Try message field
        message = entry.get('message', '')
        if isinstance(message, str):
            return message[:5000]

        return ''

    def _extract_tool_calls(self, entry: Dict[str, Any]) -> List[str]:
        """Extract tool call names from content blocks and 'tool_calls'."""
        tools = []

        # Check content blocks for tool_use
        content = entry.get('content', [])
        if isinstance(content, list):
            for block in content:
                if isinstance(block, dict) and block.get('type') == 'tool_use':
                    tools.append(block.get('name', 'unknown'))

        # Check tool_calls field (OpenAI-style shape with nested 'function')
        tool_calls = entry.get('tool_calls', [])
        if isinstance(tool_calls, list):
            for tc in tool_calls:
                if isinstance(tc, dict):
                    tools.append(tc.get('name', tc.get('function', {}).get('name', 'unknown')))

        return tools

    def _extract_files(self, entry: Dict[str, Any]) -> List[str]:
        """Extract file paths mentioned in text or passed to tool inputs.

        Returns a deduplicated, sorted list so output is deterministic
        across runs (plain list(set(...)) ordering is arbitrary).
        """
        files = []
        content = self._extract_content(entry)

        for pattern in self.artifact_patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            files.extend(matches)

        # Also check tool call inputs
        content_blocks = entry.get('content', [])
        if isinstance(content_blocks, list):
            for block in content_blocks:
                if isinstance(block, dict) and block.get('type') == 'tool_use':
                    input_data = block.get('input', {})
                    if isinstance(input_data, dict):
                        for key in ('file_path', 'path', 'file'):
                            if key in input_data:
                                files.append(input_data[key])

        return sorted(set(files))

    def _extract_timestamp(self, entry: Dict[str, Any], msg: ConversationMessage, thread: ConversationThread) -> None:
        """Extract a timestamp from the entry and update thread time bounds.

        Accepts numeric epoch seconds or ISO-8601 strings (a trailing 'Z'
        is mapped to '+00:00' for fromisoformat). Unparseable values are
        ignored.

        NOTE(review): numeric timestamps become naive local datetimes while
        'Z'-suffixed ISO strings become timezone-aware; if one log mixes
        both, the bounds comparison raises TypeError (swallowed below) and
        thread bounds silently stop updating — confirm logs use one format.
        """
        ts_str = entry.get('timestamp') or entry.get('created_at') or entry.get('time')
        if ts_str:
            try:
                if isinstance(ts_str, (int, float)):
                    ts = datetime.fromtimestamp(ts_str)
                else:
                    # Try ISO format
                    ts = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
                msg.timestamp = ts

                # Update thread bounds
                if thread.start_time is None or ts < thread.start_time:
                    thread.start_time = ts
                if thread.end_time is None or ts > thread.end_time:
                    thread.end_time = ts
            except (ValueError, TypeError):
                pass

    def _extract_patterns(self, thread: ConversationThread) -> None:
        """Extract high-level patterns from parsed messages.

        Populates thread.concepts, .topics, .decisions, .artifacts and
        .key_insights, then runs resonance-based axiom detection.
        """
        all_text = ' '.join(m.content for m in thread.messages)
        all_text_lower = all_text.lower()

        # Extract key concepts (substring match, for resonance detection)
        concepts = set()
        for keyword in self.concept_keywords:
            if keyword.lower() in all_text_lower:
                concepts.add(keyword.lower())
        thread.concepts = list(concepts)

        # Extract topics (verbose descriptions); length-filtered to drop
        # fragments and run-on captures.
        topics = set()
        for pattern in self.topic_patterns:
            matches = re.findall(pattern, all_text, re.IGNORECASE)
            for match in matches[:10]:  # Limit per pattern
                topic = match.strip()
                if len(topic) > 5 and len(topic) < 100:
                    topics.add(topic)
        thread.topics = list(topics)[:15]

        # Extract decisions
        decisions = set()
        for pattern in self.decision_patterns:
            matches = re.findall(pattern, all_text, re.IGNORECASE)
            for match in matches[:5]:
                decision = match.strip()
                if len(decision) > 5 and len(decision) < 150:
                    decisions.add(decision)
        thread.decisions = list(decisions)[:10]

        # Artifacts from files touched
        artifacts = set()
        for msg in thread.messages:
            artifacts.update(msg.files_touched)
        thread.artifacts = list(artifacts)[:20]

        # Key insights from assistant messages (look for insight markers);
        # grabs the sentence from the marker to the next period.
        insights = []
        insight_markers = ['important:', 'key insight:', 'note:', 'critical:', 'the key is']
        for msg in thread.messages:
            if msg.role == 'assistant':
                content_lower = msg.content.lower()
                for marker in insight_markers:
                    if marker in content_lower:
                        # Extract sentence containing marker
                        idx = content_lower.find(marker)
                        end = msg.content.find('.', idx)
                        if end > idx:
                            insight = msg.content[idx:end + 1].strip()
                            if len(insight) < 200:
                                insights.append(insight)
        thread.key_insights = insights[:10]

        # === RESONANCE-BASED AXIOM DETECTION ===
        # Calculate typed resonance across all axioms (find signal, reduce noise)
        self._calculate_resonance(thread, all_text)

    def _calculate_resonance(self, thread: ConversationThread, all_text: str) -> None:
        """
        Calculate axiom resonance for the conversation thread.

        Uses typed resonance to determine which axioms this chat explores.
        High-resonance segments are extracted for further analysis.
        No-op when the resonance engine is unavailable.
        """
        if not HAS_RESONANCE_ENGINE:
            return

        # Extract terms (3+ letter words) from the full conversation
        words = set(re.findall(r'\b[a-zA-Z]{3,}\b', all_text.lower()))
        axiom_scores = calculate_axiom_resonance(words)
        dominant = get_dominant_axiom(words)

        thread.axiom_resonance = axiom_scores
        thread.dominant_axiom = dominant

        # Determine if this is a signal-rich conversation
        peak_resonance = max(axiom_scores.values()) if axiom_scores else 0.0
        thread.is_signal_rich = peak_resonance >= CHAT_RESONANCE_THRESHOLD

        # Extract high-resonance segments (individual messages with strong axiom signal)
        high_res_segments = []
        for i, msg in enumerate(thread.messages):
            msg_words = set(re.findall(r'\b[a-zA-Z]{3,}\b', msg.content.lower()))
            msg_scores = calculate_axiom_resonance(msg_words)
            msg_dominant = get_dominant_axiom(msg_words)

            msg_peak = max(msg_scores.values()) if msg_scores else 0.0

            if msg_peak >= CHAT_HIGH_VALUE_THRESHOLD:
                high_res_segments.append({
                    "index": i,
                    "role": msg.role,
                    "preview": msg.content[:200],
                    "resonance": msg_peak,
                    "scores": msg_scores,
                    "dominant": msg_dominant,
                    # A0/A1 dominance marks insight content; A3/A4 operational.
                    "is_insight": msg_dominant[0] in ("A0", "A1") if msg_dominant else False,
                    "is_operational": msg_dominant[0] in ("A3", "A4") if msg_dominant else False,
                })

        thread.high_resonance_segments = high_res_segments
514  
515  
def summarize_thread(thread: ConversationThread) -> str:
    """Generate a human-readable, multi-line summary of a conversation thread.

    Sections (topics, decisions, artifacts, resonance, high-value segments)
    appear only when the thread actually has data for them.
    """
    lines = [
        f"Thread: {thread.thread_id}",
        f"Exchanges: {thread.exchange_count}",
    ]

    if thread.start_time:
        lines.append(f"Start: {thread.start_time.strftime('%Y-%m-%d %H:%M')}")
    # Explicit None check: a legitimate 0-minute duration is still reported
    # (plain truthiness would silently drop it).
    if thread.duration_minutes is not None:
        lines.append(f"Duration: {thread.duration_minutes:.0f} minutes")

    if thread.topics:
        lines.append(f"\nTopics ({len(thread.topics)}):")
        for t in thread.topics[:5]:
            lines.append(f"  - {t}")

    if thread.decisions:
        lines.append(f"\nDecisions ({len(thread.decisions)}):")
        for d in thread.decisions[:5]:
            lines.append(f"  - {d}")

    if thread.artifacts:
        lines.append(f"\nArtifacts ({len(thread.artifacts)}):")
        for a in thread.artifacts[:10]:
            lines.append(f"  - {a}")

    # Resonance summary
    if thread.axiom_resonance:
        lines.append("\nAxiom Resonance (signal/noise detection):")
        signal_status = "SIGNAL-RICH" if thread.is_signal_rich else "LOW-SIGNAL"
        lines.append(f"  Status: {signal_status}")
        if thread.dominant_axiom:
            axiom_id, score = thread.dominant_axiom
            axiom_name = AXIOM_FIELDS.get(axiom_id, {}).get("name", axiom_id)
            lines.append(f"  Dominant: {axiom_id} ({axiom_name}) @ {score:.2f}")
        # List above-threshold axioms, strongest first.
        for aid, score in sorted(thread.axiom_resonance.items(), key=lambda x: -x[1]):
            if score >= CHAT_RESONANCE_THRESHOLD:
                lines.append(f"    {aid}: {score:.2f}")

    if thread.high_resonance_segments:
        lines.append(f"\nHigh-Value Segments ({len(thread.high_resonance_segments)}):")
        for seg in thread.high_resonance_segments[:5]:
            marker = "💡" if seg.get("is_insight") else "⚙️" if seg.get("is_operational") else "📍"
            lines.append(f"  {marker} [{seg['role']}] {seg['preview'][:60]}... (r={seg['resonance']:.2f})")

    return '\n'.join(lines)