#!/usr/bin/env python3
"""
Sovereign OS - Hunting Party
=============================

Real-time extraction agents that watch the conversation stream and
extract nodes, edges, dropped threads, and paths not taken.

This is the first piece of the AI Chorus architecture - the extractors
that feed the knowledge graph in real-time.

From the flight-protocol:
- ThreadCatcher: catches dropped/interrupted threads
- PathTracker: tracks paths taken AND not taken
- RelatedSurfacer: finds connected items from the graph
- ConversationModeler: builds conversation shape

Usage:
    python3 core/metacog/hunting_party.py                    # Watch current session
    python3 core/metacog/hunting_party.py --transcript PATH  # Watch specific transcript
    python3 core/metacog/hunting_party.py --extract-once     # Single extraction pass
    python3 core/metacog/hunting_party.py --status           # Show extraction status

Architecture:
    TRANSCRIPT (JSONL) → HUNTING PARTY → MESH (hunting_* events) → GRAPH
"""

import json
import re
import sys
import time
import hashlib
import urllib.request
import urllib.error
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional, Set, Tuple
from dataclasses import dataclass, field
from collections import defaultdict

# Paths
SOVEREIGN_OS = Path(__file__).parent.parent.parent
SESSIONS_DIR = SOVEREIGN_OS / "sessions"
CLAUDE_PROJECTS_DIR = Path.home() / ".claude" / "projects"
GRAPH_DATA = Path.home() / ".sovereign" / "graph-data.json"
HUNTING_STATE = Path.home() / ".sovereign" / "hunting-state.json"

# Mesh network
MESH_HTTP_PORT = 7778


@dataclass
class ExtractedAtom:
    """A node extracted from conversation."""
    id: str
    content: str
    atom_type: str    # 'concept', 'insight', 'decision', 'question', 'thread'
    confidence: float  # 0-1, how confident we are this is a real atom
    source_line: int
    context: str      # surrounding context
    axioms: List[str] = field(default_factory=list)
    timestamp: str = ""

    def to_dict(self) -> Dict:
        """Serialize to a plain dict (for JSON export / mesh payloads)."""
        return {
            'id': self.id,
            'content': self.content,
            'atom_type': self.atom_type,
            'confidence': self.confidence,
            'source_line': self.source_line,
            'context': self.context,
            'axioms': self.axioms,
            'timestamp': self.timestamp
        }


@dataclass
class ExtractedEdge:
    """An edge between atoms."""
    source_id: str
    target_id: str
    edge_type: str    # 'relates_to', 'derives_from', 'contradicts', 'extends', 'resonates_with'
    confidence: float
    evidence: str     # text that suggests this connection

    def to_dict(self) -> Dict:
        """Serialize to a plain dict (for JSON export / mesh payloads)."""
        return {
            'source_id': self.source_id,
            'target_id': self.target_id,
            'edge_type': self.edge_type,
            'confidence': self.confidence,
            'evidence': self.evidence
        }


@dataclass
class DroppedThread:
    """A thread that was interrupted/dropped."""
    content: str
    dropped_at_line: int
    context: str
    resumption_score: float  # 0-1, how likely this should be resumed

    def to_dict(self) -> Dict:
        """Serialize to a plain dict (for JSON export / mesh payloads)."""
        return {
            'content': self.content,
            'dropped_at_line': self.dropped_at_line,
            'context': self.context,
            'resumption_score': self.resumption_score
        }


@dataclass
class PathNotTaken:
    """A decision branch that wasn't explored."""
    description: str
    branch_point_line: int
    chosen_path: str
    reason_not_taken: str = ""
    nagging_score: float = 0.3  # how much this "nags" to be revisited

    def to_dict(self) -> Dict:
        """Serialize to a plain dict (for JSON export / mesh payloads)."""
        return {
            'description': self.description,
            'branch_point_line': self.branch_point_line,
            'chosen_path': self.chosen_path,
            'reason_not_taken': self.reason_not_taken,
            'nagging_score': self.nagging_score
        }


