/ core / hypercore / sync.py
sync.py
  1  """
  2  Hypercore Sync - Automatic synchronization of attention context.
  3  
  4  Integrates:
  5  - SessionMembrane → Hypercore attractors
  6  - CrossSessionTracker → Hypercore events
  7  - PhoenixExtractor → Hypercore storage
  8  - AttentionOrchestrator → Hypercore attention stream
  9  
 10  This creates a P2P-replicated attention layer across all machines.
 11  """
 12  
 13  from dataclasses import dataclass
 14  from datetime import datetime
 15  from typing import Optional, List, Dict, Callable, TYPE_CHECKING
 16  import threading
 17  import time
 18  
 19  from .client import HypercoreClient, HypercoreBridge, Attractor
 20  
 21  if TYPE_CHECKING:
 22      from ..attention.membrane import SessionMembrane, CrossSessionSignal
 23      from ..attention.cross_session import CrossSessionTracker
 24      from ..flight.phoenix_extractor import PhoenixState, PhoenixExtractor
 25      from ..flight.session import Session
 26  
 27  
 28  @dataclass
 29  class SyncStats:
 30      """Statistics for sync operations."""
 31      events_synced: int = 0
 32      topics_synced: int = 0
 33      phoenix_states_synced: int = 0
 34      attention_events_synced: int = 0
 35      last_sync: Optional[datetime] = None
 36      errors: int = 0
 37  
 38  
 39  class ContextSync:
 40      """
 41      Automatic context synchronization to Hypercore.
 42  
 43      Watches the attention system and syncs changes to Hypercore in real-time.
 44      Other machines receive updates via Hyperswarm P2P replication.
 45  
 46      Usage:
 47          from core.attention.membrane import SessionMembrane
 48          from core.attention.cross_session import CrossSessionTracker
 49          from core.hypercore.sync import ContextSync
 50  
 51          membrane = SessionMembrane()
 52          tracker = CrossSessionTracker()
 53  
 54          sync = ContextSync(membrane=membrane, tracker=tracker)
 55          sync.start()
 56  
 57          # Now all membrane/tracker updates auto-sync to Hypercore
 58          membrane.update_session_topics('session-001', ['attention'])
 59          # → Automatically synced to Hypercore attractors
 60      """
 61  
 62      def __init__(
 63          self,
 64          membrane: 'SessionMembrane' = None,
 65          tracker: 'CrossSessionTracker' = None,
 66          client: HypercoreClient = None,
 67          sync_interval: float = 1.0,
 68          auto_start: bool = False
 69      ):
 70          self.membrane = membrane
 71          self.tracker = tracker
 72          self.client = client or HypercoreClient()
 73          self.sync_interval = sync_interval
 74  
 75          self.stats = SyncStats()
 76          self._running = False
 77          self._thread: Optional[threading.Thread] = None
 78          self._last_membrane_state: Dict[str, set] = {}
 79          self._last_tracker_state: Dict[str, int] = {}
 80  
 81          # Callbacks for sync events
 82          self._on_attractor_sync: List[Callable[[str, int], None]] = []
 83          self._on_cross_session: List[Callable[[List[Attractor]], None]] = []
 84  
 85          if auto_start:
 86              self.start()
 87  
 88      def start(self) -> bool:
 89          """Start automatic synchronization."""
 90          if not self.client.is_connected():
 91              print("[ContextSync] Warning: Daemon not connected, sync disabled")
 92              return False
 93  
 94          if self._running:
 95              return True
 96  
 97          self._running = True
 98          self._thread = threading.Thread(target=self._sync_loop, daemon=True)
 99          self._thread.start()
