wiring.py
1 """ 2 Hypercore Wiring - Auto-sync between Python components and Hypercore daemon. 3 4 This module wires together: 5 - SessionMembrane → Hypercore attractors 6 - PhoenixExtractor → Hypercore Phoenix storage 7 - CrossSessionTracker → Hypercore events 8 - DeviceFleet → Hypercore fleet state 9 10 The wiring is automatic - once connected, all updates propagate to the P2P network. 11 12 Architecture: 13 ┌──────────────────┐ 14 │ SessionMembrane │──┐ 15 └──────────────────┘ │ 16 ┌──────────────────┐ │ ┌─────────────────┐ ┌─────────────────┐ 17 │ PhoenixExtractor │──┼───►│ HypercoreWiring │────►│ Hypercore Daemon│ 18 └──────────────────┘ │ └─────────────────┘ └─────────────────┘ 19 ┌──────────────────┐ │ │ 20 │CrossSessionTracker├─┘ ▼ 21 └──────────────────┘ ┌─────────────────┐ 22 │ P2P Network │ 23 │ (Hyperswarm) │ 24 └─────────────────┘ 25 26 Usage: 27 from core.hypercore.wiring import SovereignWiring 28 29 wiring = SovereignWiring() 30 wiring.connect() 31 32 # Now all components auto-sync to Hypercore 33 wiring.membrane.update_session_topics('session-001', ['attention']) 34 # → Topics automatically synced to Hypercore attractors 35 36 wiring.extract_phoenix(session) 37 # → Phoenix state automatically stored in Hypercore 38 """ 39 40 from dataclasses import dataclass, field 41 from datetime import datetime 42 from typing import Optional, List, Dict, Any, Callable 43 from threading import Thread, Event 44 import time 45 46 from .client import HypercoreClient, HypercoreBridge, Attractor 47 from ..attention.membrane import SessionMembrane, CrossSessionSignal, PermeabilityLevel 48 from ..attention.cross_session import CrossSessionTracker, CrossSessionEvent 49 from ..flight.phoenix_extractor import PhoenixExtractor, PhoenixState 50 from ..flight.session import Session 51 52 53 @dataclass 54 class WiringConfig: 55 """Configuration for the Hypercore wiring.""" 56 daemon_host: str = 'localhost' 57 daemon_port: int = 7777 58 auto_sync_interval: float = 5.0 # seconds 59 sync_phoenix_on_close: bool = True 60 sync_events_to_hypercore: bool = True 61 sync_attractors_from_hypercore: bool = True 62 permeability: PermeabilityLevel = PermeabilityLevel.ATTRACTORS 63 64 65 class SovereignWiring: 66 """ 67 Wires all Sovereign OS components to Hypercore for P2P sync. 68 69 This is the central integration point that makes local Python components 70 part of the distributed system. 71 """ 72 73 def __init__(self, config: WiringConfig = None): 74 self.config = config or WiringConfig() 75 76 # Core components 77 self.client: Optional[HypercoreClient] = None 78 self.bridge: Optional[HypercoreBridge] = None 79 self.membrane: Optional[SessionMembrane] = None 80 self.tracker: Optional[CrossSessionTracker] = None 81 self.extractor: Optional[PhoenixExtractor] = None 82 83 # Background sync 84 self._sync_thread: Optional[Thread] = None 85 self._stop_event = Event() 86 87 # Callbacks for external integration 88 self._on_attractor_update: List[Callable[[List[Attractor]], None]] = [] 89 self._on_cross_session_event: List[Callable[[CrossSessionEvent], None]] = [] 90 91 # State tracking 92 self._connected = False 93 self._last_sync = None 94 self._synced_sessions: set = set() 95 96 def connect(self) -> bool: 97 """ 98 Connect to Hypercore daemon and initialize all components. 99 100 Returns: 101 True if connected successfully 102 """ 103 # Initialize client 104 self.client = HypercoreClient( 105 host=self.config.daemon_host, 106 port=self.config.daemon_port 107 ) 108 109 if not self.client.is_connected(): 110 print(f"Warning: Hypercore daemon not reachable at " 111 f"{self.config.daemon_host}:{self.config.daemon_port}") 112 print("Run: node hypercore_daemon.js to start it") 113 return False 114 115 # Initialize membrane 116 self.membrane = SessionMembrane( 117 permeability=self.config.permeability 118 ) 119 120 # Initialize tracker 121 self.tracker = CrossSessionTracker() 122 123 # Initialize extractor 124 self.extractor = PhoenixExtractor() 125 126 # Initialize bridge 127 self.bridge = HypercoreBridge( 128 client=self.client, 129 membrane=self.membrane, 130 tracker=self.tracker 131 ) 132 133 # Wire up automatic syncing 134 self._wire_membrane() 135 self._wire_tracker() 136 137 # Start background sync if configured 138 if self.config.sync_attractors_from_hypercore: 139 self._start_sync_thread() 140 141 self._connected = True 142 print(f"Sovereign wiring connected to Hypercore at " 143 f"{self.config.daemon_host}:{self.config.daemon_port}") 144 145 return True 146 147 def _wire_membrane(self): 148 """Wire SessionMembrane to auto-sync topics.""" 149 # Store original method 150 original_update = self.membrane.update_session_topics 151 152 def synced_update(session_id: str, topics: List[str]): 153 # Call original 154 signals = original_update(session_id, topics) 155 156 # Sync to Hypercore 157 if self.bridge and self.config.sync_events_to_hypercore: 158 self.bridge.sync_membrane_topics(session_id, topics) 159 160 # Also sync any cross-session signals 161 for signal in signals: 162 self.client.write_event( 163 event_type='cross_session_signal', 164 session_id=session_id, 165 topics=[signal.content] if signal.signal_type == 'attractor' else [], 166 content=signal.to_context_line(), 167 metadata={ 168 'signal_type': signal.signal_type, 169 'strength': signal.strength, 170 'source_sessions': signal.source_sessions 171 } 172 ) 173 174 return signals 175 176 # Replace method 177 self.membrane.update_session_topics = synced_update 178 179 def _wire_tracker(self): 180 """Wire CrossSessionTracker to auto-sync events.""" 181 # Store original method 182 original_record = self.tracker.record_activity 183 184 def synced_record( 185 session_id: str, 186 atom_uuid: str = None, 187 content: str = None, 188 topics: List[str] = None 189 ): 190 # Call original 191 events = original_record(session_id, atom_uuid, content, topics) 192 193 # Sync to Hypercore 194 if self.bridge and self.config.sync_events_to_hypercore: 195 self.client.write_event( 196 event_type='attention_activity', 197 session_id=session_id, 198 topics=topics, 199 content=content[:200] if content else None, 200 metadata={ 201 'atom_uuid': atom_uuid 202 } 203 ) 204 205 # Sync any cross-session events 206 if events: 207 for event in events: 208 self.client.write_event( 209 event_type=event.event_type, 210 session_id=event.target_session, 211 topics=event.shared_topics, 212 content=event.content, 213 metadata={ 214 'source_session': event.source_session, 215 'strength': event.strength 216 } 217 ) 218 219 # Notify callbacks 220 for callback in self._on_cross_session_event: 221 callback(event) 222 223 return events 224 225 # Replace method 226 self.tracker.record_activity = synced_record 227 228 def extract_phoenix( 229 self, 230 session: Session, 231 operator: str = "rick", 232 domain: str = "Estate", 233 key_files: List[str] = None, 234 auto_store: bool = True 235 ) -> PhoenixState: 236 """ 237 Extract Phoenix state from session and optionally store to Hypercore. 238 239 Args: 240 session: Session to extract from 241 operator: Operator name 242 domain: Domain name 243 key_files: Key file paths 244 auto_store: Whether to auto-store to Hypercore 245 246 Returns: 247 PhoenixState 248 """ 249 phoenix = self.extractor.extract( 250 session=session, 251 operator=operator, 252 domain=domain, 253 key_files=key_files 254 ) 255 256 if auto_store and self.bridge: 257 success = self.bridge.sync_phoenix_state(phoenix) 258 if success: 259 print(f"Phoenix state stored: {phoenix.session_id}") 260 else: 261 print(f"Warning: Failed to store Phoenix state") 262 263 return phoenix 264 265 def register_session( 266 self, 267 session_id: str, 268 source: str = "python", 269 topics: List[str] = None 270 ): 271 """ 272 Register a session with all components. 273 274 Args: 275 session_id: Session identifier 276 source: Session source (claude-code, claude-app, etc.) 277 topics: Initial topics 278 """ 279 # Register with membrane 280 self.membrane.register_session(session_id) 281 282 # Register with tracker 283 self.tracker.register_session(session_id, source=source) 284 285 # Register with Hypercore 286 if self.client: 287 self.client.register_session(session_id, topics=topics) 288 289 self._synced_sessions.add(session_id) 290 291 def update_topics(self, session_id: str, topics: List[str]): 292 """ 293 Update topics for a session (syncs to all components). 294 295 Args: 296 session_id: Session identifier 297 topics: Topics to update 298 """ 299 # Ensure session is registered 300 if session_id not in self._synced_sessions: 301 self.register_session(session_id) 302 303 # Update membrane (auto-syncs to Hypercore via wiring) 304 self.membrane.update_session_topics(session_id, topics) 305 306 def record_activity( 307 self, 308 session_id: str, 309 content: str = None, 310 topics: List[str] = None 311 ): 312 """ 313 Record activity in a session (syncs to all components). 314 315 Args: 316 session_id: Session identifier 317 content: Activity content 318 topics: Related topics 319 """ 320 # Ensure session is registered 321 if session_id not in self._synced_sessions: 322 self.register_session(session_id) 323 324 # Record with tracker (auto-syncs to Hypercore via wiring) 325 import uuid 326 atom_uuid = str(uuid.uuid4())[:8] 327 self.tracker.record_activity( 328 session_id=session_id, 329 atom_uuid=atom_uuid, 330 content=content, 331 topics=topics 332 ) 333 334 def get_cross_session_context(self, session_id: str) -> str: 335 """ 336 Get cross-session context for a session. 337 338 Combines local membrane state with Hypercore attractors. 339 340 Args: 341 session_id: Session to get context for 342 343 Returns: 344 Formatted context string 345 """ 346 # Get local membrane context 347 from ..attention.membrane import CrossSessionContextInjector 348 injector = CrossSessionContextInjector(self.membrane) 349 local_context = injector.format_for_claude(session_id) 350 351 # Get Hypercore attractors 352 hypercore_context = "" 353 if self.client: 354 attractors = self.client.get_attractors() 355 multi_machine = [a for a in attractors if len(a.machines) > 1] 356 357 if multi_machine: 358 lines = [ 359 "<multi-machine-attractors>", 360 "Topics active across your device fleet:", 361 "" 362 ] 363 for a in multi_machine[:5]: 364 lines.append(f" - {a.topic} (on {', '.join(a.machines)})") 365 lines.append("</multi-machine-attractors>") 366 hypercore_context = "\n".join(lines) 367 368 return f"{local_context}\n\n{hypercore_context}".strip() 369 370 def _start_sync_thread(self): 371 """Start background thread for syncing attractors from Hypercore.""" 372 self._stop_event.clear() 373 self._sync_thread = Thread(target=self._sync_loop, daemon=True) 374 self._sync_thread.start() 375 376 def _sync_loop(self): 377 """Background loop that syncs attractors from Hypercore.""" 378 while not self._stop_event.wait(timeout=self.config.auto_sync_interval): 379 try: 380 # Get attractors from Hypercore 381 attractors = self.client.get_attractors() 382 383 # Update membrane with cross-machine attractors 384 for a in attractors: 385 if len(a.sessions) >= 2: 386 # This is a cross-session attractor 387 for session_id in a.sessions: 388 if session_id in self._synced_sessions: 389 # Update membrane (without re-syncing to avoid loop) 390 self.membrane._session_topics.setdefault(session_id, set()).add(a.topic) 391 392 # Notify callbacks 393 for callback in self._on_attractor_update: 394 callback(attractors) 395 396 self._last_sync = datetime.now() 397 398 except Exception as e: 399 print(f"Sync error: {e}") 400 401 def on_attractor_update(self, callback: Callable[[List[Attractor]], None]): 402 """Register callback for attractor updates from Hypercore.""" 403 self._on_attractor_update.append(callback) 404 405 def on_cross_session_event(self, callback: Callable[[CrossSessionEvent], None]): 406 """Register callback for cross-session events.""" 407 self._on_cross_session_event.append(callback) 408 409 def get_status(self) -> Dict[str, Any]: 410 """Get wiring status.""" 411 status = { 412 'connected': self._connected, 413 'daemon_reachable': self.client.is_connected() if self.client else False, 414 'synced_sessions': list(self._synced_sessions), 415 'last_sync': self._last_sync.isoformat() if self._last_sync else None, 416 } 417 418 if self.client and self._connected: 419 daemon_status = self.client.status() 420 if daemon_status: 421 status['daemon'] = { 422 'machine': daemon_status.machine, 423 'peers': daemon_status.peers, 424 'attractors': daemon_status.attractor_count, 425 'phoenix_states': daemon_status.phoenix_count 426 } 427 428 if self.membrane: 429 status['membrane'] = { 430 'sessions': len(self.membrane._session_topics), 431 'attractors': len(self.membrane._get_attractors()) 432 } 433 434 if self.tracker: 435 state = self.tracker.get_state() 436 status['tracker'] = { 437 'sessions': len(state.sessions), 438 'cross_events': len(state.cross_events), 439 'attractors': state.cross_session_attractors 440 } 441 442 return status 443 444 def disconnect(self): 445 """Disconnect from Hypercore and stop background sync.""" 446 self._stop_event.set() 447 if self._sync_thread: 448 self._sync_thread.join(timeout=2.0) 449 self._connected = False 450 451 452 # Singleton instance 453 _wiring: Optional[SovereignWiring] = None 454 455 456 def get_wiring(config: WiringConfig = None) -> SovereignWiring: 457 """Get or create the global wiring instance.""" 458 global _wiring 459 if _wiring is None: 460 _wiring = SovereignWiring(config) 461 return _wiring 462 463 464 def connect( 465 host: str = 'localhost', 466 port: int = 7777, 467 auto_sync: bool = True 468 ) -> SovereignWiring: 469 """ 470 Connect to Hypercore and initialize wiring. 471 472 Args: 473 host: Daemon host 474 port: Daemon port 475 auto_sync: Whether to auto-sync attractors 476 477 Returns: 478 Connected SovereignWiring instance 479 """ 480 config = WiringConfig( 481 daemon_host=host, 482 daemon_port=port, 483 sync_attractors_from_hypercore=auto_sync 484 ) 485 wiring = get_wiring(config) 486 wiring.connect() 487 return wiring 488 489 490 if __name__ == "__main__": 491 print("=== Sovereign Wiring Test ===\n") 492 493 wiring = SovereignWiring() 494 495 if not wiring.connect(): 496 print("Start the Hypercore daemon first:") 497 print(" node hypercore_daemon.js") 498 exit(1) 499 500 print("\n--- Testing membrane sync ---") 501 wiring.register_session('test-session-001', source='python-test') 502 wiring.update_topics('test-session-001', ['attention', 'hypercore', 'wiring']) 503 print("Topics updated and synced to Hypercore") 504 505 print("\n--- Testing activity recording ---") 506 wiring.record_activity( 507 'test-session-001', 508 content="Testing the wiring between Python components and Hypercore", 509 topics=['testing', 'integration'] 510 ) 511 print("Activity recorded and synced") 512 513 print("\n--- Testing cross-session context ---") 514 wiring.register_session('test-session-002', source='python-test') 515 wiring.update_topics('test-session-002', ['attention', 'membrane']) # Shares 'attention' 516 517 context = wiring.get_cross_session_context('test-session-001') 518 print("Cross-session context for session-001:") 519 print(context[:500] if context else "(no context)") 520 521 print("\n--- Wiring Status ---") 522 status = wiring.get_status() 523 for key, value in status.items(): 524 print(f" {key}: {value}") 525 526 wiring.disconnect() 527 print("\n=== Test Complete ===")