class HuntingParty:
    """
    The hunting party - a collection of extraction agents that work together
    to extract structure from linear conversation.
    """

    def __init__(self, transcript_path: Optional[Path] = None):
        """Set up extraction state and regex patterns.

        Args:
            transcript_path: JSONL transcript to watch; when omitted, the most
                recently modified transcript under ~/.claude/projects is used.
        """
        self.transcript_path = transcript_path or self._find_current_transcript()
        self.last_processed_line = 0
        self.atoms: Dict[str, ExtractedAtom] = {}
        self.edges: List[ExtractedEdge] = []
        self.dropped_threads: List[DroppedThread] = []
        self.paths_not_taken: List[PathNotTaken] = []
        self.orphan_atoms: Set[str] = set()  # atoms with no edges

        # Patterns for extraction (matched case-insensitively; group 1 is the payload)
        self.insight_patterns = [
            r"insight[:\s]+(.+)",
            r"the key is[:\s]+(.+)",
            r"realize[ds]?\s+that\s+(.+)",
            r"discovered?\s+that\s+(.+)",
            r"important[:\s]+(.+)",
            r"principle[:\s]+(.+)",
        ]

        self.decision_patterns = [
            r"decide[ds]?\s+to\s+(.+)",
            r"chose\s+(.+)",
            r"going\s+with\s+(.+)",
            r"let's\s+(.+)",
            r"we'll\s+(.+)",
            r"I'll\s+(.+)",
        ]

        self.question_patterns = [
            r"(?:^|\s)(how\s+(?:do|can|should|would|might)\s+.+\?)",
            r"(?:^|\s)(what\s+(?:is|are|should|would|if)\s+.+\?)",
            r"(?:^|\s)(why\s+.+\?)",
            r"(?:^|\s)(should\s+we\s+.+\?)",
        ]

        self.branch_patterns = [
            r"(?:could|can)\s+(?:also|either)\s+(.+)",
            r"alternatively[,:\s]+(.+)",
            r"another\s+(?:option|approach)[:\s]+(.+)",
            r"or\s+we\s+could\s+(.+)",
            r"option\s+\d+[:\s]+(.+)",
        ]

        # Keyword stems that suggest an axiom is in play (substring regexes).
        self.axiom_patterns = {
            'A0': [r'boundar', r'markov', r'structure\s+flow', r'sovereign'],
            'A1': [r'integrat', r'connection', r'isolation', r'binding'],
            'A2': [r'life', r'death', r'motion', r'primitive', r'ornament', r'calcif'],
            'A3': [r'pole', r'tension', r'dyad', r'navigat', r'dynamic'],
            'A4': [r'ruin', r'ergodic', r'asymmetr', r'survival', r'catastroph'],
        }

        self._load_state()

    def _find_current_transcript(self) -> Optional[Path]:
        """Find the most recent transcript for Sovereign_OS.

        Returns None when no matching project directory or JSONL exists.
        NOTE(review): the glob is hard-coded to one user's repo path prefix;
        confirm whether this should be configurable.
        """
        project_dirs = list(CLAUDE_PROJECTS_DIR.glob("-Users-rcerf-repos*"))
        if not project_dirs:
            return None

        # Pick the JSONL with the newest mtime across all matching projects.
        most_recent = None
        most_recent_time = 0

        for project_dir in project_dirs:
            for jsonl in project_dir.glob("*.jsonl"):
                mtime = jsonl.stat().st_mtime
                if mtime > most_recent_time:
                    most_recent_time = mtime
                    most_recent = jsonl

        return most_recent

    def _load_state(self):
        """Load hunting state from disk (best-effort; missing/corrupt state is ignored)."""
        if HUNTING_STATE.exists():
            try:
                with open(HUNTING_STATE) as f:
                    state = json.load(f)
                self.last_processed_line = state.get('last_processed_line', 0)
                # Load atoms, edges, etc. if needed
            except (OSError, json.JSONDecodeError, ValueError):
                # Corrupt or unreadable state file: start from scratch rather than crash.
                pass

    def _save_state(self):
        """Save hunting state to disk."""
        HUNTING_STATE.parent.mkdir(parents=True, exist_ok=True)
        state = {
            'last_processed_line': self.last_processed_line,
            'atom_count': len(self.atoms),
            'edge_count': len(self.edges),
            'orphan_count': len(self.orphan_atoms),
            'dropped_thread_count': len(self.dropped_threads),
            'paths_not_taken_count': len(self.paths_not_taken),
            'updated': datetime.now().isoformat()
        }
        with open(HUNTING_STATE, 'w') as f:
            json.dump(state, f, indent=2)

    def _generate_id(self, content: str) -> str:
        """Generate a stable 12-hex-char ID from content (MD5 used for hashing, not security)."""
        return hashlib.md5(content.encode()).hexdigest()[:12]

    def _detect_axioms(self, text: str) -> List[str]:
        """Detect which axioms are involved in the text (first pattern hit per axiom wins)."""
        text_lower = text.lower()
        axioms = []
        for axiom, patterns in self.axiom_patterns.items():
            for pattern in patterns:
                if re.search(pattern, text_lower):
                    axioms.append(axiom)
                    break
        return axioms

    def _publish_to_mesh(self, message_type: str, payload: Dict) -> bool:
        """Publish an extraction event to the mesh.

        Best-effort: returns False on any failure (mesh down, timeout, bad
        response) rather than raising, so extraction never blocks on the mesh.
        """
        try:
            msg = json.dumps({
                "type": message_type,
                "payload": payload,
                "timestamp": datetime.now().isoformat(),
                "source": "hunting_party"
            })
            req = urllib.request.Request(
                f"http://localhost:{MESH_HTTP_PORT}/publish",
                data=msg.encode('utf-8'),
                headers={"Content-Type": "application/json"},
                method="POST"
            )
            with urllib.request.urlopen(req, timeout=2) as resp:
                return resp.status == 200
        except Exception:
            # Deliberate broad catch: mesh publishing must never take down extraction.
            return False

    def extract_from_message(self, message: Dict, line_num: int) -> Tuple[List[ExtractedAtom], List[ExtractedEdge]]:
        """Extract atoms and edges from a single message.

        Args:
            message: A transcript record; text is read from `content` or
                `message.content`, either a string or Claude's list-of-parts.
            line_num: Transcript line the message came from (stored on atoms).

        Returns:
            (new atoms, new edges) — only atoms not already seen (by stable ID)
            are returned; both are also accumulated on self.
        """
        atoms = []
        edges = []

        # Get message content from either of the two transcript shapes.
        content = ""
        if isinstance(message, dict):
            if 'content' in message:
                content = message['content']
            elif 'message' in message:
                msg = message['message']
                if isinstance(msg, dict) and 'content' in msg:
                    content = msg['content']

        if not content:
            return atoms, edges

        # Handle content that's a list (Claude's format): join the text parts.
        if isinstance(content, list):
            text_parts = []
            for part in content:
                if isinstance(part, dict) and part.get('type') == 'text':
                    text_parts.append(part.get('text', ''))
                elif isinstance(part, str):
                    text_parts.append(part)
            content = '\n'.join(text_parts)

        # Extract insights (min length filters out trivial matches).
        for pattern in self.insight_patterns:
            for match in re.finditer(pattern, content, re.IGNORECASE):
                insight_text = match.group(1).strip()
                if len(insight_text) > 10:
                    atom_id = f"INS_{self._generate_id(insight_text)}"
                    if atom_id not in self.atoms:
                        atom = ExtractedAtom(
                            id=atom_id,
                            content=insight_text[:200],
                            atom_type='insight',
                            confidence=0.7,
                            source_line=line_num,
                            context=content[:100],
                            axioms=self._detect_axioms(insight_text),
                            timestamp=datetime.now().isoformat()
                        )
                        atoms.append(atom)
                        self.atoms[atom_id] = atom

        # Extract decisions
        for pattern in self.decision_patterns:
            for match in re.finditer(pattern, content, re.IGNORECASE):
                decision_text = match.group(1).strip()
                if len(decision_text) > 5:
                    atom_id = f"DEC_{self._generate_id(decision_text)}"
                    if atom_id not in self.atoms:
                        atom = ExtractedAtom(
                            id=atom_id,
                            content=decision_text[:200],
                            atom_type='decision',
                            confidence=0.8,
                            source_line=line_num,
                            context=content[:100],
                            axioms=self._detect_axioms(decision_text),
                            timestamp=datetime.now().isoformat()
                        )
                        atoms.append(atom)
                        self.atoms[atom_id] = atom

        # Extract questions (potential gravity wells)
        for pattern in self.question_patterns:
            for match in re.finditer(pattern, content, re.IGNORECASE):
                question_text = match.group(1).strip()
                if len(question_text) > 10:
                    atom_id = f"QST_{self._generate_id(question_text)}"
                    if atom_id not in self.atoms:
                        atom = ExtractedAtom(
                            id=atom_id,
                            content=question_text[:200],
                            atom_type='question',
                            confidence=0.9,
                            source_line=line_num,
                            context=content[:100],
                            axioms=self._detect_axioms(question_text),
                            timestamp=datetime.now().isoformat()
                        )
                        atoms.append(atom)
                        self.atoms[atom_id] = atom

        # Extract paths not taken (branches)
        for pattern in self.branch_patterns:
            for match in re.finditer(pattern, content, re.IGNORECASE):
                branch_text = match.group(1).strip()
                if len(branch_text) > 10:
                    path = PathNotTaken(
                        description=branch_text[:200],
                        branch_point_line=line_num,
                        chosen_path="",  # Will be updated when we see what was chosen
                        nagging_score=0.4
                    )
                    self.paths_not_taken.append(path)

        # Extract concept nodes from [[links]] (Obsidian style)
        link_pattern = r'\[\[([^\]]+)\]\]'
        for match in re.finditer(link_pattern, content):
            concept = match.group(1).strip()
            atom_id = f"CON_{self._generate_id(concept)}"
            if atom_id not in self.atoms:
                atom = ExtractedAtom(
                    id=atom_id,
                    content=concept,
                    atom_type='concept',
                    confidence=1.0,  # Explicit link = high confidence
                    source_line=line_num,
                    context=content[:100],
                    axioms=self._detect_axioms(concept),
                    timestamp=datetime.now().isoformat()
                )
                atoms.append(atom)
                self.atoms[atom_id] = atom

        # Detect edges between atoms in same message: every pair of newly
        # extracted atoms gets a co-occurrence edge.
        atom_ids = [a.id for a in atoms]
        if len(atom_ids) > 1:
            for i, src in enumerate(atom_ids):
                for tgt in atom_ids[i + 1:]:
                    edge = ExtractedEdge(
                        source_id=src,
                        target_id=tgt,
                        edge_type='co_occurs',
                        confidence=0.5,
                        evidence=f"Co-occurred in line {line_num}"
                    )
                    edges.append(edge)
                    self.edges.append(edge)

        return atoms, edges

    def detect_dropped_thread(self, messages: List[Dict], current_idx: int) -> Optional[DroppedThread]:
        """Detect if a thread was interrupted.

        Args:
            messages: The batch of messages being processed.
            current_idx: Index of the current message within `messages`
                (NOTE: an index into the batch, not a transcript line number).

        Returns:
            A DroppedThread capturing the previous message when the current one
            opens with an interruption marker, otherwise None. The first two
            messages of a batch are never flagged.
        """
        if current_idx < 2:
            return None

        current = messages[current_idx]
        prev = messages[current_idx - 1]

        current_text = self._get_message_text(current)
        prev_text = self._get_message_text(prev)

        # Simple heuristic: the message starts with "Actually"/"Wait"/etc.,
        # suggesting the previous topic was abandoned mid-stream.
        interruption_markers = [
            r"^actually[,\s]",
            r"^wait[,\s]",
            r"^hold on[,\s]",
            r"^but first[,\s]",
            r"^let me switch",
            r"^different topic",
        ]

        for pattern in interruption_markers:
            if re.search(pattern, current_text, re.IGNORECASE):
                return DroppedThread(
                    content=prev_text[:200],
                    dropped_at_line=current_idx,
                    context=current_text[:100],
                    resumption_score=0.6
                )

        return None

    def _get_message_text(self, message: Dict) -> str:
        """Extract plain text from a message in either transcript shape."""
        if not isinstance(message, dict):
            return str(message)
        if 'content' in message:
            content = message['content']
        else:
            # Guard: 'message' may be present but not a dict in malformed records.
            inner = message.get('message', {})
            content = inner.get('content', '') if isinstance(inner, dict) else ''
        if isinstance(content, list):
            return ' '.join(p.get('text', '') if isinstance(p, dict) else str(p) for p in content)
        return str(content)

    def process_transcript(self, from_line: int = 0) -> Dict[str, Any]:
        """Process transcript from a given line number.

        Returns a summary dict of counts, or {'error': <message>} when the
        transcript is missing or unreadable.
        """
        if not self.transcript_path or not self.transcript_path.exists():
            return {'error': 'No transcript found'}

        messages: List[Tuple[int, Dict]] = []
        try:
            with open(self.transcript_path) as f:
                for i, line in enumerate(f):
                    if i >= from_line:
                        try:
                            messages.append((i, json.loads(line)))
                        except json.JSONDecodeError:
                            # Skip unparseable lines; line numbering stays honest.
                            continue
        except Exception as e:
            return {'error': str(e)}

        new_atoms: List[ExtractedAtom] = []
        new_edges: List[ExtractedEdge] = []

        # Build the batch list ONCE. (The original rebuilt it every iteration
        # and indexed it with `line_num - from_line`, which drifts out of
        # alignment as soon as a JSONL line fails to parse.)
        all_messages = [m for _, m in messages]

        for idx, (line_num, message) in enumerate(messages):
            atoms, edges = self.extract_from_message(message, line_num)
            new_atoms.extend(atoms)
            new_edges.extend(edges)

            # Check for dropped threads at this position in the batch.
            dropped = self.detect_dropped_thread(all_messages, idx)
            if dropped:
                self.dropped_threads.append(dropped)

        # Update orphans (atoms with no edges)
        connected_atoms = set()
        for edge in self.edges:
            connected_atoms.add(edge.source_id)
            connected_atoms.add(edge.target_id)

        self.orphan_atoms = set(self.atoms.keys()) - connected_atoms

        # Update last processed line
        if messages:
            self.last_processed_line = messages[-1][0] + 1

        self._save_state()

        # Publish to mesh (best-effort; failures are ignored).
        if new_atoms:
            self._publish_to_mesh("hunting_extraction", {
                "new_atoms": len(new_atoms),
                "new_edges": len(new_edges),
                "total_atoms": len(self.atoms),
                "total_edges": len(self.edges),
                "orphan_count": len(self.orphan_atoms),
                "atoms": [a.to_dict() for a in new_atoms[-5:]]  # Last 5
            })

        return {
            'new_atoms': len(new_atoms),
            'new_edges': len(new_edges),
            'total_atoms': len(self.atoms),
            'total_edges': len(self.edges),
            'orphan_count': len(self.orphan_atoms),
            'dropped_threads': len(self.dropped_threads),
            'paths_not_taken': len(self.paths_not_taken)
        }

    def get_orphans(self) -> List[ExtractedAtom]:
        """Get atoms that have no connections."""
        return [self.atoms[aid] for aid in self.orphan_atoms if aid in self.atoms]

    def export_to_graph_feeder(self) -> Dict:
        """Export extracted data in format compatible with graph_feeder."""
        return {
            'nodes': [
                {
                    'id': atom.id,
                    'label': atom.content[:30],
                    'node_type': atom.atom_type,
                    'content': atom.content,
                    'axioms': atom.axioms,
                    'importance': atom.confidence,
                    'source': 'hunting_party'
                }
                for atom in self.atoms.values()
            ],
            'edges': [
                {
                    'source_id': edge.source_id,
                    'target_id': edge.target_id,
                    'edge_type': edge.edge_type,
                    'strength': edge.confidence
                }
                for edge in self.edges
            ],
            'dropped_threads': [t.to_dict() for t in self.dropped_threads],
            'paths_not_taken': [p.to_dict() for p in self.paths_not_taken]
        }

    def run_watch(self, interval: int = 30):
        """Watch mode - continuously process new messages until Ctrl+C."""
        print(f"Hunting Party watching: {self.transcript_path}")
        print(f"Starting from line: {self.last_processed_line}")
        print(f"Interval: {interval}s")
        print("Press Ctrl+C to stop\n")

        while True:
            try:
                results = self.process_transcript(self.last_processed_line)

                if results.get('new_atoms', 0) > 0 or results.get('new_edges', 0) > 0:
                    now = datetime.now().strftime('%H:%M:%S')
                    print(f"[{now}] +{results.get('new_atoms', 0)} atoms, +{results.get('new_edges', 0)} edges | " +
                          f"Total: {results.get('total_atoms', 0)} atoms, {results.get('total_edges', 0)} edges | " +
                          f"Orphans: {results.get('orphan_count', 0)}")

                time.sleep(interval)

            except KeyboardInterrupt:
                print("\nHunting party stopped.")
                break
            except Exception as e:
                # Keep the watcher alive across transient errors (I/O, mesh, parse).
                print(f"Error: {e}")
                time.sleep(interval)


