sync.py
"""
Hypercore Sync - Automatic synchronization of attention context.

Integrates:
- SessionMembrane → Hypercore attractors
- CrossSessionTracker → Hypercore events
- PhoenixExtractor → Hypercore storage
- AttentionOrchestrator → Hypercore attention stream

This creates a P2P-replicated attention layer across all machines.
"""

from dataclasses import dataclass
from datetime import datetime
from typing import Optional, List, Dict, Callable, TYPE_CHECKING
import threading
import time

from .client import HypercoreClient, HypercoreBridge, Attractor

if TYPE_CHECKING:
    from ..attention.membrane import SessionMembrane, CrossSessionSignal
    from ..attention.cross_session import CrossSessionTracker
    from ..flight.phoenix_extractor import PhoenixState, PhoenixExtractor
    from ..flight.session import Session


@dataclass
class SyncStats:
    """Statistics for sync operations."""
    # Events for which client.write_event reported success.
    events_synced: int = 0
    # Topics for which client.register_topic returned a truthy result.
    topics_synced: int = 0
    # Phoenix states for which client.store_phoenix reported success.
    phoenix_states_synced: int = 0
    # Not updated by any code in this module.
    attention_events_synced: int = 0
    # Time of the most recent sync-loop pass that finished without raising.
    last_sync: Optional[datetime] = None
    # Number of sync-loop passes that raised an exception.
    errors: int = 0


