# core/metacog/hunting_party.py
  1  #!/usr/bin/env python3
  2  """
  3  Sovereign OS - Hunting Party
  4  =============================
  5  
  6  Real-time extraction agents that watch the conversation stream and
  7  extract nodes, edges, dropped threads, and paths not taken.
  8  
  9  This is the first piece of the AI Chorus architecture - the extractors
 10  that feed the knowledge graph in real-time.
 11  
 12  From the flight-protocol:
 13  - ThreadCatcher: catches dropped/interrupted threads
 14  - PathTracker: tracks paths taken AND not taken
 15  - RelatedSurfacer: finds connected items from the graph
 16  - ConversationModeler: builds conversation shape
 17  
 18  Usage:
 19      python3 core/metacog/hunting_party.py                    # Watch current session
 20      python3 core/metacog/hunting_party.py --transcript PATH  # Watch specific transcript
 21      python3 core/metacog/hunting_party.py --extract-once     # Single extraction pass
 22      python3 core/metacog/hunting_party.py --status           # Show extraction status
 23  
 24  Architecture:
 25      TRANSCRIPT (JSONL) → HUNTING PARTY → MESH (hunting_* events) → GRAPH
 26  """
 27  
 28  import json
 29  import re
 30  import sys
 31  import time
 32  import hashlib
 33  import urllib.request
 34  import urllib.error
 35  from pathlib import Path
 36  from datetime import datetime
 37  from typing import Dict, List, Any, Optional, Set, Tuple
 38  from dataclasses import dataclass, field
 39  from collections import defaultdict
 40  
# Paths
SOVEREIGN_OS = Path(__file__).parent.parent.parent  # repo root (two levels above core/metacog/)
SESSIONS_DIR = SOVEREIGN_OS / "sessions"  # session artifacts dir (not read in this module)
CLAUDE_PROJECTS_DIR = Path.home() / ".claude" / "projects"  # where Claude writes JSONL transcripts
GRAPH_DATA = Path.home() / ".sovereign" / "graph-data.json"  # graph store (not read in this module)
HUNTING_STATE = Path.home() / ".sovereign" / "hunting-state.json"  # persisted extraction cursor/counters

# Mesh network
MESH_HTTP_PORT = 7778  # local mesh HTTP endpoint used by _publish_to_mesh
 50  
 51  
 52  @dataclass
 53  class ExtractedAtom:
 54      """A node extracted from conversation."""
 55      id: str
 56      content: str
 57      atom_type: str  # 'concept', 'insight', 'decision', 'question', 'thread'
 58      confidence: float  # 0-1, how confident we are this is a real atom
 59      source_line: int
 60      context: str  # surrounding context
 61      axioms: List[str] = field(default_factory=list)
 62      timestamp: str = ""
 63  
 64      def to_dict(self) -> Dict:
 65          return {
 66              'id': self.id,
 67              'content': self.content,
 68              'atom_type': self.atom_type,
 69              'confidence': self.confidence,
 70              'source_line': self.source_line,
 71              'context': self.context,
 72              'axioms': self.axioms,
 73              'timestamp': self.timestamp
 74          }
 75  
 76  
 77  @dataclass
 78  class ExtractedEdge:
 79      """An edge between atoms."""
 80      source_id: str
 81      target_id: str
 82      edge_type: str  # 'relates_to', 'derives_from', 'contradicts', 'extends', 'resonates_with'
 83      confidence: float
 84      evidence: str  # text that suggests this connection
 85  
 86      def to_dict(self) -> Dict:
 87          return {
 88              'source_id': self.source_id,
 89              'target_id': self.target_id,
 90              'edge_type': self.edge_type,
 91              'confidence': self.confidence,
 92              'evidence': self.evidence
 93          }
 94  
 95  
 96  @dataclass
 97  class DroppedThread:
 98      """A thread that was interrupted/dropped."""
 99      content: str