def show_status():
    """Show hunting party status (runs one extraction pass to refresh counts)."""
    party = HuntingParty()

    print()
    print("=" * 60)
    print("HUNTING PARTY STATUS")
    print("=" * 60)
    print()

    if party.transcript_path:
        print(f"Transcript: {party.transcript_path.name}")
    else:
        print("Transcript: NOT FOUND")

    print(f"Last processed line: {party.last_processed_line}")
    print()

    # Process once to get current state
    results = party.process_transcript(party.last_processed_line)

    print(f"Atoms extracted: {results.get('total_atoms', 0)}")
    print(f"Edges detected: {results.get('total_edges', 0)}")
    print(f"Orphan atoms: {results.get('orphan_count', 0)}")
    print(f"Dropped threads: {results.get('dropped_threads', 0)}")
    print(f"Paths not taken: {results.get('paths_not_taken', 0)}")
    print()

    # Show some orphans
    orphans = party.get_orphans()
    if orphans:
        print("ORPHAN ATOMS (need connections):")
        for orphan in orphans[:5]:
            print(f"  - [{orphan.atom_type}] {orphan.content[:50]}...")
        print()


def extract_once():
    """Run extraction once over the whole transcript and show results."""
    party = HuntingParty()

    print("Running single extraction pass...")
    results = party.process_transcript(0)  # From beginning

    print()
    print(f"Extracted: {results.get('total_atoms', 0)} atoms, {results.get('total_edges', 0)} edges")
    print(f"Orphans: {results.get('orphan_count', 0)}")
    print(f"Dropped threads: {results.get('dropped_threads', 0)}")
    print(f"Paths not taken: {results.get('paths_not_taken', 0)}")
    print()

    # Export for graph feeder
    export = party.export_to_graph_feeder()
    export_path = HUNTING_STATE.parent / "hunting-export.json"
    with open(export_path, 'w') as f:
        json.dump(export, f, indent=2)
    print(f"Exported to: {export_path}")

    # Show some examples
    print("\nSample atoms:")
    for atom in list(party.atoms.values())[:5]:
        print(f"  [{atom.atom_type}] {atom.content[:60]}...")

    if party.dropped_threads:
        print("\nDropped threads:")
        for thread in party.dropped_threads[:3]:
            print(f"  - Line {thread.dropped_at_line}: {thread.content[:50]}...")

    if party.paths_not_taken:
        print("\nPaths not taken:")
        for path in party.paths_not_taken[:3]:
            print(f"  - Line {path.branch_point_line}: {path.description[:50]}...")


if __name__ == "__main__":
    if len(sys.argv) > 1:
        if sys.argv[1] == "--status":
            show_status()
        elif sys.argv[1] == "--extract-once":
            extract_once()
        elif sys.argv[1] == "--transcript" and len(sys.argv) > 2:
            party = HuntingParty(Path(sys.argv[2]))
            party.run_watch()
        elif sys.argv[1] in ["--help", "-h"]:
            print(__doc__)
        else:
            print(f"Unknown option: {sys.argv[1]}")
            print(__doc__)
    else:
        # Default: watch mode
        party = HuntingParty()
        party.run_watch()