# core/attention/transcript_miner.py
  1  """
  2  Transcript Miner - Extract insights from historical conversations.
  3  
  4  Your 7 days of transcripts are rich with:
  5  1. Signal words showing what you value
  6  2. Decision patterns
  7  3. Topic trajectories
  8  4. Validation-seeking moments (potential insights to mine)
  9  5. Language patterns for personalization
 10  
 11  This module:
 12  - Parses session transcripts
 13  - Extracts and categorizes signal words
 14  - Identifies unmined insights (high-weight items not yet integrated)
 15  - Builds topic maps showing conceptual relationships
 16  - Learns your personal vocabulary/patterns
 17  """
 18  
 19  from dataclasses import dataclass, field
 20  from datetime import datetime, timedelta
 21  from pathlib import Path
 22  from typing import Optional, List, Dict, Tuple, Set, Any
 23  from collections import defaultdict
 24  import re
 25  import json
 26  
 27  from .signal_words import SignalWordDetector, SignalDetection, detect_signals
 28  
 29  
@dataclass
class TranscriptAtom:
    """A single message/atom from a transcript."""
    content: str  # cleaned message text (block refs and HTML comments stripped by the miner)
    speaker: str  # 'Rick', 'Claude', 'System'
    timestamp: Optional[datetime] = None  # NOTE(review): never populated by TranscriptMiner — parser only keeps the HH:MM:SS string; confirm whether it is needed
    session_id: str = ""  # stem of the source session file, e.g. '2024-01-01-live'

    # Signal analysis
    signals: Optional[SignalDetection] = None  # raw detector output for this atom
    weight: float = 1.0  # copied from signals.weight_modifier when built by the miner
    tags: Set[str] = field(default_factory=set)  # copy of signals.suggested_tags

    # Derived
    is_insight: bool = False  # High weight + validation-seeking
    is_decision: bool = False
    is_principle: bool = False
    needs_integration: bool = False  # Not yet mined
 48  
 49  
@dataclass
class TopicCluster:
    """A cluster of related content around a topic."""
    topic: str  # the topic string this cluster is keyed by
    atoms: List[TranscriptAtom] = field(default_factory=list)  # every atom that mentions the topic
    total_weight: float = 0.0  # sum of member atoms' weights
    decision_count: int = 0  # atoms flagged is_decision
    insight_count: int = 0  # atoms flagged is_insight
    principle_count: int = 0  # atoms flagged is_principle

    # Related topics (co-occurrence)
    related_topics: Dict[str, int] = field(default_factory=dict)  # other topic -> co-occurrence count
 62  
 63  
 64  @dataclass
 65  class MiningReport:
 66      """Results from mining transcripts."""
 67      sessions_analyzed: int = 0
 68      atoms_processed: int = 0
 69      time_window_hours: float = 0.0
 70  
 71      # High-value items
 72      unmined_insights: List[TranscriptAtom] = field(default_factory=list)
 73      principles: List[TranscriptAtom] = field(default_factory=list)
 74      decisions: List[TranscriptAtom] = field(default_factory=list)
 75  
 76      # Patterns
 77      signal_word_counts: Dict[str, int] = field(default_factory=dict)
 78      topic_clusters: Dict[str, TopicCluster] = field(default_factory=dict)
 79  
 80      # Operator-specific
 81      your_common_signals: List[Tuple[str, int]] = field(default_factory=list)
 82      validation_seeking_rate: float = 0.0  # How often you ask "what do you think?"
 83  
 84      def to_markdown(self) -> str:
 85          """Generate markdown report."""
 86          lines = [
 87              "# Transcript Mining Report",
 88              f"*{self.sessions_analyzed} sessions, {self.atoms_processed} atoms, {self.time_window_hours:.1f} hours*",
 89              "",
 90          ]
 91  
 92          # Unmined insights
 93          if self.unmined_insights:
 94              lines.append("## Unmined Insights")
 95              lines.append("*High-weight items not yet integrated:*")
 96              lines.append("")
 97              for atom in self.unmined_insights[:10]:
 98                  tags = ', '.join(atom.tags) if atom.tags else 'none'
 99                  lines.append(f"- [{atom.weight:.2f}] {atom.content[:100]}... (tags: {tags})")