100  
101          print(f"[ContextSync] Started (interval: {self.sync_interval}s)")
102          return True
103  
104      def stop(self):
105          """Stop automatic synchronization."""
106          self._running = False
107          if self._thread:
108              self._thread.join(timeout=2.0)
109          print("[ContextSync] Stopped")
110  
111      def _sync_loop(self):
112          """Main sync loop."""
113          while self._running:
114              try:
115                  self._sync_membrane()
116                  self._sync_tracker()
117                  self._check_cross_session_attractors()
118                  self.stats.last_sync = datetime.now()
119              except Exception as e:
120                  self.stats.errors += 1
121                  print(f"[ContextSync] Error: {e}")
122  
123              time.sleep(self.sync_interval)
124  
125      def _sync_membrane(self):
126          """Sync membrane topics to Hypercore."""
127          if not self.membrane:
128              return
129  
130          for session_id, topics in self.membrane._session_topics.items():
131              prev_topics = self._last_membrane_state.get(session_id, set())
132              new_topics = topics - prev_topics
133  
134              for topic in new_topics:
135                  result = self.client.register_topic(topic, session_id)
136                  if result:
137                      self.stats.topics_synced += 1
138                      for cb in self._on_attractor_sync:
139                          cb(topic, result.get('count', 1))
140  
141              self._last_membrane_state[session_id] = topics.copy()
142  
143      def _sync_tracker(self):
144          """Sync tracker events to Hypercore."""
145          if not self.tracker:
146              return
147  
148          state = self.tracker.get_state()
149  
150          # Sync cross-session events
151          event_count = len(state.cross_events)
152          last_count = self._last_tracker_state.get('cross_events', 0)
153  
154          for event in state.cross_events[last_count:]:
155              self.client.write_event(
156                  event_type=event.event_type,
157                  session_id=event.target_session,
158                  topics=event.shared_topics,
159                  content=event.content,
160                  metadata={
161                      'source_session': event.source_session,
162                      'strength': event.strength
163                  }
164              )
165              self.stats.events_synced += 1
166  
167          self._last_tracker_state['cross_events'] = event_count
168  
169      def _check_cross_session_attractors(self):
170          """Check for multi-session/multi-machine attractors."""
171          attractors = self.client.get_attractors()
172  
173          # Filter to cross-session (2+ sessions)
174          cross_session = [a for a in attractors if len(a.sessions) >= 2]
175  
176          # Filter to multi-machine
177          multi_machine = [a for a in attractors if len(a.machines) > 1]
178  
179          if cross_session or multi_machine:
180              for cb in self._on_cross_session:
181                  cb(cross_session + multi_machine)
182  
183      def on_attractor_sync(self, callback: Callable[[str, int], None]):
184          """Register callback for attractor sync events."""
185          self._on_attractor_sync.append(callback)
186  
187      def on_cross_session(self, callback: Callable[[List[Attractor]], None]):
188          """Register callback for cross-session attractors."""
189          self._on_cross_session.append(callback)
190  
191      def sync_session(
192          self,
193          session_id: str,
194          topics: List[str] = None,
195          altitude: str = None
196      ) -> bool:
197          """
198          Manually sync a session to Hypercore.
199  
200          Args:
201              session_id: Session identifier
202              topics: Session topics
203              altitude: Current altitude
204  
205          Returns:
206              True if successful
207          """
208          success = self.client.register_session(session_id, topics, altitude)
209          if success and topics:
210              for topic in topics:
211                  self.client.register_topic(topic, session_id)
212                  self.stats.topics_synced += 1
213          return success
214  
215      def sync_event(
216          self,
217          event_type: str,
218          session_id: str,
219          topics: List[str] = None,
220          content: str = None
221      ) -> bool:
222          """Manually sync an event."""
223          success = self.client.write_event(event_type, session_id, topics, content)
224          if success:
225              self.stats.events_synced += 1
226          return success
227  
228      def get_stats(self) -> SyncStats:
229          """Get sync statistics."""
230          return self.stats
231  
232  
233  class PhoenixSync:
234      """
235      Phoenix State synchronization to Hypercore.
236  
237      Stores and retrieves Phoenix states via P2P replication.
238      Enables resurrection of cognitive configuration from any machine.
239  
240      Usage:
241          from core.flight import PhoenixExtractor, SessionManager
242          from core.hypercore.sync import PhoenixSync
243  
244          sync = PhoenixSync()
245  
246          # Extract and sync Phoenix state
247          manager = SessionManager()
248          session = manager.close_session('session-001')
249  
250          extractor = PhoenixExtractor()
251          phoenix = extractor.extract(session)
252  
253          sync.store(phoenix)
254  
255          # Later, from any machine:
256          state = sync.retrieve('2026-01-15-attention-architecture')
257      """
258  
259      def __init__(self, client: HypercoreClient = None):
260          self.client = client or HypercoreClient()
261          self.stats = SyncStats()
262  
263      def is_connected(self) -> bool:
264          """Check if daemon is reachable."""
265          return self.client.is_connected()
266  
267      def store(self, phoenix_state: 'PhoenixState') -> bool:
268          """
269          Store a Phoenix state to Hypercore.
270  
271          Args:
272              phoenix_state: PhoenixState instance
273  
274          Returns:
275              True if successful
276          """
277          state_dict = {
278              'session_id': phoenix_state.session_id,
279              'created': phoenix_state.created.isoformat(),
280              'operator': phoenix_state.operator,
281              'domain': phoenix_state.domain,
282              'operator_altitude': phoenix_state.operator_altitude.name,
283              'system_altitude': phoenix_state.system_altitude.name,
284              'pull_rate': phoenix_state.pull_rate,
285              'gravity_wells': [
286                  {
287                      'concept': gw.concept,
288                      'resonance': gw.resonance,
289                      'mention_count': gw.mention_count
290                  }
291                  for gw in phoenix_state.gravity_wells[:10]
292              ],
293              'momentum': {
294                  'rising': phoenix_state.momentum.rising[:5],
295                  'almost_formed': [
296                      list(pair) for pair in phoenix_state.momentum.almost_formed[:5]
297                  ]
298              },
299              'open_threads': [
300                  {
301                      'content': t.content,
302                      'importance': t.importance
303                  }
304                  for t in phoenix_state.open_threads[:10]
305              ],
306              'ready_to_implement': phoenix_state.ready_to_implement[:10],
307              'paths_not_taken': [
308                  {
309                      'description': p.description,
310                      'reason': p.reason,
311                      'reversible': p.reversible
312                  }
313                  for p in phoenix_state.paths_not_taken[:10]
314              ],
315              'key_files': phoenix_state.key_files[:10]
316          }
317  
318          success = self.client.store_phoenix(phoenix_state.session_id, state_dict)
319          if success:
320              self.stats.phoenix_states_synced += 1
321              print(f"[PhoenixSync] Stored: {phoenix_state.session_id}")
322          return success
323  
324      def retrieve(self, session_id: str) -> Optional[Dict]:
325          """
326          Retrieve a Phoenix state from Hypercore.
327  
328          Args:
329              session_id: Session identifier
330  
331          Returns:
332              Phoenix state dictionary or None
333          """
334          return self.client.get_phoenix(session_id)
335  
336      def list_states(self) -> List[Dict]:
337          """List all stored Phoenix states."""
338          return self.client.list_phoenix_states()
339  
340      def get_latest(self) -> Optional[Dict]:
341          """Get the most recently stored Phoenix state."""
342          states = self.list_states()
343          if not states:
344              return None
345  
346          # Sort by storedAt descending
347          sorted_states = sorted(
348              states,
349              key=lambda s: s.get('storedAt', ''),
350              reverse=True
351          )
352  
353          if sorted_states:
354              # Fetch full state
355              return self.retrieve(sorted_states[0]['sessionId'])
356  
357          return None
358  
359      def generate_resurrection_prompt(self, session_id: str) -> Optional[str]:
360          """
361          Generate a resurrection prompt for a Phoenix state.
362  
363          Args:
364              session_id: Session identifier
365  
366          Returns:
367              Resurrection prompt string or None
368          """
369          state = self.retrieve(session_id)
370          if not state:
371              return None
372  
373          gravity_wells = state.get('gravity_wells', [])
374          open_threads = state.get('open_threads', [])
375  
376          lines = [
377              f"Resurrecting Phoenix state {session_id}.",
378              f"Operator altitude: {state.get('operator_altitude', 'unknown')}",
379          ]
380  
381          if gravity_wells:
382              lines.append(f'Primary gravity well: "{gravity_wells[0].get("concept", "unknown")}"')
383              lines.append("")
384              lines.append("Ready to continue. We were exploring:")
385              for gw in gravity_wells[:3]:
386                  lines.append(f"  - {gw.get('concept', '')}")
387  
388          if open_threads:
389              lines.append("")
390              lines.append("Open threads:")
391              for t in open_threads[:3]:
392                  content = t.get('content', t) if isinstance(t, dict) else t
393                  lines.append(f"  - {content}")
394  
395          lines.append("")
396          lines.append("What would you like to pick up?")
397  
398          return "\n".join(lines)
399  
400  
401  # Factory functions
402  def create_context_sync(
403      membrane: 'SessionMembrane' = None,
404      tracker: 'CrossSessionTracker' = None,
405      auto_start: bool = True
406  ) -> ContextSync:
407      """
408      Create a context sync instance.
409  
410      Args:
411          membrane: Optional SessionMembrane
412          tracker: Optional CrossSessionTracker
413          auto_start: Start sync automatically
414  
415      Returns:
416          ContextSync instance
417      """
418      return ContextSync(
419          membrane=membrane,
420          tracker=tracker,
421          auto_start=auto_start
422      )
423  
424  
425  def create_phoenix_sync() -> PhoenixSync:
426      """Create a Phoenix sync instance."""
427      return PhoenixSync()
428  
429  
430  if __name__ == "__main__":
431      print("=== Hypercore Sync Test ===\n")
432  
433      # Test Phoenix sync
434      sync = PhoenixSync()
435  
436      if not sync.is_connected():
437          print("Daemon not running. Start with: node ~/scripts/hypercore_daemon.js")
438          exit(1)
439  
440      print("Phoenix states:")
441      for state in sync.list_states():
442          print(f"  {state.get('sessionId')}: stored {state.get('storedAt')}")
443  
444      latest = sync.get_latest()
445      if latest:
446          print(f"\nLatest state gravity wells:")
447          for gw in latest.get('gravity_wells', [])[:3]:
448              print(f"  - {gw.get('concept')}: {gw.get('resonance')}")
449  
450      print("\n=== Test Complete ===")