graph_integration.py
#!/usr/bin/env python3
"""
Graph Integration Layer

Connects the Sovereign_Estate graph (atoms, edges, aha moments) with the
Sovereign_OS attention system and server.

Data flows:
    stream_session.py → atoms.jsonl → GraphIntegration → Server endpoints
    historical_processor.py → edges.jsonl → GraphIntegration → Correlation
    edge_prediction.py → aha events → GraphIntegration → Alert system

This module:
1. Watches graph data files for changes
2. Streams new atoms/edges to the attention system
3. Exposes graph state via server endpoints
4. Correlates graph events with other sources (Monologue, Comet, artifacts)
"""

import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Set, Any, Callable, Generator
from collections import Counter, defaultdict
import threading
import time

from .signal_words import detect_signals, SignalDetection


# Resonance reported for atoms whose meta tags carry no resonance score.
_DEFAULT_RESONANCE = 0.5


def _get(data: Dict[str, Any], key: str, default: Any) -> Any:
    """Return ``data[key]``, treating a missing key and an explicit null alike."""
    value = data.get(key)
    return default if value is None else value


def _parse_timestamp(raw: Any) -> datetime:
    """
    Parse an ISO-8601 string into a naive datetime.

    A trailing ``Z`` is accepted, and any UTC offset is dropped (not
    converted) so that every timestamp held by this module is naive and can
    be compared with the ``datetime.now()`` fallback used for missing or
    unparsable values.
    """
    try:
        parsed = datetime.fromisoformat(raw.replace('Z', '+00:00'))
    except (AttributeError, TypeError, ValueError):
        # Not a string, or not ISO formatted.
        return datetime.now()
    return parsed.replace(tzinfo=None)


@dataclass
class GraphAtom:
    """An atom from the semantic graph."""
    uuid: str
    content: str
    source_type: str  # 'user', 'assistant'
    session_id: str
    timestamp: datetime
    parent_uuid: Optional[str] = None

    # Tags
    visible_tags: List[str] = field(default_factory=list)
    meta_tags: Dict[str, Any] = field(default_factory=dict)

    # Edges
    edges: List[Dict[str, Any]] = field(default_factory=list)

    # Analysis
    signals: Optional[SignalDetection] = None
    weight: float = 1.0

    @property
    def altitude(self) -> str:
        """Get altitude from meta tags (philosophical/strategic/operational/general)."""
        return self.meta_tags.get('altitude', 'general')

    @property
    def resonance(self) -> float:
        """Get resonance score from meta tags (0.5 when absent or null)."""
        value = self.meta_tags.get('resonance')
        # Compare against None rather than using `or`, so that an explicit
        # score of 0 is reported as 0 instead of being replaced by the default.
        return _DEFAULT_RESONANCE if value is None else value


@dataclass
class GraphEdge:
    """An edge between atoms."""
    source: str
    target: str
    edge_type: str  # follows, relates_to, extends, contradicts, etc.
    strength: float
    formed_at: datetime
    formation_context: str = ""


@dataclass
class GraphState:
    """Current state of the semantic graph."""
    atom_count: int = 0
    edge_count: int = 0
    last_atom_time: Optional[datetime] = None
    last_edge_time: Optional[datetime] = None

    # Distributions
    atoms_by_session: Dict[str, int] = field(default_factory=dict)
    atoms_by_altitude: Dict[str, int] = field(default_factory=dict)
    edges_by_type: Dict[str, int] = field(default_factory=dict)

    # Recent high-value items
    high_resonance_atoms: List[str] = field(default_factory=list)
    gravity_wells: List[str] = field(default_factory=list)  # Topics with 10+ edges


@dataclass
class GraphEvent:
    """An event from the graph (new atom, edge, aha, etc.)."""
    event_type: str  # 'atom', 'edge', 'aha', 'gravity_well'
    timestamp: datetime
    data: Dict[str, Any]