100              lines.append("")
101  
102          # Principles
103          if self.principles:
104              lines.append("## Principles Identified")
105              for atom in self.principles[:10]:
106                  lines.append(f"- {atom.content[:150]}...")
107              lines.append("")
108  
109          # Decisions
110          if self.decisions:
111              lines.append("## Decisions Made")
112              for atom in self.decisions[:10]:
113                  lines.append(f"- {atom.content[:100]}...")
114              lines.append("")
115  
116          # Topic clusters
117          if self.topic_clusters:
118              lines.append("## Topic Clusters")
119              sorted_clusters = sorted(
120                  self.topic_clusters.values(),
121                  key=lambda c: c.total_weight,
122                  reverse=True
123              )
124              for cluster in sorted_clusters[:10]:
125                  lines.append(f"### {cluster.topic}")
126                  lines.append(f"Weight: {cluster.total_weight:.1f}, Insights: {cluster.insight_count}, Decisions: {cluster.decision_count}")
127                  if cluster.related_topics:
128                      related = sorted(cluster.related_topics.items(), key=lambda x: -x[1])[:5]
129                      lines.append(f"Related: {', '.join(t for t, c in related)}")
130                  lines.append("")
131  
132          # Signal patterns
133          if self.your_common_signals:
134              lines.append("## Your Signal Word Patterns")
135              for signal, count in self.your_common_signals[:15]:
136                  lines.append(f"- {signal}: {count} times")
137              lines.append("")
138              lines.append(f"Validation-seeking rate: {self.validation_seeking_rate:.1%}")
139  
140          return '\n'.join(lines)
141  
142  
class TranscriptMiner:
    """
    Mines session transcripts for insights, patterns, and unmined content.

    Usage:
        miner = TranscriptMiner(sessions_dir)
        report = miner.mine(hours_back=168)  # 7 days

        # Get unmined insights
        for insight in report.unmined_insights:
            print(f"[{insight.weight}] {insight.content}")

        # Export topic map
        miner.export_topic_map("topic_map.json")
    """

    # Patterns to extract atoms from markdown session files.
    # NOTE(review): currently unused — extraction is done by the line-based
    # parser in _extract_atoms, which is more robust. Kept for compatibility.
    ATOM_PATTERNS = [
        # Standard speaker pattern: **[HH:MM:SS] Speaker**
        (r'\*\*\[(\d{2}:\d{2}:\d{2})\]\s+(Rick|Claude)\*\*\s*\n\s*-\s+(.+?)(?=\n\n|\n\*\*\[|\Z)',
         lambda m: (m.group(2), m.group(3), m.group(1))),
    ]

    # Topic extraction patterns (applied to lowercased content)
    TOPIC_PATTERNS = [
        r'#(\w+)',  # Hashtags
        r'\b([a-z]+_[a-z_]+)\b',  # snake_case (code terms)
        r'\b(attention|context|stream|session|coherence|membrane|aha)\b',  # Domain terms
    ]

    def __init__(self, sessions_dir: str):
        """
        Args:
            sessions_dir: Directory containing ``*-live.md`` session transcripts.
        """
        self.sessions_dir = Path(sessions_dir)
        self.signal_detector = SignalWordDetector()

        # Accumulated state — rebuilt on every mine() call.
        # _topic_clusters is a plain dict (was defaultdict): clusters are
        # created explicitly in _process_atom_for_clusters, and the old
        # defaultdict factory could silently create a TopicCluster with an
        # empty topic name on any accidental [] access.
        self._atoms: List[TranscriptAtom] = []
        self._topic_clusters: Dict[str, TopicCluster] = {}
        self._signal_counts: Dict[str, int] = defaultdict(int)

    def mine(
        self,
        hours_back: float = 168,  # 7 days
        speaker_filter: Optional[str] = None,  # 'Rick', 'Claude', or None for all
        min_weight: float = 0.0
    ) -> MiningReport:
        """
        Mine transcripts from the specified time window.

        Args:
            hours_back: How far back to look
            speaker_filter: Only analyze this speaker's messages
            min_weight: Only include atoms above this weight

        Returns:
            MiningReport with insights, patterns, and topic clusters
        """
        # Reset accumulated state so repeated mine() calls don't double-count
        # atoms, clusters, and signal tallies from earlier runs.
        self._atoms = []
        self._topic_clusters = {}
        self._signal_counts = defaultdict(int)

        cutoff = datetime.now() - timedelta(hours=hours_back)

        # Find session files; window membership is judged by file mtime.
        session_files = list(self.sessions_dir.glob("*-live.md"))
        sessions_analyzed = 0

        for session_file in session_files:
            mtime = datetime.fromtimestamp(session_file.stat().st_mtime)
            if mtime < cutoff:
                continue

            sessions_analyzed += 1
            session_id = session_file.stem

            # Extract atoms (explicit encoding: transcripts are markdown/UTF-8).
            content = session_file.read_text(encoding="utf-8")
            atoms = self._extract_atoms(content, session_id)

            for atom in atoms:
                if speaker_filter and atom.speaker != speaker_filter:
                    continue
                if atom.weight < min_weight:
                    continue

                self._atoms.append(atom)
                self._process_atom_for_clusters(atom)

        return self._build_report(sessions_analyzed, hours_back)

    def _extract_atoms(self, content: str, session_id: str) -> List[TranscriptAtom]:
        """Extract atoms from session markdown content.

        Line-based scan: a "- **[HH:MM:SS] Speaker**" header opens a new atom;
        subsequent content bullets are accumulated until the next header.
        """
        atoms: List[TranscriptAtom] = []

        lines = content.split('\n')
        current_speaker = None
        current_content: List[str] = []
        current_timestamp = None

        for line in lines:
            # Check for speaker header
            speaker_match = re.match(r'^\s*-\s*\*\*\[(\d{2}:\d{2}:\d{2})\]\s+(Rick|Claude)\*\*', line)
            if speaker_match:
                # Flush the previous atom before starting a new one.
                if current_speaker and current_content:
                    atom = self._create_atom(
                        '\n'.join(current_content),
                        current_speaker,
                        current_timestamp,
                        session_id
                    )
                    if atom:
                        atoms.append(atom)

                current_timestamp = speaker_match.group(1)
                current_speaker = speaker_match.group(2)
                current_content = []
            elif current_speaker:
                # Content line — skip metadata/tool results.
                stripped = line.strip()
                if stripped.startswith('-') and not stripped.startswith('- {'):
                    # Actual content bullet: drop the leading dash.
                    text = re.sub(r'^-\s*', '', stripped)
                    # Skip HTML comments and serialized tool-result dicts.
                    if not text.startswith('<!--') and not text.startswith('{\'type\''):
                        current_content.append(text)

        # Flush the trailing atom.
        if current_speaker and current_content:
            atom = self._create_atom(
                '\n'.join(current_content),
                current_speaker,
                current_timestamp,
                session_id
            )
            if atom:
                atoms.append(atom)

        return atoms

    def _create_atom(
        self,
        content: str,
        speaker: str,
        timestamp_str: Optional[str],
        session_id: str
    ) -> Optional[TranscriptAtom]:
        """Create a TranscriptAtom with signal analysis.

        Returns None when the content is empty or too short after cleaning.
        NOTE(review): timestamp_str (HH:MM:SS only, no date) is currently not
        parsed into atom.timestamp — confirm whether a full datetime is needed.
        """
        if not content or len(content) < 10:
            return None

        # Clean content
        content = re.sub(r'\^[a-f0-9]+', '', content)  # Remove block refs
        content = re.sub(r'<!--.*?-->', '', content, flags=re.DOTALL)  # Remove comments
        content = content.strip()

        if not content:
            return None

        # Analyze signals
        signals = self.signal_detector.detect(content)

        # Track signal word usage across the whole mining run
        for signal, match in signals.signals_found:
            self._signal_counts[signal.pattern] += 1

        atom = TranscriptAtom(
            content=content,
            speaker=speaker,
            session_id=session_id,
            signals=signals,
            weight=signals.weight_modifier,
            tags=signals.suggested_tags.copy()
        )

        # Categorize — thresholds are heuristic weight cutoffs.
        atom.is_insight = signals.weight_modifier > 1.3 and "needs_validation" in signals.suggested_tags
        atom.is_decision = signals.is_decision
        atom.is_principle = "principle" in signals.suggested_tags
        atom.needs_integration = signals.weight_modifier > 1.2 and speaker == "Rick"

        return atom

    def _process_atom_for_clusters(self, atom: TranscriptAtom) -> None:
        """Add atom to the cluster of every topic it mentions, updating counts."""
        topics = self._extract_topics(atom.content)

        for topic in topics:
            if topic not in self._topic_clusters:
                self._topic_clusters[topic] = TopicCluster(topic=topic)

            cluster = self._topic_clusters[topic]
            cluster.atoms.append(atom)
            cluster.total_weight += atom.weight

            if atom.is_decision:
                cluster.decision_count += 1
            if atom.is_insight:
                cluster.insight_count += 1
            if atom.is_principle:
                cluster.principle_count += 1

            # Track co-occurring topics (topics sharing this atom)
            for other_topic in topics:
                if other_topic != topic:
                    cluster.related_topics[other_topic] = cluster.related_topics.get(other_topic, 0) + 1

    def _extract_topics(self, content: str) -> Set[str]:
        """Extract topics from content via TOPIC_PATTERNS, minus stop words."""
        topics: Set[str] = set()
        content_lower = content.lower()

        for pattern in self.TOPIC_PATTERNS:
            # re caches compiled patterns, so findall per call is cheap.
            matches = re.findall(pattern, content_lower)
            topics.update(matches)

        # Filter out common words and very short tokens
        stop_words = {'the', 'and', 'for', 'this', 'that', 'with', 'from', 'have', 'been'}
        topics = {t for t in topics if t not in stop_words and len(t) > 2}

        return topics

    def _build_report(self, sessions_analyzed: int, hours_back: float) -> MiningReport:
        """Assemble a MiningReport from the state accumulated during mine()."""
        report = MiningReport(
            sessions_analyzed=sessions_analyzed,
            atoms_processed=len(self._atoms),
            time_window_hours=hours_back
        )

        # Collect high-value items (an atom may land in several buckets)
        for atom in self._atoms:
            if atom.needs_integration and atom.weight > 1.2:
                report.unmined_insights.append(atom)
            if atom.is_principle:
                report.principles.append(atom)
            if atom.is_decision:
                report.decisions.append(atom)

        # Sort by weight, heaviest first
        report.unmined_insights.sort(key=lambda a: a.weight, reverse=True)
        report.principles.sort(key=lambda a: a.weight, reverse=True)
        report.decisions.sort(key=lambda a: a.weight, reverse=True)

        # Topic clusters (snapshot as a plain dict)
        report.topic_clusters = dict(self._topic_clusters)

        # Signal patterns
        report.signal_word_counts = dict(self._signal_counts)
        report.your_common_signals = sorted(
            self._signal_counts.items(),
            key=lambda x: x[1],
            reverse=True
        )

        # Validation-seeking rate (for Rick's messages only)
        rick_atoms = [a for a in self._atoms if a.speaker == "Rick"]
        validation_seeking = [
            a for a in rick_atoms
            if a.signals and "needs_validation" in a.signals.suggested_tags
        ]
        if rick_atoms:
            report.validation_seeking_rate = len(validation_seeking) / len(rick_atoms)

        return report

    def export_topic_map(self, output_path: str) -> None:
        """Export topic map as JSON for visualization.

        Writes {'nodes': [...], 'edges': [...]} where edges keep only
        co-occurrence counts > 1 (significant connections).
        """
        nodes = []
        edges = []

        for topic, cluster in self._topic_clusters.items():
            nodes.append({
                'id': topic,
                'weight': cluster.total_weight,
                'insights': cluster.insight_count,
                'decisions': cluster.decision_count,
                'principles': cluster.principle_count
            })

            for related, count in cluster.related_topics.items():
                if count > 1:  # Only significant connections
                    edges.append({
                        'source': topic,
                        'target': related,
                        'weight': count
                    })

        with open(output_path, 'w', encoding="utf-8") as f:
            json.dump({'nodes': nodes, 'edges': edges}, f, indent=2)

    def get_atoms_for_topic(self, topic: str) -> List[TranscriptAtom]:
        """Get all atoms related to a topic (empty list for unknown topics)."""
        if topic in self._topic_clusters:
            return self._topic_clusters[topic].atoms
        return []
438  
439  
def create_transcript_miner(sessions_dir: str) -> TranscriptMiner:
    """Factory helper: build a TranscriptMiner rooted at *sessions_dir*."""
    return TranscriptMiner(sessions_dir)
443  
444  
if __name__ == "__main__":
    import sys

    print("=== Transcript Miner ===\n")

    sessions_dir = "/Users/rcerf/repos/Sovereign_Estate/daily/sessions/"
    miner = TranscriptMiner(sessions_dir)

    # Default window is one week; an optional CLI argument overrides it.
    hours = float(sys.argv[1]) if len(sys.argv) > 1 else 168  # 7 days

    print(f"Mining last {hours} hours...")
    print(miner.mine(hours_back=hours).to_markdown())