class ContextSync:
    """
    Automatic context synchronization to Hypercore.

    Watches the attention system and syncs changes to Hypercore in real-time.
    Other machines receive updates via Hyperswarm P2P replication.

    Usage:
        from core.attention.membrane import SessionMembrane
        from core.attention.cross_session import CrossSessionTracker
        from core.hypercore.sync import ContextSync

        membrane = SessionMembrane()
        tracker = CrossSessionTracker()

        sync = ContextSync(membrane=membrane, tracker=tracker)
        sync.start()

        # Now all membrane/tracker updates auto-sync to Hypercore
        membrane.update_session_topics('session-001', ['attention'])
        # → Automatically synced to Hypercore attractors
    """

    def __init__(
        self,
        membrane: Optional['SessionMembrane'] = None,
        tracker: Optional['CrossSessionTracker'] = None,
        client: Optional[HypercoreClient] = None,
        sync_interval: float = 1.0,
        auto_start: bool = False
    ):
        """
        Args:
            membrane: Optional membrane whose session topics are synced.
            tracker: Optional tracker whose cross-session events are synced.
            client: Hypercore client; a default one is created when omitted.
            sync_interval: Seconds to wait between sync passes.
            auto_start: Call start() at the end of construction.
        """
        self.membrane = membrane
        self.tracker = tracker
        self.client = client or HypercoreClient()
        self.sync_interval = sync_interval

        self.stats = SyncStats()
        self._running = False
        self._thread: Optional[threading.Thread] = None
        # Replaced with a fresh event on every start(); set by stop().
        self._stop_event = threading.Event()
        # session_id -> topics already handed to register_topic.
        self._last_membrane_state: Dict[str, set] = {}
        # 'cross_events' -> number of tracker events already processed.
        self._last_tracker_state: Dict[str, int] = {}

        # Callbacks for sync events
        self._on_attractor_sync: List[Callable[[str, int], None]] = []
        self._on_cross_session: List[Callable[[List[Attractor]], None]] = []

        if auto_start:
            self.start()

    def start(self) -> bool:
        """
        Start automatic synchronization.

        Returns:
            False if the client reports no connection, True otherwise
            (including when the loop was already running).
        """
        if not self.client.is_connected():
            print("[ContextSync] Warning: Daemon not connected, sync disabled")
            return False

        if self._running:
            return True

        # Each run owns its stop event, so a loop from a previous run that
        # outlived stop()'s join timeout cannot be revived by this start().
        stop_event = threading.Event()
        self._stop_event = stop_event
        self._running = True
        self._thread = threading.Thread(
            target=self._sync_loop,
            args=(stop_event,),
            daemon=True
        )
        self._thread.start()

        print(f"[ContextSync] Started (interval: {self.sync_interval}s)")
        return True

    def stop(self):
        """Stop automatic synchronization and wait up to 2s for the loop."""
        self._running = False
        # Wake the loop out of its inter-pass wait immediately.
        self._stop_event.set()

        thread = self._thread
        # A callback running on the sync thread may call stop(); joining the
        # current thread would raise RuntimeError.
        if thread and thread is not threading.current_thread():
            thread.join(timeout=2.0)
        print("[ContextSync] Stopped")

    def _sync_loop(self, stop_event: Optional[threading.Event] = None):
        """
        Main sync loop.

        Runs one pass (membrane, tracker, cross-session check) per interval
        until stopped. Any exception in a pass is counted and printed, and
        the loop carries on with the next pass.

        Args:
            stop_event: Event that ends this loop when set; defaults to the
                instance's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event

        while self._running and not stop_event.is_set():
            try:
                self._sync_membrane()
                self._sync_tracker()
                self._check_cross_session_attractors()
                self.stats.last_sync = datetime.now()
            except Exception as e:
                self.stats.errors += 1
                print(f"[ContextSync] Error: {e}")

            # Interruptible replacement for time.sleep(): returns early once
            # stop() sets the event.
            stop_event.wait(self.sync_interval)

    def _sync_membrane(self):
        """Sync membrane topics to Hypercore."""
        if not self.membrane:
            return

        # This runs on the background thread while the membrane's owner may
        # be adding sessions/topics, so iterate over snapshots rather than
        # the live dict and sets.
        for session_id, topics in list(self.membrane._session_topics.items()):
            current = set(topics)
            prev_topics = self._last_membrane_state.get(session_id, set())

            for topic in current - prev_topics:
                result = self.client.register_topic(topic, session_id)
                if result:
                    self.stats.topics_synced += 1
                    for cb in self._on_attractor_sync:
                        cb(topic, result.get('count', 1))

            self._last_membrane_state[session_id] = current

    def _sync_tracker(self):
        """Sync tracker events to Hypercore."""
        if not self.tracker:
            return

        state = self.tracker.get_state()

        # Sync cross-session events
        event_count = len(state.cross_events)
        last_count = self._last_tracker_state.get('cross_events', 0)

        # Bound the slice by event_count so events appended after len() was
        # taken are left for the next pass instead of being sent twice.
        for event in state.cross_events[last_count:event_count]:
            success = self.client.write_event(
                event_type=event.event_type,
                session_id=event.target_session,
                topics=event.shared_topics,
                content=event.content,
                metadata={
                    'source_session': event.source_session,
                    'strength': event.strength
                }
            )
            # Count only confirmed writes, matching sync_event().
            if success:
                self.stats.events_synced += 1

        self._last_tracker_state['cross_events'] = event_count

    def _check_cross_session_attractors(self):
        """Check for multi-session/multi-machine attractors."""
        attractors = self.client.get_attractors() or []

        # Filter to cross-session (2+ sessions)
        cross_session = [a for a in attractors if len(a.sessions) >= 2]

        # Multi-machine attractors not already captured above, so an
        # attractor that satisfies both filters is reported once.
        multi_machine_only = [
            a for a in attractors
            if len(a.machines) > 1 and len(a.sessions) < 2
        ]

        flagged = cross_session + multi_machine_only
        if flagged:
            for cb in self._on_cross_session:
                # Each callback gets its own list, as before.
                cb(list(flagged))

    def on_attractor_sync(self, callback: Callable[[str, int], None]):
        """Register callback for attractor sync events."""
        self._on_attractor_sync.append(callback)

    def on_cross_session(self, callback: Callable[[List[Attractor]], None]):
        """Register callback for cross-session attractors."""
        self._on_cross_session.append(callback)

    def sync_session(
        self,
        session_id: str,
        topics: Optional[List[str]] = None,
        altitude: Optional[str] = None
    ) -> bool:
        """
        Manually sync a session to Hypercore.

        Args:
            session_id: Session identifier
            topics: Session topics
            altitude: Current altitude

        Returns:
            True if successful
        """
        success = self.client.register_session(session_id, topics, altitude)
        if success and topics:
            for topic in topics:
                # Count only confirmed registrations, matching
                # _sync_membrane().
                if self.client.register_topic(topic, session_id):
                    self.stats.topics_synced += 1
        return success

    def sync_event(
        self,
        event_type: str,
        session_id: str,
        topics: Optional[List[str]] = None,
        content: Optional[str] = None
    ) -> bool:
        """Manually sync an event."""
        success = self.client.write_event(event_type, session_id, topics, content)
        if success:
            self.stats.events_synced += 1
        return success

    def get_stats(self) -> SyncStats:
        """Get sync statistics."""
        return self.stats


class PhoenixSync:
    """
    Phoenix State synchronization to Hypercore.

    Stores and retrieves Phoenix states via P2P replication.
    Enables resurrection of cognitive configuration from any machine.

    Usage:
        from core.flight import PhoenixExtractor, SessionManager
        from core.hypercore.sync import PhoenixSync

        sync = PhoenixSync()

        # Extract and sync Phoenix state
        manager = SessionManager()
        session = manager.close_session('session-001')

        extractor = PhoenixExtractor()
        phoenix = extractor.extract(session)

        sync.store(phoenix)

        # Later, from any machine:
        state = sync.retrieve('2026-01-15-attention-architecture')
    """

    def __init__(self, client: Optional[HypercoreClient] = None):
        self.client = client or HypercoreClient()
        self.stats = SyncStats()

    def is_connected(self) -> bool:
        """Check if daemon is reachable."""
        return self.client.is_connected()

    def store(self, phoenix_state: 'PhoenixState') -> bool:
        """
        Store a Phoenix state to Hypercore.

        The state is flattened to a plain dict first. List-valued fields
        are truncated: 10 entries for gravity wells, open threads,
        ready-to-implement items, paths not taken and key files; 5 for the
        momentum lists.

        Args:
            phoenix_state: PhoenixState instance

        Returns:
            True if successful
        """
        state_dict = {
            'session_id': phoenix_state.session_id,
            'created': phoenix_state.created.isoformat(),
            'operator': phoenix_state.operator,
            'domain': phoenix_state.domain,
            'operator_altitude': phoenix_state.operator_altitude.name,
            'system_altitude': phoenix_state.system_altitude.name,
            'pull_rate': phoenix_state.pull_rate,
            'gravity_wells': [
                {
                    'concept': gw.concept,
                    'resonance': gw.resonance,
                    'mention_count': gw.mention_count
                }
                for gw in phoenix_state.gravity_wells[:10]
            ],
            'momentum': {
                'rising': phoenix_state.momentum.rising[:5],
                'almost_formed': [
                    list(pair) for pair in phoenix_state.momentum.almost_formed[:5]
                ]
            },
            'open_threads': [
                {
                    'content': t.content,
                    'importance': t.importance
                }
                for t in phoenix_state.open_threads[:10]
            ],
            'ready_to_implement': phoenix_state.ready_to_implement[:10],
            'paths_not_taken': [
                {
                    'description': p.description,
                    'reason': p.reason,
                    'reversible': p.reversible
                }
                for p in phoenix_state.paths_not_taken[:10]
            ],
            'key_files': phoenix_state.key_files[:10]
        }

        success = self.client.store_phoenix(phoenix_state.session_id, state_dict)
        if success:
            self.stats.phoenix_states_synced += 1
            print(f"[PhoenixSync] Stored: {phoenix_state.session_id}")
        return success

    def retrieve(self, session_id: str) -> Optional[Dict]:
        """
        Retrieve a Phoenix state from Hypercore.

        Args:
            session_id: Session identifier

        Returns:
            Phoenix state dictionary or None
        """
        return self.client.get_phoenix(session_id)

    def list_states(self) -> List[Dict]:
        """List all stored Phoenix states."""
        return self.client.list_phoenix_states()

    def get_latest(self) -> Optional[Dict]:
        """
        Get the most recently stored Phoenix state.

        Returns:
            The full state with the greatest 'storedAt' value, or None when
            nothing is stored or the newest listing entry has no 'sessionId'.
        """
        states = self.list_states()
        if not states:
            return None

        # max() returns the first maximal entry, the same one a stable
        # descending sort would have put first.
        newest = max(states, key=lambda s: s.get('storedAt') or '')

        session_id = newest.get('sessionId')
        if not session_id:
            return None

        # Fetch full state
        return self.retrieve(session_id)

    def generate_resurrection_prompt(self, session_id: str) -> Optional[str]:
        """
        Generate a resurrection prompt for a Phoenix state.

        Args:
            session_id: Session identifier

        Returns:
            Resurrection prompt string or None
        """
        state = self.retrieve(session_id)
        if not state:
            return None

        # "or []" also covers a key that is present but stored as null.
        gravity_wells = state.get('gravity_wells') or []
        open_threads = state.get('open_threads') or []

        lines = [
            f"Resurrecting Phoenix state {session_id}.",
            f"Operator altitude: {state.get('operator_altitude', 'unknown')}",
        ]

        if gravity_wells:
            lines.append(f'Primary gravity well: "{gravity_wells[0].get("concept", "unknown")}"')
            lines.append("")
            lines.append("Ready to continue. We were exploring:")
            for gw in gravity_wells[:3]:
                lines.append(f" - {gw.get('concept', '')}")

        if open_threads:
            lines.append("")
            lines.append("Open threads:")
            for t in open_threads[:3]:
                # Threads may be dicts with a 'content' key or bare values.
                content = t.get('content', t) if isinstance(t, dict) else t
                lines.append(f" - {content}")

        lines.append("")
        lines.append("What would you like to pick up?")

        return "\n".join(lines)


# Factory functions
def create_context_sync(
    membrane: Optional['SessionMembrane'] = None,
    tracker: Optional['CrossSessionTracker'] = None,
    auto_start: bool = True
) -> ContextSync:
    """
    Create a context sync instance.

    Args:
        membrane: Optional SessionMembrane
        tracker: Optional CrossSessionTracker
        auto_start: Start sync automatically

    Returns:
        ContextSync instance
    """
    return ContextSync(
        membrane=membrane,
        tracker=tracker,
        auto_start=auto_start
    )


def create_phoenix_sync() -> PhoenixSync:
    """Create a Phoenix sync instance."""
    return PhoenixSync()


if __name__ == "__main__":
    print("=== Hypercore Sync Test ===\n")

    # Test Phoenix sync
    sync = PhoenixSync()

    if not sync.is_connected():
        print("Daemon not running. Start with: node ~/scripts/hypercore_daemon.js")
        # The exit() builtin comes from the site module and is not always
        # available; SystemExit always is.
        raise SystemExit(1)

    print("Phoenix states:")
    for state in sync.list_states():
        print(f" {state.get('sessionId')}: stored {state.get('storedAt')}")

    latest = sync.get_latest()
    if latest:
        print("\nLatest state gravity wells:")
        for gw in latest.get('gravity_wells', [])[:3]:
            print(f" - {gw.get('concept')}: {gw.get('resonance')}")

    print("\n=== Test Complete ===")