100      dropped_at_line: int
101      context: str
102      resumption_score: float  # 0-1, how likely this should be resumed
103  
104      def to_dict(self) -> Dict:
105          return {
106              'content': self.content,
107              'dropped_at_line': self.dropped_at_line,
108              'context': self.context,
109              'resumption_score': self.resumption_score
110          }
111  
112  
@dataclass
class PathNotTaken:
    """A decision branch that was mentioned but never explored."""
    description: str
    branch_point_line: int
    chosen_path: str
    reason_not_taken: str = ""
    nagging_score: float = 0.3  # how much this "nags" to be revisited

    def to_dict(self) -> Dict:
        """Serialize to a plain, JSON-ready dict."""
        names = (
            'description', 'branch_point_line', 'chosen_path',
            'reason_not_taken', 'nagging_score',
        )
        return {name: getattr(self, name) for name in names}
130  
131  
132  class HuntingParty:
133      """
134      The hunting party - a collection of extraction agents that work together
135      to extract structure from linear conversation.
136      """
137  
138      def __init__(self, transcript_path: Optional[Path] = None):
139          self.transcript_path = transcript_path or self._find_current_transcript()
140          self.last_processed_line = 0
141          self.atoms: Dict[str, ExtractedAtom] = {}
142          self.edges: List[ExtractedEdge] = []
143          self.dropped_threads: List[DroppedThread] = []
144          self.paths_not_taken: List[PathNotTaken] = []
145          self.orphan_atoms: Set[str] = set()  # atoms with no edges
146  
147          # Patterns for extraction
148          self.insight_patterns = [
149              r"insight[:\s]+(.+)",
150              r"the key is[:\s]+(.+)",
151              r"realize[ds]?\s+that\s+(.+)",
152              r"discovered?\s+that\s+(.+)",
153              r"important[:\s]+(.+)",
154              r"principle[:\s]+(.+)",
155          ]
156  
157          self.decision_patterns = [
158              r"decide[ds]?\s+to\s+(.+)",
159              r"chose\s+(.+)",
160              r"going\s+with\s+(.+)",
161              r"let's\s+(.+)",
162              r"we'll\s+(.+)",
163              r"I'll\s+(.+)",
164          ]
165  
166          self.question_patterns = [
167              r"(?:^|\s)(how\s+(?:do|can|should|would|might)\s+.+\?)",
168              r"(?:^|\s)(what\s+(?:is|are|should|would|if)\s+.+\?)",
169              r"(?:^|\s)(why\s+.+\?)",
170              r"(?:^|\s)(should\s+we\s+.+\?)",
171          ]
172  
173          self.branch_patterns = [
174              r"(?:could|can)\s+(?:also|either)\s+(.+)",
175              r"alternatively[,:\s]+(.+)",
176              r"another\s+(?:option|approach)[:\s]+(.+)",
177              r"or\s+we\s+could\s+(.+)",
178              r"option\s+\d+[:\s]+(.+)",
179          ]
180  
181          self.axiom_patterns = {
182              'A0': [r'boundar', r'markov', r'structure\s+flow', r'sovereign'],
183              'A1': [r'integrat', r'connection', r'isolation', r'binding'],
184              'A2': [r'life', r'death', r'motion', r'primitive', r'ornament', r'calcif'],
185              'A3': [r'pole', r'tension', r'dyad', r'navigat', r'dynamic'],
186              'A4': [r'ruin', r'ergodic', r'asymmetr', r'survival', r'catastroph'],
187          }
188  
189          self._load_state()
190  
191      def _find_current_transcript(self) -> Optional[Path]:
192          """Find the most recent transcript for Sovereign_OS."""
193          project_dirs = list(CLAUDE_PROJECTS_DIR.glob("-Users-rcerf-repos*"))
194          if not project_dirs:
195              return None
196  
197          # Find most recent JSONL
198          most_recent = None
199          most_recent_time = 0
200  
201          for project_dir in project_dirs:
202              for jsonl in project_dir.glob("*.jsonl"):
203                  mtime = jsonl.stat().st_mtime
204                  if mtime > most_recent_time:
205                      most_recent_time = mtime
206                      most_recent = jsonl
207  
208          return most_recent
209  
210      def _load_state(self):
211          """Load hunting state from disk."""
212          if HUNTING_STATE.exists():
213              try:
214                  with open(HUNTING_STATE) as f:
215                      state = json.load(f)
216                  self.last_processed_line = state.get('last_processed_line', 0)
217                  # Load atoms, edges, etc. if needed
218              except Exception:
219                  pass
220  
221      def _save_state(self):
222          """Save hunting state to disk."""
223          HUNTING_STATE.parent.mkdir(parents=True, exist_ok=True)
224          state = {
225              'last_processed_line': self.last_processed_line,
226              'atom_count': len(self.atoms),
227              'edge_count': len(self.edges),
228              'orphan_count': len(self.orphan_atoms),
229              'dropped_thread_count': len(self.dropped_threads),
230              'paths_not_taken_count': len(self.paths_not_taken),
231              'updated': datetime.now().isoformat()
232          }
233          with open(HUNTING_STATE, 'w') as f:
234              json.dump(state, f, indent=2)
235  
236      def _generate_id(self, content: str) -> str:
237          """Generate stable ID from content."""
238          return hashlib.md5(content.encode()).hexdigest()[:12]
239  
240      def _detect_axioms(self, text: str) -> List[str]:
241          """Detect which axioms are involved in the text."""
242          text_lower = text.lower()
243          axioms = []
244          for axiom, patterns in self.axiom_patterns.items():
245              for pattern in patterns:
246                  if re.search(pattern, text_lower):
247                      axioms.append(axiom)
248                      break
249          return axioms
250  
251      def _publish_to_mesh(self, message_type: str, payload: Dict) -> bool:
252          """Publish extraction event to mesh."""
253          try:
254              msg = json.dumps({
255                  "type": message_type,
256                  "payload": payload,
257                  "timestamp": datetime.now().isoformat(),
258                  "source": "hunting_party"
259              })
260              req = urllib.request.Request(
261                  f"http://localhost:{MESH_HTTP_PORT}/publish",
262                  data=msg.encode('utf-8'),
263                  headers={"Content-Type": "application/json"},
264                  method="POST"
265              )
266              with urllib.request.urlopen(req, timeout=2) as resp:
267                  return resp.status == 200
268          except (urllib.error.URLError, Exception):
269              return False
270  
271      def extract_from_message(self, message: Dict, line_num: int) -> Tuple[List[ExtractedAtom], List[ExtractedEdge]]:
272          """Extract atoms and edges from a single message."""
273          atoms = []
274          edges = []
275  
276          # Get message content
277          content = ""
278          if isinstance(message, dict):
279              if 'content' in message:
280                  content = message['content']
281              elif 'message' in message:
282                  msg = message['message']
283                  if isinstance(msg, dict) and 'content' in msg:
284                      content = msg['content']
285  
286          if not content:
287              return atoms, edges
288  
289          # Handle content that's a list (Claude's format)
290          if isinstance(content, list):
291              text_parts = []
292              for part in content:
293                  if isinstance(part, dict) and part.get('type') == 'text':
294                      text_parts.append(part.get('text', ''))
295                  elif isinstance(part, str):
296                      text_parts.append(part)
297              content = '\n'.join(text_parts)
298  
299          # Extract insights
300          for pattern in self.insight_patterns:
301              for match in re.finditer(pattern, content, re.IGNORECASE):
302                  insight_text = match.group(1).strip()
303                  if len(insight_text) > 10:
304                      atom_id = f"INS_{self._generate_id(insight_text)}"
305                      if atom_id not in self.atoms:
306                          atom = ExtractedAtom(
307                              id=atom_id,
308                              content=insight_text[:200],
309                              atom_type='insight',
310                              confidence=0.7,
311                              source_line=line_num,
312                              context=content[:100],
313                              axioms=self._detect_axioms(insight_text),
314                              timestamp=datetime.now().isoformat()
315                          )
316                          atoms.append(atom)
317                          self.atoms[atom_id] = atom
318  
319          # Extract decisions
320          for pattern in self.decision_patterns:
321              for match in re.finditer(pattern, content, re.IGNORECASE):
322                  decision_text = match.group(1).strip()
323                  if len(decision_text) > 5:
324                      atom_id = f"DEC_{self._generate_id(decision_text)}"
325                      if atom_id not in self.atoms:
326                          atom = ExtractedAtom(
327                              id=atom_id,
328                              content=decision_text[:200],
329                              atom_type='decision',
330                              confidence=0.8,
331                              source_line=line_num,
332                              context=content[:100],
333                              axioms=self._detect_axioms(decision_text),
334                              timestamp=datetime.now().isoformat()
335                          )
336                          atoms.append(atom)
337                          self.atoms[atom_id] = atom
338  
339          # Extract questions (potential gravity wells)
340          for pattern in self.question_patterns:
341              for match in re.finditer(pattern, content, re.IGNORECASE):
342                  question_text = match.group(1).strip()
343                  if len(question_text) > 10:
344                      atom_id = f"QST_{self._generate_id(question_text)}"
345                      if atom_id not in self.atoms:
346                          atom = ExtractedAtom(
347                              id=atom_id,
348                              content=question_text[:200],
349                              atom_type='question',
350                              confidence=0.9,
351                              source_line=line_num,
352                              context=content[:100],
353                              axioms=self._detect_axioms(question_text),
354                              timestamp=datetime.now().isoformat()
355                          )
356                          atoms.append(atom)
357                          self.atoms[atom_id] = atom
358  
359          # Extract paths not taken (branches)
360          for pattern in self.branch_patterns:
361              for match in re.finditer(pattern, content, re.IGNORECASE):
362                  branch_text = match.group(1).strip()
363                  if len(branch_text) > 10:
364                      path = PathNotTaken(
365                          description=branch_text[:200],
366                          branch_point_line=line_num,
367                          chosen_path="",  # Will be updated when we see what was chosen
368                          nagging_score=0.4
369                      )
370                      self.paths_not_taken.append(path)
371  
372          # Extract concept nodes from [[links]] (Obsidian style)
373          link_pattern = r'\[\[([^\]]+)\]\]'
374          for match in re.finditer(link_pattern, content):
375              concept = match.group(1).strip()
376              atom_id = f"CON_{self._generate_id(concept)}"
377              if atom_id not in self.atoms:
378                  atom = ExtractedAtom(
379                      id=atom_id,
380                      content=concept,
381                      atom_type='concept',
382                      confidence=1.0,  # Explicit link = high confidence
383                      source_line=line_num,
384                      context=content[:100],
385                      axioms=self._detect_axioms(concept),
386                      timestamp=datetime.now().isoformat()
387                  )
388                  atoms.append(atom)
389                  self.atoms[atom_id] = atom
390  
391          # Detect edges between atoms in same message
392          atom_ids = [a.id for a in atoms]
393          if len(atom_ids) > 1:
394              # Create co-occurrence edges
395              for i, src in enumerate(atom_ids):
396                  for tgt in atom_ids[i+1:]:
397                      edge = ExtractedEdge(
398                          source_id=src,
399                          target_id=tgt,
400                          edge_type='co_occurs',
401                          confidence=0.5,
402                          evidence=f"Co-occurred in line {line_num}"
403                      )
404                      edges.append(edge)
405                      self.edges.append(edge)
406  
407          return atoms, edges
408  
409      def detect_dropped_thread(self, messages: List[Dict], current_idx: int) -> Optional[DroppedThread]:
410          """Detect if a thread was interrupted."""
411          if current_idx < 2:
412              return None
413  
414          current = messages[current_idx]
415          prev = messages[current_idx - 1]
416  
417          # Look for topic shifts
418          current_text = self._get_message_text(current)
419          prev_text = self._get_message_text(prev)
420  
421          # Simple heuristic: if the message starts with "Actually" or "Wait" or changes topic abruptly
422          interruption_markers = [
423              r"^actually[,\s]",
424              r"^wait[,\s]",
425              r"^hold on[,\s]",
426              r"^but first[,\s]",
427              r"^let me switch",
428              r"^different topic",
429          ]
430  
431          for pattern in interruption_markers:
432              if re.search(pattern, current_text, re.IGNORECASE):
433                  return DroppedThread(
434                      content=prev_text[:200],
435                      dropped_at_line=current_idx,
436                      context=current_text[:100],
437                      resumption_score=0.6
438                  )
439  
440          return None
441  
442      def _get_message_text(self, message: Dict) -> str:
443          """Extract text from a message."""
444          if isinstance(message, dict):
445              content = message.get('content', message.get('message', {}).get('content', ''))
446              if isinstance(content, list):
447                  return ' '.join(p.get('text', '') if isinstance(p, dict) else str(p) for p in content)
448              return str(content)
449          return str(message)
450  
451      def process_transcript(self, from_line: int = 0) -> Dict[str, int]:
452          """Process transcript from a given line number."""
453          if not self.transcript_path or not self.transcript_path.exists():
454              return {'error': 'No transcript found'}
455  
456          messages = []
457          try:
458              with open(self.transcript_path) as f:
459                  for i, line in enumerate(f):
460                      if i >= from_line:
461                          try:
462                              messages.append((i, json.loads(line)))
463                          except json.JSONDecodeError:
464                              continue
465          except Exception as e:
466              return {'error': str(e)}
467  
468          new_atoms = []
469          new_edges = []
470  
471          for line_num, message in messages:
472              atoms, edges = self.extract_from_message(message, line_num)
473              new_atoms.extend(atoms)
474              new_edges.extend(edges)
475  
476              # Check for dropped threads
477              all_messages = [m for _, m in messages]
478              idx = line_num - from_line
479              if idx >= 0 and idx < len(all_messages):
480                  dropped = self.detect_dropped_thread(all_messages, idx)
481                  if dropped:
482                      self.dropped_threads.append(dropped)
483  
484          # Update orphans (atoms with no edges)
485          connected_atoms = set()
486          for edge in self.edges:
487              connected_atoms.add(edge.source_id)
488              connected_atoms.add(edge.target_id)
489  
490          self.orphan_atoms = set(self.atoms.keys()) - connected_atoms
491  
492          # Update last processed line
493          if messages:
494              self.last_processed_line = messages[-1][0] + 1
495  
496          self._save_state()
497  
498          # Publish to mesh
499          if new_atoms:
500              self._publish_to_mesh("hunting_extraction", {
501                  "new_atoms": len(new_atoms),
502                  "new_edges": len(new_edges),
503                  "total_atoms": len(self.atoms),
504                  "total_edges": len(self.edges),
505                  "orphan_count": len(self.orphan_atoms),
506                  "atoms": [a.to_dict() for a in new_atoms[-5:]]  # Last 5
507              })
508  
509          return {
510              'new_atoms': len(new_atoms),
511              'new_edges': len(new_edges),
512              'total_atoms': len(self.atoms),
513              'total_edges': len(self.edges),
514              'orphan_count': len(self.orphan_atoms),
515              'dropped_threads': len(self.dropped_threads),
516              'paths_not_taken': len(self.paths_not_taken)
517          }
518  
519      def get_orphans(self) -> List[ExtractedAtom]:
520          """Get atoms that have no connections."""
521          return [self.atoms[aid] for aid in self.orphan_atoms if aid in self.atoms]
522  
523      def export_to_graph_feeder(self) -> Dict:
524          """Export extracted data in format compatible with graph_feeder."""
525          return {
526              'nodes': [
527                  {
528                      'id': atom.id,
529                      'label': atom.content[:30],
530                      'node_type': atom.atom_type,
531                      'content': atom.content,
532                      'axioms': atom.axioms,
533                      'importance': atom.confidence,
534                      'source': 'hunting_party'
535                  }
536                  for atom in self.atoms.values()
537              ],
538              'edges': [
539                  {
540                      'source_id': edge.source_id,
541                      'target_id': edge.target_id,
542                      'edge_type': edge.edge_type,
543                      'strength': edge.confidence
544                  }
545                  for edge in self.edges
546              ],
547              'dropped_threads': [t.to_dict() for t in self.dropped_threads],
548              'paths_not_taken': [p.to_dict() for p in self.paths_not_taken]
549          }
550  
551      def run_watch(self, interval: int = 30):
552          """Watch mode - continuously process new messages."""
553          print(f"Hunting Party watching: {self.transcript_path}")
554          print(f"Starting from line: {self.last_processed_line}")
555          print(f"Interval: {interval}s")
556          print("Press Ctrl+C to stop\n")
557  
558          while True:
559              try:
560                  results = self.process_transcript(self.last_processed_line)
561  
562                  if results.get('new_atoms', 0) > 0 or results.get('new_edges', 0) > 0:
563                      now = datetime.now().strftime('%H:%M:%S')
564                      print(f"[{now}] +{results.get('new_atoms', 0)} atoms, +{results.get('new_edges', 0)} edges | " +
565                            f"Total: {results.get('total_atoms', 0)} atoms, {results.get('total_edges', 0)} edges | " +
566                            f"Orphans: {results.get('orphan_count', 0)}")
567  
568                  time.sleep(interval)
569  
570              except KeyboardInterrupt:
571                  print("\nHunting party stopped.")
572                  break
573              except Exception as e:
574                  print(f"Error: {e}")
575                  time.sleep(interval)
576  
577  
def show_status():
    """Print a snapshot of hunting-party progress to stdout."""
    party = HuntingParty()

    rule = "=" * 60
    print()
    print(rule)
    print("HUNTING PARTY STATUS")
    print(rule)
    print()

    if party.transcript_path:
        print(f"Transcript: {party.transcript_path.name}")
    else:
        print("Transcript: NOT FOUND")

    print(f"Last processed line: {party.last_processed_line}")
    print()

    # One incremental pass brings the counters up to date
    results = party.process_transcript(party.last_processed_line)

    summary = (
        ("Atoms extracted", 'total_atoms'),
        ("Edges detected", 'total_edges'),
        ("Orphan atoms", 'orphan_count'),
        ("Dropped threads", 'dropped_threads'),
        ("Paths not taken", 'paths_not_taken'),
    )
    for label, key in summary:
        print(f"{label}: {results.get(key, 0)}")
    print()

    # Surface a few unconnected atoms for manual linking
    orphans = party.get_orphans()
    if orphans:
        print("ORPHAN ATOMS (need connections):")
        for orphan in orphans[:5]:
            print(f"  - [{orphan.atom_type}] {orphan.content[:50]}...")
    print()
