historical_backhaul.py
1 #!/usr/bin/env python3 2 """ 3 Historical Backhaul - Extract Value from Linear Time 4 5 "Recursively tie what is linear time back around and connect it with 6 everything that's happened historically and connect everything that's 7 happened historically with derivatives of it in the future." 8 9 This script processes ALL historical conversations to: 10 1. Extract nodes (concepts, insights, decisions, artifacts) 11 2. Discover edges (cross-session connections) 12 3. Build temporal graph (what built on what) 13 4. Connect past to present to future 14 5. Track insight provenance (etymology of ideas) 15 16 Usage: 17 python3 scripts/historical_backhaul.py # Full backhaul 18 python3 scripts/historical_backhaul.py --scan # Scan only (no ingest) 19 python3 scripts/historical_backhaul.py --since 2026-01-01 # From date 20 python3 scripts/historical_backhaul.py --provenance "axiom" # Query insight etymology 21 python3 scripts/historical_backhaul.py --foundational # Show Torah core insights 22 """ 23 24 import sys 25 import json 26 import re 27 import os 28 from pathlib import Path 29 from datetime import datetime, timedelta 30 from typing import List, Dict, Set, Optional, Tuple 31 from collections import defaultdict 32 from dataclasses import dataclass, field, asdict 33 import hashlib 34 35 # Add parent to path for imports 36 REPO_ROOT = Path(__file__).parent.parent 37 sys.path.insert(0, str(REPO_ROOT)) 38 39 from core.replay.conversation_parser import ConversationParser, ConversationThread 40 41 # Try to import graph sink 42 try: 43 from core.graph.sink import get_sink, GraphSink 44 HAS_GRAPH_SINK = True 45 except ImportError: 46 HAS_GRAPH_SINK = False 47 print("[backhaul] Warning: Graph sink not available") 48 49 # Paths 50 CLAUDE_PROJECTS = Path.home() / ".claude" / "projects" 51 SOVEREIGN_HOME = Path.home() / ".sovereign" 52 BACKHAUL_STATE = SOVEREIGN_HOME / "backhaul-state.json" 53 BACKHAUL_REPORT = REPO_ROOT / "sessions" / "synthesis" / "backhaul-report.md" 54 55 56 @dataclass 57 class SessionNode: 58 """A node extracted from a historical session.""" 59 id: str 60 content: str 61 node_type: str # session, concept, insight, decision, artifact, principle 62 source_session: str 63 timestamp: Optional[str] = None 64 axioms: List[str] = field(default_factory=list) 65 metadata: Dict = field(default_factory=dict) 66 67 # Provenance tracking 68 is_foundational: bool = False # Is this a core/origin insight? 69 depth: int = 0 # 0 = root, 1 = first derivative, etc. 70 derives_from: List[str] = field(default_factory=list) # Parent node IDs 71 72 73 @dataclass 74 class InsightLineage: 75 """Track the etymology of an insight through sessions.""" 76 insight_id: str 77 content: str 78 origin_session: str 79 origin_timestamp: str 80 evolution_chain: List[Dict] = field(default_factory=list) # [{session, timestamp, form}] 81 derivatives: List[str] = field(default_factory=list) # Ideas that built on this 82 is_foundational: bool = False 83 84 85 @dataclass 86 class SessionEdge: 87 """An edge connecting nodes across sessions.""" 88 source_id: str 89 target_id: str 90 edge_type: str # temporal, conceptual, builds_on, references 91 strength: float = 0.5 92 evidence: str = "" 93 94 95 @dataclass 96 class BackhaulState: 97 """State of the backhaul process.""" 98 processed_sessions: Set[str] = field(default_factory=set) 99 total_nodes: int = 0 100 total_edges: int = 0 101 last_run: Optional[str] = None 102 session_timeline: List[Dict] = field(default_factory=list) 103 104 def save(self): 105 SOVEREIGN_HOME.mkdir(parents=True, exist_ok=True) 106 with open(BACKHAUL_STATE, 'w') as f: 107 data = { 108 "processed_sessions": list(self.processed_sessions), 109 "total_nodes": self.total_nodes, 110 "total_edges": self.total_edges, 111 "last_run": self.last_run, 112 "session_timeline": self.session_timeline 113 } 114 json.dump(data, f, indent=2) 115 116 @classmethod 117 def load(cls) -> 'BackhaulState': 118 if BACKHAUL_STATE.exists(): 119 with open(BACKHAUL_STATE) as f: 120 data = json.load(f) 121 return cls( 122 processed_sessions=set(data.get("processed_sessions", [])), 123 total_nodes=data.get("total_nodes", 0), 124 total_edges=data.get("total_edges", 0), 125 last_run=data.get("last_run"), 126 session_timeline=data.get("session_timeline", []) 127 ) 128 return cls() 129 130 131 class HistoricalBackhaul: 132 """Process historical conversations and extract graph structure.""" 133 134 # Philosophical/theological markers for foundational content 135 FOUNDATIONAL_MARKERS = [ 136 # Axioms and principles 137 'axiom', 'principle', 'fundamental', 'bedrock', 'foundation', 138 'core insight', 'key insight', 'the insight', 'realization', 139 # Theological/philosophical 140 'satan', 'god', 'divine', 'sacred', 'soul', 'spirit', 'consciousness', 141 'good', 'evil', 'truth', 'beauty', 'meaning', 'purpose', 'telos', 142 'being', 'existence', 'essence', 'transcend', 'immanent', 143 # Metaphorical/symbolic 144 'markov blanket', 'boundary', 'integration', 'isolation', 145 'life', 'death', 'primitive', 'calcified', 'ornament', 146 'pole', 'dyad', 'tension', 'navigate', 'shadow', 147 'ruin', 'ergodic', 'asymmetry', 'survival', 148 # Meta-cognitive 149 'torah', 'talmud', 'compression', 'essence', 'instance', 150 'phoenix', 'resurrection', 'sovereign', 'alignment', 151 ] 152 153 # High-weight markers for truly foundational content 154 CORE_MARKERS = [ 155 'the core is', 'the key is', 'the insight is', 'this is the', 156 'fundamental truth', 'bedrock axiom', 'core principle', 157 'everything flows from', 'this is why', 'the reason is', 158 ] 159 160 def __init__(self): 161 self.parser = ConversationParser() 162 self.state = BackhaulState.load() 163 self.nodes: List[SessionNode] = [] 164 self.edges: List[SessionEdge] = [] 165 166 # Concept tracking for cross-session edge discovery 167 self.concept_sessions: Dict[str, List[str]] = defaultdict(list) # concept -> [session_ids] 168 self.artifact_sessions: Dict[str, List[str]] = defaultdict(list) # artifact -> [session_ids] 169 170 # Lineage tracking 171 self.insight_first_seen: Dict[str, str] = {} # insight_hash -> first session_id 172 self.insight_lineage: Dict[str, InsightLineage] = {} # insight_hash -> lineage 173 self.session_order: List[str] = [] # Chronological session order 174 175 def find_transcripts(self, since: Optional[datetime] = None) -> List[Path]: 176 """Find all conversation transcripts.""" 177 transcripts = [] 178 179 for jsonl in CLAUDE_PROJECTS.rglob("*.jsonl"): 180 # Skip subagent files for now (they're context of parent) 181 if "subagent" in str(jsonl): 182 continue 183 184 # Check date filter 185 if since: 186 mtime = datetime.fromtimestamp(jsonl.stat().st_mtime) 187 if mtime < since: 188 continue 189 190 transcripts.append(jsonl) 191 192 # Sort by modification time (oldest first for temporal ordering) 193 transcripts.sort(key=lambda p: p.stat().st_mtime) 194 return transcripts 195 196 def process_session(self, transcript_path: Path) -> Optional[ConversationThread]: 197 """Process a single session transcript.""" 198 session_id = transcript_path.stem 199 200 # Skip if already processed 201 if session_id in self.state.processed_sessions: 202 return None 203 204 try: 205 thread = self.parser.parse_jsonl(transcript_path) 206 207 # Only process sessions with meaningful content 208 if thread.exchange_count < 2: 209 return None 210 211 return thread 212 213 except Exception as e: 214 print(f" Error parsing {session_id}: {e}") 215 return None 216 217 def _is_foundational(self, text: str) -> Tuple[bool, float]: 218 """Check if text contains foundational/philosophical content.""" 219 text_lower = text.lower() 220 score = 0.0 221 222 # Check core markers (highest weight) 223 for marker in self.CORE_MARKERS: 224 if marker in text_lower: 225 score += 0.3 226 227 # Check foundational markers 228 marker_count = sum(1 for m in self.FOUNDATIONAL_MARKERS if m in text_lower) 229 score += marker_count * 0.05 230 231 # Check for no code (philosophical sessions often have no code) 232 has_code = any(x in text for x in ['```', 'def ', 'function ', 'class ', 'import ']) 233 if not has_code and marker_count > 3: 234 score += 0.2 235 236 is_foundational = score >= 0.3 237 return (is_foundational, min(score, 1.0)) 238 239 def _extract_principles(self, thread: ConversationThread) -> List[Dict]: 240 """Extract principle-level insights from philosophical content.""" 241 principles = [] 242 243 for msg in thread.messages: 244 if msg.role != 'assistant': 245 continue 246 247 content = msg.content 248 is_found, score = self._is_foundational(content) 249 250 if is_found: 251 # Look for principle statements 252 sentences = re.split(r'[.!?]\s+', content) 253 for sentence in sentences: 254 sentence_lower = sentence.lower() 255 256 # Check for core insight markers 257 for marker in self.CORE_MARKERS: 258 if marker in sentence_lower: 259 principles.append({ 260 "content": sentence.strip()[:300], 261 "marker": marker, 262 "score": score, 263 "timestamp": msg.timestamp.isoformat() if msg.timestamp else None 264 }) 265 break 266 267 return principles 268 269 def extract_nodes(self, thread: ConversationThread) -> List[SessionNode]: 270 """Extract graph nodes from a conversation thread.""" 271 nodes = [] 272 session_id = thread.thread_id 273 timestamp = thread.start_time.isoformat() if thread.start_time else None 274 275 # Track session order for lineage 276 self.session_order.append(session_id) 277 278 # Check if this is a foundational session 279 all_text = ' '.join(m.content for m in thread.messages) 280 is_foundational_session, foundation_score = self._is_foundational(all_text) 281 282 # Session node (represents the conversation itself) 283 session_node = SessionNode( 284 id=f"SESSION-{session_id}", 285 content=f"Session: {', '.join(thread.topics[:3]) if thread.topics else 'conversation'}", 286 node_type="session", 287 source_session=session_id, 288 timestamp=timestamp, 289 axioms=[thread.dominant_axiom[0]] if thread.dominant_axiom else [], 290 is_foundational=is_foundational_session, 291 metadata={ 292 "exchanges": thread.exchange_count, 293 "duration_min": thread.duration_minutes, 294 "is_signal_rich": thread.is_signal_rich, 295 "peak_engagement": thread.peak_engagement, 296 "foundation_score": foundation_score 297 } 298 ) 299 nodes.append(session_node) 300 301 # Concept nodes 302 for concept in thread.concepts: 303 node_id = f"CONCEPT-{hashlib.md5(concept.encode()).hexdigest()[:12]}" 304 nodes.append(SessionNode( 305 id=node_id, 306 content=concept, 307 node_type="concept", 308 source_session=session_id, 309 timestamp=timestamp 310 )) 311 self.concept_sessions[concept].append(session_id) 312 313 # Insight nodes 314 for insight in thread.key_insights: 315 node_id = f"INSIGHT-{hashlib.md5(insight.encode()).hexdigest()[:12]}" 316 nodes.append(SessionNode( 317 id=node_id, 318 content=insight, 319 node_type="insight", 320 source_session=session_id, 321 timestamp=timestamp 322 )) 323 324 # Decision nodes 325 for decision in thread.decisions: 326 node_id = f"DECISION-{hashlib.md5(decision.encode()).hexdigest()[:12]}" 327 nodes.append(SessionNode( 328 id=node_id, 329 content=decision, 330 node_type="decision", 331 source_session=session_id, 332 timestamp=timestamp 333 )) 334 335 # Artifact nodes 336 for artifact in thread.artifacts: 337 node_id = f"ARTIFACT-{hashlib.md5(artifact.encode()).hexdigest()[:12]}" 338 nodes.append(SessionNode( 339 id=node_id, 340 content=artifact, 341 node_type="artifact", 342 source_session=session_id, 343 timestamp=timestamp 344 )) 345 self.artifact_sessions[artifact].append(session_id) 346 347 # High-resonance segment nodes 348 for seg in thread.high_resonance_segments[:5]: # Top 5 349 content = seg.get("preview", "") 350 if len(content) > 20: 351 node_id = f"SEGMENT-{hashlib.md5(content.encode()).hexdigest()[:12]}" 352 dominant = seg.get("dominant") 353 nodes.append(SessionNode( 354 id=node_id, 355 content=content, 356 node_type="high_resonance", 357 source_session=session_id, 358 timestamp=timestamp, 359 axioms=[dominant[0]] if dominant else [], 360 metadata={"resonance": seg.get("resonance", 0)} 361 )) 362 363 # Extract foundational principles (THE TORAH) 364 principles = self._extract_principles(thread) 365 for i, principle in enumerate(principles[:10]): # Limit per session 366 content = principle["content"] 367 node_id = f"PRINCIPLE-{hashlib.md5(content.encode()).hexdigest()[:12]}" 368 369 # Track lineage - is this the first time we see this? 370 content_hash = hashlib.md5(content[:100].lower().encode()).hexdigest()[:16] 371 is_origin = content_hash not in self.insight_first_seen 372 derives_from = [] 373 374 if is_origin: 375 self.insight_first_seen[content_hash] = session_id 376 self.insight_lineage[content_hash] = InsightLineage( 377 insight_id=node_id, 378 content=content, 379 origin_session=session_id, 380 origin_timestamp=timestamp or "", 381 is_foundational=True 382 ) 383 else: 384 # This is a derivative - link to origin 385 origin_session = self.insight_first_seen[content_hash] 386 derives_from = [f"SESSION-{origin_session}"] 387 if content_hash in self.insight_lineage: 388 self.insight_lineage[content_hash].evolution_chain.append({ 389 "session": session_id, 390 "timestamp": timestamp, 391 "form": content[:100] 392 }) 393 394 nodes.append(SessionNode( 395 id=node_id, 396 content=content, 397 node_type="principle", 398 source_session=session_id, 399 timestamp=timestamp, 400 is_foundational=is_origin, 401 depth=0 if is_origin else len(self.insight_lineage.get(content_hash, InsightLineage("", "", "", "")).evolution_chain), 402 derives_from=derives_from, 403 metadata={ 404 "marker": principle["marker"], 405 "score": principle["score"], 406 "content_hash": content_hash 407 } 408 )) 409 410 return nodes 411 412 def discover_edges(self) -> List[SessionEdge]: 413 """Discover edges across all processed sessions.""" 414 edges = [] 415 416 # Lineage edges (MOST IMPORTANT - insight provenance) 417 for node in self.nodes: 418 if node.derives_from: 419 for parent_id in node.derives_from: 420 edges.append(SessionEdge( 421 source_id=parent_id, 422 target_id=node.id, 423 edge_type="derives_from", 424 strength=0.9, 425 evidence=f"Insight builds on origin from {parent_id}" 426 )) 427 428 # Temporal edges (session A -> session B if B came after A) 429 timeline = sorted(self.state.session_timeline, key=lambda x: x.get("timestamp", "")) 430 for i in range(1, len(timeline)): 431 prev = timeline[i-1] 432 curr = timeline[i] 433 edges.append(SessionEdge( 434 source_id=f"SESSION-{prev['session_id']}", 435 target_id=f"SESSION-{curr['session_id']}", 436 edge_type="temporal", 437 strength=0.3, 438 evidence="Sequential sessions" 439 )) 440 441 # Concept-based edges (sessions that share concepts) 442 for concept, sessions in self.concept_sessions.items(): 443 if len(sessions) > 1: 444 # Connect all sessions that share this concept 445 concept_id = f"CONCEPT-{hashlib.md5(concept.encode()).hexdigest()[:12]}" 446 for session_id in sessions: 447 edges.append(SessionEdge( 448 source_id=f"SESSION-{session_id}", 449 target_id=concept_id, 450 edge_type="discusses", 451 strength=0.6, 452 evidence=f"Session discusses '{concept}'" 453 )) 454 455 # Artifact-based edges (sessions that touch same files) 456 for artifact, sessions in self.artifact_sessions.items(): 457 if len(sessions) > 1: 458 artifact_id = f"ARTIFACT-{hashlib.md5(artifact.encode()).hexdigest()[:12]}" 459 for i, session_id in enumerate(sessions): 460 edge_type = "creates" if i == 0 else "modifies" 461 edges.append(SessionEdge( 462 source_id=f"SESSION-{session_id}", 463 target_id=artifact_id, 464 edge_type=edge_type, 465 strength=0.8, 466 evidence=f"Session {edge_type} '{artifact}'" 467 )) 468 469 return edges 470 471 def ingest_to_graph(self, nodes: List[SessionNode], edges: List[SessionEdge]): 472 """Ingest extracted nodes and edges to the graph sink.""" 473 if not HAS_GRAPH_SINK: 474 print("[backhaul] Graph sink not available - skipping ingest") 475 return 476 477 sink = get_sink() 478 479 for node in nodes: 480 sink.ingest( 481 content=node.content, 482 node_type=node.node_type, 483 source=f"backhaul:{node.source_session}", 484 metadata={ 485 "backhaul_id": node.id, 486 "timestamp": node.timestamp, 487 "axioms": node.axioms, 488 **node.metadata 489 } 490 ) 491 492 for edge in edges: 493 sink.connect( 494 source_id=edge.source_id, 495 target_id=edge.target_id, 496 edge_type=edge.edge_type, 497 strength=edge.strength, 498 source="backhaul" 499 ) 500 501 result = sink.flush() 502 return result 503 504 def run(self, scan_only: bool = False, since: Optional[datetime] = None): 505 """Run the full backhaul process.""" 506 print("=" * 60) 507 print("HISTORICAL BACKHAUL") 508 print("Extracting value from linear time") 509 print("=" * 60) 510 print() 511 512 # Find transcripts 513 transcripts = self.find_transcripts(since=since) 514 new_transcripts = [t for t in transcripts if t.stem not in self.state.processed_sessions] 515 516 print(f"Found {len(transcripts)} total transcripts") 517 print(f" Already processed: {len(self.state.processed_sessions)}") 518 print(f" New to process: {len(new_transcripts)}") 519 print() 520 521 if scan_only: 522 print("[Scan mode - not ingesting]") 523 return 524 525 if not new_transcripts: 526 print("No new transcripts to process.") 527 return 528 529 # Process each new transcript 530 total_nodes = 0 531 total_sessions = 0 532 533 for i, transcript_path in enumerate(new_transcripts): 534 session_id = transcript_path.stem 535 print(f"[{i+1}/{len(new_transcripts)}] Processing {session_id[:20]}...") 536 537 thread = self.process_session(transcript_path) 538 if not thread: 539 continue 540 541 # Extract nodes 542 nodes = self.extract_nodes(thread) 543 self.nodes.extend(nodes) 544 total_nodes += len(nodes) 545 total_sessions += 1 546 547 # Track in timeline 548 self.state.session_timeline.append({ 549 "session_id": session_id, 550 "timestamp": thread.start_time.isoformat() if thread.start_time else None, 551 "exchanges": thread.exchange_count, 552 "topics": thread.topics[:3], 553 "is_signal_rich": thread.is_signal_rich 554 }) 555 556 # Mark as processed 557 self.state.processed_sessions.add(session_id) 558 559 # Progress 560 if (i + 1) % 10 == 0: 561 print(f" ... {i+1} sessions, {total_nodes} nodes extracted") 562 563 print() 564 print(f"Extracted {total_nodes} nodes from {total_sessions} sessions") 565 566 # Discover edges 567 print("\nDiscovering cross-session edges...") 568 edges = self.discover_edges() 569 self.edges.extend(edges) 570 print(f" Found {len(edges)} edges") 571 572 # Ingest to graph 573 if HAS_GRAPH_SINK: 574 print("\nIngesting to graph...") 575 result = self.ingest_to_graph(self.nodes, self.edges) 576 if result: 577 print(f" Ingested: {result.get('nodes_added', 0)} nodes, {result.get('edges_added', 0)} edges") 578 579 # Update state 580 self.state.total_nodes += total_nodes 581 self.state.total_edges += len(edges) 582 self.state.last_run = datetime.now().isoformat() 583 self.state.save() 584 585 # Generate report 586 self.generate_report(total_sessions, total_nodes, len(edges)) 587 588 print() 589 print("=" * 60) 590 print("BACKHAUL COMPLETE") 591 print(f" Sessions processed: {total_sessions}") 592 print(f" Nodes extracted: {total_nodes}") 593 print(f" Edges discovered: {len(edges)}") 594 print(f" Report: {BACKHAUL_REPORT}") 595 print("=" * 60) 596 597 def query_provenance(self, query: str) -> List[Dict]: 598 """Query for insight provenance - trace etymology of an idea.""" 599 query_lower = query.lower() 600 results = [] 601 602 for content_hash, lineage in self.insight_lineage.items(): 603 if query_lower in lineage.content.lower(): 604 results.append({ 605 "insight": lineage.content[:200], 606 "origin_session": lineage.origin_session, 607 "origin_timestamp": lineage.origin_timestamp, 608 "is_foundational": lineage.is_foundational, 609 "evolution_count": len(lineage.evolution_chain), 610 "evolution_chain": lineage.evolution_chain[:5] 611 }) 612 613 # Sort by foundational first, then by evolution count 614 results.sort(key=lambda x: (-x["is_foundational"], -x["evolution_count"])) 615 return results 616 617 def get_foundational_insights(self) -> List[Dict]: 618 """Get all foundational (origin) insights.""" 619 foundational = [] 620 for node in self.nodes: 621 if node.is_foundational and node.node_type == "principle": 622 foundational.append({ 623 "id": node.id, 624 "content": node.content, 625 "source_session": node.source_session, 626 "timestamp": node.timestamp, 627 "axioms": node.axioms, 628 "score": node.metadata.get("score", 0) 629 }) 630 631 # Sort by score (highest first) 632 foundational.sort(key=lambda x: -x.get("score", 0)) 633 return foundational 634 635 def generate_report(self, sessions: int, nodes: int, edges: int): 636 """Generate backhaul report.""" 637 638 # Get foundational insights for report 639 foundational = self.get_foundational_insights() 640 641 report = f"""# Historical Backhaul Report 642 643 *Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}* 644 645 --- 646 647 ## Summary 648 649 - **Sessions processed this run:** {sessions} 650 - **Nodes extracted:** {nodes} 651 - **Edges discovered:** {edges} 652 - **Total sessions processed (all time):** {len(self.state.processed_sessions)} 653 - **Total nodes (all time):** {self.state.total_nodes} 654 - **Total edges (all time):** {self.state.total_edges} 655 656 --- 657 658 ## Concept Distribution 659 660 Top concepts appearing across multiple sessions: 661 662 """ 663 # Top shared concepts 664 sorted_concepts = sorted( 665 self.concept_sessions.items(), 666 key=lambda x: len(x[1]), 667 reverse=True 668 )[:20] 669 670 for concept, sessions in sorted_concepts: 671 if len(sessions) > 1: 672 report += f"- **{concept}**: {len(sessions)} sessions\n" 673 674 report += """ 675 676 --- 677 678 ## Artifact Timeline 679 680 Files touched across sessions: 681 682 """ 683 # Top shared artifacts 684 sorted_artifacts = sorted( 685 self.artifact_sessions.items(), 686 key=lambda x: len(x[1]), 687 reverse=True 688 )[:15] 689 690 for artifact, sessions in sorted_artifacts: 691 if len(sessions) > 1: 692 report += f"- `{artifact}`: {len(sessions)} sessions\n" 693 694 report += """ 695 696 --- 697 698 ## Foundational Insights (Torah Core) 699 700 These are origin insights - the bedrock from which derivatives flow: 701 702 """ 703 # Include foundational insights 704 if foundational: 705 for i, insight in enumerate(foundational[:15]): # Top 15 706 score = insight.get("score", 0) 707 session = insight.get("source_session", "")[:20] 708 content = insight.get("content", "")[:150] 709 report += f"{i+1}. **[Score: {score:.2f}]** {content}...\n" 710 report += f" *Origin: {session}*\n\n" 711 else: 712 report += "*No foundational insights extracted yet.*\n" 713 714 report += """ 715 --- 716 717 ## Insight Lineage (Etymology) 718 719 Insights that evolved across multiple sessions: 720 721 """ 722 # Show insights with lineage 723 lineage_items = [ 724 (h, l) for h, l in self.insight_lineage.items() 725 if len(l.evolution_chain) > 0 726 ] 727 lineage_items.sort(key=lambda x: len(x[1].evolution_chain), reverse=True) 728 729 for content_hash, lineage in lineage_items[:10]: 730 report += f"### {lineage.content[:80]}...\n\n" 731 report += f"- **Origin:** {lineage.origin_session[:20]} ({lineage.origin_timestamp[:10] if lineage.origin_timestamp else 'unknown'})\n" 732 report += f"- **Evolutions:** {len(lineage.evolution_chain)}\n" 733 for evo in lineage.evolution_chain[:3]: 734 report += f" - {evo.get('session', '')[:20]} → {evo.get('form', '')[:50]}...\n" 735 report += "\n" 736 737 if not lineage_items: 738 report += "*No multi-session lineages discovered yet.*\n" 739 740 report += """ 741 --- 742 743 ## Timeline (Recent Sessions) 744 745 """ 746 for entry in self.state.session_timeline[-20:]: 747 ts = entry.get("timestamp", "unknown")[:10] 748 topics = ", ".join(entry.get("topics", [])[:2]) or "general" 749 signal = "📡" if entry.get("is_signal_rich") else "" 750 report += f"- [{ts}] {topics} ({entry.get('exchanges', 0)} exchanges) {signal}\n" 751 752 report += """ 753 754 --- 755 756 ## Provenance Query 757 758 To trace the etymology of any insight: 759 ``` 760 python3 scripts/historical_backhaul.py --provenance "your query" 761 ``` 762 763 --- 764 765 *Generated by Historical Backhaul* 766 """ 767 768 BACKHAUL_REPORT.parent.mkdir(parents=True, exist_ok=True) 769 BACKHAUL_REPORT.write_text(report) 770 771 772 def main(): 773 scan_only = "--scan" in sys.argv 774 since = None 775 provenance_query = None 776 show_foundational = "--foundational" in sys.argv 777 778 # Parse arguments 779 for i, arg in enumerate(sys.argv): 780 if arg == "--since" and i + 1 < len(sys.argv): 781 try: 782 since = datetime.fromisoformat(sys.argv[i + 1]) 783 except: 784 print(f"Invalid date format: {sys.argv[i + 1]}") 785 sys.exit(1) 786 elif arg == "--provenance" and i + 1 < len(sys.argv): 787 provenance_query = sys.argv[i + 1] 788 789 backhaul = HistoricalBackhaul() 790 791 # Handle provenance query 792 if provenance_query: 793 print("=" * 60) 794 print(f"PROVENANCE QUERY: '{provenance_query}'") 795 print("=" * 60) 796 print() 797 798 # Load existing state and rebuild lineage from nodes 799 # First do a scan to build the lineage 800 transcripts = backhaul.find_transcripts(since=None) 801 processed = [t for t in transcripts if t.stem in backhaul.state.processed_sessions] 802 803 if not processed: 804 print("No sessions processed yet. Run full backhaul first.") 805 return 806 807 print(f"Scanning {len(processed)} processed sessions for lineage...") 808 for transcript_path in processed: 809 # Temporarily allow reprocessing 810 session_id = transcript_path.stem 811 backhaul.state.processed_sessions.discard(session_id) 812 thread = backhaul.process_session(transcript_path) 813 if thread: 814 nodes = backhaul.extract_nodes(thread) 815 backhaul.nodes.extend(nodes) 816 # Mark as processed again 817 backhaul.state.processed_sessions.add(session_id) 818 819 results = backhaul.query_provenance(provenance_query) 820 821 if not results: 822 print("No matching insights found.") 823 else: 824 print(f"Found {len(results)} matching insights:\n") 825 for i, r in enumerate(results[:10]): 826 print(f"{i+1}. [{r['origin_session'][:20]}] {r['insight'][:100]}...") 827 print(f" Origin: {r['origin_timestamp'][:10] if r['origin_timestamp'] else 'unknown'}") 828 print(f" Foundational: {'Yes' if r['is_foundational'] else 'No'}") 829 print(f" Evolutions: {r['evolution_count']}") 830 if r['evolution_chain']: 831 print(f" Chain:") 832 for evo in r['evolution_chain'][:3]: 833 print(f" -> {evo.get('session', '')[:20]}: {evo.get('form', '')[:40]}...") 834 print() 835 return 836 837 # Handle foundational query 838 if show_foundational: 839 print("=" * 60) 840 print("FOUNDATIONAL INSIGHTS (Torah Core)") 841 print("=" * 60) 842 print() 843 844 # Rebuild from processed sessions 845 transcripts = backhaul.find_transcripts(since=None) 846 processed = [t for t in transcripts if t.stem in backhaul.state.processed_sessions] 847 848 if not processed: 849 print("No sessions processed yet. Run full backhaul first.") 850 return 851 852 print(f"Scanning {len(processed)} processed sessions...") 853 for transcript_path in processed: 854 session_id = transcript_path.stem 855 backhaul.state.processed_sessions.discard(session_id) 856 thread = backhaul.process_session(transcript_path) 857 if thread: 858 nodes = backhaul.extract_nodes(thread) 859 backhaul.nodes.extend(nodes) 860 backhaul.state.processed_sessions.add(session_id) 861 862 foundational = backhaul.get_foundational_insights() 863 864 if not foundational: 865 print("No foundational insights found.") 866 else: 867 print(f"Found {len(foundational)} foundational insights:\n") 868 for i, f in enumerate(foundational[:20]): 869 print(f"{i+1}. [Score: {f['score']:.2f}] {f['content'][:120]}...") 870 print(f" Origin: {f['source_session'][:20]}") 871 print() 872 return 873 874 # Normal backhaul run 875 backhaul.run(scan_only=scan_only, since=since) 876 877 878 if __name__ == "__main__": 879 main()