class GraphIntegration:
    """
    Integrates the Sovereign_Estate semantic graph with Sovereign_OS.

    Provides:
    - Real-time streaming of graph updates
    - Query interface for graph state
    - Correlation with other data sources
    - Event subscription for new atoms/edges/ahas

    NOTE(review): the watcher thread and the query methods share the
    in-memory graph without locking; queries work on snapshots where they
    iterate, but no stronger consistency is provided.
    """

    # Default paths (from Sovereign_Estate)
    DEFAULT_GRAPH_DIR = Path.home() / 'repos/Sovereign_Estate/data/graph'
    DEFAULT_ATOMS_DIR = DEFAULT_GRAPH_DIR / 'atoms'
    DEFAULT_EDGES_DIR = DEFAULT_GRAPH_DIR / 'edges'

    # Number of edges (incoming + outgoing) that makes a node a gravity well.
    GRAVITY_WELL_MIN_EDGES = 10

    def __init__(
        self,
        graph_dir: Optional[str] = None,
        watch_interval: float = 5.0
    ):
        """
        Args:
            graph_dir: Directory holding the 'atoms' and 'edges' folders;
                DEFAULT_GRAPH_DIR when omitted.
            watch_interval: Seconds between polls of the watched files.
        """
        self.graph_dir = Path(graph_dir) if graph_dir else self.DEFAULT_GRAPH_DIR
        self.atoms_dir = self.graph_dir / 'atoms'
        self.edges_dir = self.graph_dir / 'edges'
        self.watch_interval = watch_interval

        # State
        self._atoms: Dict[str, GraphAtom] = {}
        self._edges: List[GraphEdge] = []
        self._state = GraphState()

        # Gravity wells that are already known, i.e. must not be announced
        # (again) through a 'gravity_well' event.
        self._announced_wells: Set[str] = set()

        # File tracking: byte offset up to which each file has been consumed
        self._atom_file_positions: Dict[str, int] = {}
        self._edge_file_positions: Dict[str, int] = {}

        # Event subscribers
        self._subscribers: Dict[str, List[Callable]] = defaultdict(list)

        # Threading
        self._watching = False
        self._watch_thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()

    def subscribe(self, event_type: str, callback: Callable[[GraphEvent], None]):
        """Subscribe to graph events (atom, edge, aha, gravity_well, or 'all')."""
        self._subscribers[event_type].append(callback)

    def _emit(self, event_type: str, data: Dict[str, Any]):
        """
        Emit an event to the subscribers of `event_type`, then to the
        subscribers of 'all'.

        A failing callback is reported and does not prevent the remaining
        callbacks from running.
        """
        event = GraphEvent(
            event_type=event_type,
            timestamp=datetime.now(),
            data=data
        )
        for channel in (event_type, 'all'):
            # Iterate a copy so a callback may subscribe while we dispatch.
            for callback in list(self._subscribers.get(channel, [])):
                try:
                    callback(event)
                except Exception as e:
                    print(f"[GraphIntegration] Subscriber error: {e}")

    def _load_day(self, day: str) -> None:
        """
        Load the atoms and edges files of one day ('YYYY-MM-DD').

        Reading resumes from the offset recorded for each file, so loading
        the same day again only picks up appended records instead of
        duplicating edges that were already loaded.
        """
        atoms_file = self.atoms_dir / f'{day}-atoms.jsonl'
        if atoms_file.exists():
            self._load_atoms_file(
                atoms_file,
                self._atom_file_positions.get(str(atoms_file), 0)
            )

        edges_file = self.edges_dir / f'{day}-edges.jsonl'
        if edges_file.exists():
            self._load_edges_file(
                edges_file,
                self._edge_file_positions.get(str(edges_file), 0)
            )

    def load_today(self) -> GraphState:
        """Load today's graph data and return the refreshed state."""
        self._load_day(datetime.now().strftime('%Y-%m-%d'))

        self._update_state()
        # Wells that exist at load time are not "new"; only wells that form
        # later are announced through 'gravity_well' events.
        self._announced_wells.update(self._state.gravity_wells)
        return self._state

    def load_range(self, days_back: int = 7) -> GraphState:
        """Load graph data for today and the preceding `days_back - 1` days."""
        now = datetime.now()
        for i in range(days_back):
            self._load_day((now - timedelta(days=i)).strftime('%Y-%m-%d'))

        self._update_state()
        self._announced_wells.update(self._state.gravity_wells)
        return self._state

    @staticmethod
    def _iter_new_records(
        filepath: Path,
        from_position: int,
        positions: Dict[str, int]
    ) -> Generator[Any, None, None]:
        """
        Yield the JSON value of every line of `filepath` after `from_position`.

        `positions[str(filepath)]` is kept at the byte offset just past the
        last consumed line. Blank lines and complete-but-corrupt lines are
        consumed and skipped. A final line that has no newline yet and does
        not parse is assumed to be still being written: it is left
        unconsumed so that the next read sees it in full.
        """
        key = str(filepath)
        # Binary mode makes the tracked offset a real byte offset that can be
        # compared with the file size reported by stat().
        with open(filepath, 'rb') as f:
            if from_position > 0:
                f.seek(from_position)
            offset = f.tell()
            positions[key] = offset

            for raw in f:
                try:
                    text = raw.decode('utf-8').strip()
                    record = json.loads(text) if text else None
                except (UnicodeDecodeError, json.JSONDecodeError):
                    if not raw.endswith(b'\n'):
                        break  # partially written tail; retry later
                    text = ''  # corrupt complete line: skip it
                    record = None

                # Advance before yielding so a record the consumer chokes on
                # is not retried forever.
                offset += len(raw)
                positions[key] = offset
                if text:
                    yield record

    def _load_atoms_file(self, filepath: Path, from_position: int = 0) -> int:
        """
        Load atoms from a JSONL file, starting from a byte position.

        Atoms whose uuid is already known are ignored; each new atom is
        stored and announced with an 'atom' event.

        Returns:
            The number of atoms added.
        """
        new_atoms = 0

        try:
            records = self._iter_new_records(
                filepath, from_position, self._atom_file_positions
            )
            for data in records:
                atom = self._parse_atom(data)
                if atom is None or atom.uuid in self._atoms:
                    continue

                self._atoms[atom.uuid] = atom
                new_atoms += 1

                # Emit event
                self._emit('atom', {
                    'uuid': atom.uuid,
                    'content': atom.content[:200],
                    'source_type': atom.source_type,
                    'altitude': atom.altitude,
                    'resonance': atom.resonance,
                    'tags': atom.visible_tags
                })

        except Exception as e:
            # Best effort: report and keep whatever was loaded so far.
            print(f"[GraphIntegration] Error loading atoms: {e}")

        return new_atoms

    def _load_edges_file(self, filepath: Path, from_position: int = 0) -> int:
        """
        Load edges from a JSONL file, starting from a byte position.

        Every parsed edge is appended (edges are not de-duplicated) and
        announced with an 'edge' event.

        Returns:
            The number of edges added.
        """
        new_edges = 0

        try:
            records = self._iter_new_records(
                filepath, from_position, self._edge_file_positions
            )
            for data in records:
                edge = self._parse_edge(data)
                if edge is None:
                    continue

                self._edges.append(edge)
                new_edges += 1

                # Emit event
                self._emit('edge', {
                    'source': edge.source,
                    'target': edge.target,
                    'type': edge.edge_type,
                    'strength': edge.strength
                })

        except Exception as e:
            # Best effort: report and keep whatever was loaded so far.
            print(f"[GraphIntegration] Error loading edges: {e}")

        return new_edges

    def _parse_atom(self, data: Dict[str, Any]) -> Optional[GraphAtom]:
        """
        Parse an atom from JSON data.

        Missing and null fields fall back to their defaults. Returns None
        (after reporting the reason) when the record is not a JSON object or
        cannot be turned into an atom.
        """
        if not isinstance(data, dict):
            return None

        try:
            atom = GraphAtom(
                uuid=_get(data, 'uuid', ''),
                content=_get(data, 'content', ''),
                source_type=_get(data, 'source_type', 'unknown'),
                session_id=_get(data, 'session_id', ''),
                timestamp=_parse_timestamp(data.get('timestamp', '')),
                parent_uuid=data.get('parent_uuid'),
                visible_tags=_get(data, 'visible_tags', []),
                meta_tags=_get(data, 'meta_tags', {}),
                edges=_get(data, 'edges', [])
            )

            # Analyze signals
            atom.signals = detect_signals(atom.content)
            atom.weight = atom.signals.weight_modifier * (1 + atom.resonance)

            return atom

        except Exception as e:
            print(f"[GraphIntegration] Skipping malformed atom: {e}")
            return None

    def _parse_edge(self, data: Dict[str, Any]) -> Optional[GraphEdge]:
        """
        Parse an edge from JSON data.

        `formed_at` is normalised exactly like atom timestamps (naive
        datetime, 'Z' suffix accepted) so edge times can be compared with
        each other. Returns None when the record is not a JSON object.
        """
        if not isinstance(data, dict):
            return None

        return GraphEdge(
            source=_get(data, 'source', ''),
            target=_get(data, 'target', ''),
            edge_type=_get(data, 'edge_type', 'relates_to'),
            strength=_get(data, 'strength', 0.5),
            formed_at=_parse_timestamp(data.get('formed_at', '')),
            formation_context=_get(data, 'formation_context', '')
        )

    def _edge_degrees(self) -> Dict[str, int]:
        """Count, per node uuid, the edges that start or end at that node."""
        degrees: Dict[str, int] = Counter()
        for edge in list(self._edges):
            degrees[edge.source] += 1
            degrees[edge.target] += 1
        return degrees

    def _update_state(self):
        """Recompute the graph state statistics from the loaded atoms and edges."""
        atoms = list(self._atoms.values())
        edges = list(self._edges)
        state = self._state

        state.atom_count = len(atoms)
        state.edge_count = len(edges)

        # Distributions are rebuilt from scratch on every update.
        state.atoms_by_session = Counter(a.session_id[:8] for a in atoms)
        state.atoms_by_altitude = Counter(a.altitude for a in atoms)
        state.edges_by_type = Counter(e.edge_type for e in edges)

        # The "last seen" times only ever move forward.
        for atom in atoms:
            if atom.timestamp and (state.last_atom_time is None or
                                   atom.timestamp > state.last_atom_time):
                state.last_atom_time = atom.timestamp

        for edge in edges:
            if edge.formed_at and (state.last_edge_time is None or
                                   edge.formed_at > state.last_edge_time):
                state.last_edge_time = edge.formed_at

        # Find high resonance atoms
        by_resonance = sorted(atoms, key=lambda a: a.resonance, reverse=True)
        state.high_resonance_atoms = [a.uuid for a in by_resonance[:20]]

        # Find gravity wells (nodes with many edges)
        state.gravity_wells = [
            uuid for uuid, count in self._edge_degrees().items()
            if count >= self.GRAVITY_WELL_MIN_EDGES
        ]

    # ==================== Query Interface ====================

    def get_state(self) -> Dict[str, Any]:
        """Get the current graph state as a JSON-serialisable dict."""
        state = self._state
        last_atom = state.last_atom_time
        last_edge = state.last_edge_time
        return {
            'atom_count': state.atom_count,
            'edge_count': state.edge_count,
            'last_atom_time': last_atom.isoformat() if last_atom else None,
            'last_edge_time': last_edge.isoformat() if last_edge else None,
            'atoms_by_altitude': dict(state.atoms_by_altitude),
            'edges_by_type': dict(state.edges_by_type),
            'gravity_well_count': len(state.gravity_wells),
            'high_resonance_count': len(state.high_resonance_atoms)
        }

    def get_atom(self, uuid: str) -> Optional[Dict[str, Any]]:
        """Get a specific atom by UUID, or None when it is not loaded."""
        atom = self._atoms.get(uuid)
        if atom is None:
            return None

        return {
            'uuid': atom.uuid,
            'content': atom.content,
            'source_type': atom.source_type,
            'session_id': atom.session_id,
            'timestamp': atom.timestamp.isoformat(),
            'altitude': atom.altitude,
            'resonance': atom.resonance,
            'weight': atom.weight,
            'visible_tags': atom.visible_tags,
            'edge_count': len(atom.edges)
        }

    def get_recent_atoms(
        self,
        limit: int = 50,
        source_type: Optional[str] = None,
        min_weight: float = 0.0
    ) -> List[Dict[str, Any]]:
        """
        Get the most recent atoms, newest first.

        Args:
            limit: Maximum number of atoms returned.
            source_type: When given, only atoms of this source type.
            min_weight: When positive, only atoms with at least this weight.
        """
        atoms = sorted(
            self._atoms.values(),
            key=lambda a: a.timestamp,
            reverse=True
        )

        if source_type:
            atoms = [a for a in atoms if a.source_type == source_type]

        if min_weight > 0:
            atoms = [a for a in atoms if a.weight >= min_weight]

        return [
            {
                'uuid': a.uuid,
                'content': a.content[:300],
                'source_type': a.source_type,
                'timestamp': a.timestamp.isoformat(),
                'altitude': a.altitude,
                'resonance': a.resonance,
                'weight': a.weight,
                'tags': a.visible_tags
            }
            for a in atoms[:limit]
        ]

    def get_high_resonance_atoms(self, limit: int = 20) -> List[Dict[str, Any]]:
        """Get the `limit` atoms with the highest resonance scores."""
        atoms = sorted(
            self._atoms.values(),
            key=lambda a: a.resonance,
            reverse=True
        )

        return [
            {
                'uuid': a.uuid,
                'content': a.content[:300],
                'resonance': a.resonance,
                'altitude': a.altitude,
                'tags': a.visible_tags
            }
            for a in atoms[:limit]
        ]

    def get_gravity_wells(self) -> List[Dict[str, Any]]:
        """
        Get gravity wells (highly connected nodes), most connected first.

        Only wells whose atom is loaded are returned.
        """
        # Count every edge once instead of rescanning all edges per well.
        edges = list(self._edges)
        incoming = Counter(e.target for e in edges)
        outgoing = Counter(e.source for e in edges)

        wells = []
        for uuid in self._state.gravity_wells:
            atom = self._atoms.get(uuid)
            if not atom:
                continue

            wells.append({
                'uuid': uuid,
                'content': atom.content[:200],
                'incoming_edges': incoming[uuid],
                'outgoing_edges': outgoing[uuid],
                'total_edges': incoming[uuid] + outgoing[uuid],
                'altitude': atom.altitude
            })

        return sorted(wells, key=lambda w: -w['total_edges'])

    def search_atoms(self, query: str, limit: int = 20) -> List[Dict[str, Any]]:
        """
        Simple case-insensitive substring search across atom contents.

        Results are ordered by descending weight, then by shorter content.
        """
        query_lower = query.lower()

        # Work on a snapshot: the watcher thread may add atoms meanwhile.
        matches = [
            atom for atom in list(self._atoms.values())
            if query_lower in atom.content.lower()
        ]
        matches.sort(key=lambda a: (-a.weight, len(a.content)))

        return [
            {
                'uuid': a.uuid,
                'content': a.content[:300],
                'weight': a.weight,
                'timestamp': a.timestamp.isoformat()
            }
            for a in matches[:limit]
        ]

    # ==================== Streaming ====================

    def start_watching(self):
        """Start watching for graph updates in a background daemon thread."""
        if self._watching:
            return

        self._watching = True
        # A fresh event per run: a previous watcher that is still winding
        # down keeps its own (already set) event and cannot be revived.
        self._stop_event = threading.Event()
        self._watch_thread = threading.Thread(target=self._watch_loop, daemon=True)
        self._watch_thread.start()
        print("[GraphIntegration] Started watching graph files")

    def stop_watching(self):
        """Stop watching for updates and wait briefly for the thread to end."""
        self._watching = False
        # Wake the watcher immediately instead of waiting out its poll interval.
        self._stop_event.set()
        thread = self._watch_thread
        # A thread cannot join itself (e.g. when called from a subscriber).
        if thread and thread is not threading.current_thread():
            thread.join(timeout=2)
        print("[GraphIntegration] Stopped watching")

    def _poll_file(
        self,
        filepath: Path,
        positions: Dict[str, int],
        loader: Callable[[Path, int], int]
    ) -> None:
        """Load what was appended to `filepath` and refresh the state if needed."""
        if not filepath.exists():
            return

        pos = positions.get(str(filepath), 0)
        if filepath.stat().st_size > pos and loader(filepath, pos) > 0:
            self._update_state()

    def _watch_loop(self):
        """Background loop polling today's atoms and edges files for changes."""
        stop_event = self._stop_event

        while self._watching and not stop_event.is_set():
            try:
                today = datetime.now().strftime('%Y-%m-%d')

                # Check atoms file
                self._poll_file(
                    self.atoms_dir / f'{today}-atoms.jsonl',
                    self._atom_file_positions,
                    self._load_atoms_file
                )

                # Check edges file
                self._poll_file(
                    self.edges_dir / f'{today}-edges.jsonl',
                    self._edge_file_positions,
                    self._load_edges_file
                )

                # Check for new gravity wells
                self._check_gravity_wells()

            except Exception as e:
                print(f"[GraphIntegration] Watch error: {e}")

            # Interruptible sleep: returns early once stop is requested.
            stop_event.wait(self.watch_interval)

    def _check_gravity_wells(self):
        """
        Emit a 'gravity_well' event for every node that newly became a well.

        Announcements are tracked separately from `state.gravity_wells`,
        because that list is rebuilt by `_update_state` and therefore cannot
        tell new wells from old ones. A well whose atom is not loaded is
        recorded as known without an event.
        """
        for uuid, count in self._edge_degrees().items():
            if count < self.GRAVITY_WELL_MIN_EDGES or uuid in self._announced_wells:
                continue

            self._announced_wells.add(uuid)
            if uuid not in self._state.gravity_wells:
                self._state.gravity_wells.append(uuid)

            atom = self._atoms.get(uuid)
            if atom:
                self._emit('gravity_well', {
                    'uuid': uuid,
                    'content': atom.content[:200],
                    'edge_count': count
                })

    def stream_atoms(
        self,
        since: Optional[datetime] = None
    ) -> Generator[Dict[str, Any], None, None]:
        """
        Generator that yields atoms in chronological order.

        Args:
            since: When given, only atoms strictly newer than this timestamp.
        """
        atoms = sorted(self._atoms.values(), key=lambda a: a.timestamp)

        for atom in atoms:
            if since and atom.timestamp <= since:
                continue

            yield {
                'uuid': atom.uuid,
                'content': atom.content,
                'source_type': atom.source_type,
                'timestamp': atom.timestamp.isoformat(),
                'altitude': atom.altitude,
                'resonance': atom.resonance,
                'tags': atom.visible_tags
            }


# Singleton instance
_graph_integration: Optional[GraphIntegration] = None


def get_graph_integration() -> GraphIntegration:
    """
    Get or create the graph integration singleton.

    On first use the instance loads today's data and starts watching.
    """
    global _graph_integration
    if _graph_integration is None:
        _graph_integration = GraphIntegration()
        _graph_integration.load_today()
        _graph_integration.start_watching()
    return _graph_integration


if __name__ == '__main__':
    print("=== Graph Integration ===\n")

    gi = GraphIntegration()
    state = gi.load_range(days_back=1)

    print(f"Atoms: {state.atom_count}")
    print(f"Edges: {state.edge_count}")
    print(f"Gravity wells: {len(state.gravity_wells)}")
    print("\nAtoms by altitude:")
    for alt, count in state.atoms_by_altitude.items():
        print(f"  {alt}: {count}")
    print("\nEdges by type:")
    for etype, count in state.edges_by_type.items():
        print(f"  {etype}: {count}")

    print("\nHigh resonance atoms:")
    for atom in gi.get_high_resonance_atoms(5):
        print(f"  [{atom['resonance']:.2f}] {atom['content'][:60]}...")