613  
614  
def extract_once():
    """Run one full extraction pass from the top, export, and summarize."""
    party = HuntingParty()

    print("Running single extraction pass...")
    results = party.process_transcript(0)  # whole transcript, from line 0

    print()
    print(f"Extracted: {results.get('total_atoms', 0)} atoms, {results.get('total_edges', 0)} edges")
    print(f"Orphans: {results.get('orphan_count', 0)}")
    print(f"Dropped threads: {results.get('dropped_threads', 0)}")
    print(f"Paths not taken: {results.get('paths_not_taken', 0)}")
    print()

    # Write a graph_feeder-compatible export next to the state file
    export_path = HUNTING_STATE.parent / "hunting-export.json"
    with open(export_path, 'w') as f:
        json.dump(party.export_to_graph_feeder(), f, indent=2)
    print(f"Exported to: {export_path}")

    print("\nSample atoms:")
    for atom in list(party.atoms.values())[:5]:
        print(f"  [{atom.atom_type}] {atom.content[:60]}...")

    if party.dropped_threads:
        print("\nDropped threads:")
        for thread in party.dropped_threads[:3]:
            print(f"  - Line {thread.dropped_at_line}: {thread.content[:50]}...")

    if party.paths_not_taken:
        print("\nPaths not taken:")
        for path in party.paths_not_taken[:3]:
            print(f"  - Line {path.branch_point_line}: {path.description[:50]}...")
650  
651  
if __name__ == "__main__":
    # CLI dispatch; no args = watch the current session transcript.
    args = sys.argv[1:]
    if not args:
        party = HuntingParty()
        party.run_watch()
    elif args[0] == "--status":
        show_status()
    elif args[0] == "--extract-once":
        extract_once()
    elif args[0] == "--transcript" and len(args) > 1:
        party = HuntingParty(Path(args[1]))
        party.run_watch()
    elif args[0] in ["--help", "-h"]:
        print(__doc__)
    else:
        print(f"Unknown option: {args[0]}")
        print(__doc__)