/ core / hypercore / wiring.py
wiring.py
  1  """
  2  Hypercore Wiring - Auto-sync between Python components and Hypercore daemon.
  3  
  4  This module wires together:
  5  - SessionMembrane → Hypercore attractors
  6  - PhoenixExtractor → Hypercore Phoenix storage
  7  - CrossSessionTracker → Hypercore events
  8  - DeviceFleet → Hypercore fleet state
  9  
 10  The wiring is automatic - once connected, all updates propagate to the P2P network.
 11  
 12  Architecture:
 13      ┌──────────────────┐
 14      │  SessionMembrane │──┐
 15      └──────────────────┘  │
 16      ┌──────────────────┐  │    ┌─────────────────┐     ┌─────────────────┐
 17      │ PhoenixExtractor │──┼───►│ HypercoreWiring │────►│ Hypercore Daemon│
 18      └──────────────────┘  │    └─────────────────┘     └─────────────────┘
 19      ┌──────────────────┐  │                                    │
 20      │CrossSessionTracker├─┘                                    ▼
 21      └──────────────────┘                               ┌─────────────────┐
 22                                                         │   P2P Network   │
 23                                                         │  (Hyperswarm)   │
 24                                                         └─────────────────┘
 25  
 26  Usage:
 27      from core.hypercore.wiring import SovereignWiring
 28  
 29      wiring = SovereignWiring()
 30      wiring.connect()
 31  
 32      # Now all components auto-sync to Hypercore
 33      wiring.membrane.update_session_topics('session-001', ['attention'])
 34      # → Topics automatically synced to Hypercore attractors
 35  
 36      wiring.extract_phoenix(session)
 37      # → Phoenix state automatically stored in Hypercore
 38  """
 39  
 40  from dataclasses import dataclass, field
 41  from datetime import datetime
 42  from typing import Optional, List, Dict, Any, Callable
 43  from threading import Thread, Event
 44  import time
 45  
 46  from .client import HypercoreClient, HypercoreBridge, Attractor
 47  from ..attention.membrane import SessionMembrane, CrossSessionSignal, PermeabilityLevel
 48  from ..attention.cross_session import CrossSessionTracker, CrossSessionEvent
 49  from ..flight.phoenix_extractor import PhoenixExtractor, PhoenixState
 50  from ..flight.session import Session
 51  
 52  
 53  @dataclass
 54  class WiringConfig:
 55      """Configuration for the Hypercore wiring."""
 56      daemon_host: str = 'localhost'
 57      daemon_port: int = 7777
 58      auto_sync_interval: float = 5.0  # seconds
 59      sync_phoenix_on_close: bool = True
 60      sync_events_to_hypercore: bool = True
 61      sync_attractors_from_hypercore: bool = True
 62      permeability: PermeabilityLevel = PermeabilityLevel.ATTRACTORS
 63  
 64  
 65  class SovereignWiring:
 66      """
 67      Wires all Sovereign OS components to Hypercore for P2P sync.
 68  
 69      This is the central integration point that makes local Python components
 70      part of the distributed system.
 71      """
 72  
 73      def __init__(self, config: WiringConfig = None):
 74          self.config = config or WiringConfig()
 75  
 76          # Core components
 77          self.client: Optional[HypercoreClient] = None
 78          self.bridge: Optional[HypercoreBridge] = None
 79          self.membrane: Optional[SessionMembrane] = None
 80          self.tracker: Optional[CrossSessionTracker] = None
 81          self.extractor: Optional[PhoenixExtractor] = None
 82  
 83          # Background sync
 84          self._sync_thread: Optional[Thread] = None
 85          self._stop_event = Event()
 86  
 87          # Callbacks for external integration
 88          self._on_attractor_update: List[Callable[[List[Attractor]], None]] = []
 89          self._on_cross_session_event: List[Callable[[CrossSessionEvent], None]] = []
 90  
 91          # State tracking
 92          self._connected = False
 93          self._last_sync = None
 94          self._synced_sessions: set = set()
 95  
 96      def connect(self) -> bool:
 97          """
 98          Connect to Hypercore daemon and initialize all components.
 99  
100          Returns:
101              True if connected successfully
102          """
103          # Initialize client
104          self.client = HypercoreClient(
105              host=self.config.daemon_host,
106              port=self.config.daemon_port
107          )
108  
109          if not self.client.is_connected():
110              print(f"Warning: Hypercore daemon not reachable at "
111                    f"{self.config.daemon_host}:{self.config.daemon_port}")
112              print("Run: node hypercore_daemon.js to start it")
113              return False
114  
115          # Initialize membrane
116          self.membrane = SessionMembrane(
117              permeability=self.config.permeability
118          )
119  
120          # Initialize tracker
121          self.tracker = CrossSessionTracker()
122  
123          # Initialize extractor
124          self.extractor = PhoenixExtractor()
125  
126          # Initialize bridge
127          self.bridge = HypercoreBridge(
128              client=self.client,
129              membrane=self.membrane,
130              tracker=self.tracker
131          )
132  
133          # Wire up automatic syncing
134          self._wire_membrane()
135          self._wire_tracker()
136  
137          # Start background sync if configured
138          if self.config.sync_attractors_from_hypercore:
139              self._start_sync_thread()
140  
141          self._connected = True
142          print(f"Sovereign wiring connected to Hypercore at "
143                f"{self.config.daemon_host}:{self.config.daemon_port}")
144  
145          return True
146  
147      def _wire_membrane(self):
148          """Wire SessionMembrane to auto-sync topics."""
149          # Store original method
150          original_update = self.membrane.update_session_topics
151  
152          def synced_update(session_id: str, topics: List[str]):
153              # Call original
154              signals = original_update(session_id, topics)
155  
156              # Sync to Hypercore
157              if self.bridge and self.config.sync_events_to_hypercore:
158                  self.bridge.sync_membrane_topics(session_id, topics)
159  
160                  # Also sync any cross-session signals
161                  for signal in signals:
162                      self.client.write_event(
163                          event_type='cross_session_signal',
164                          session_id=session_id,
165                          topics=[signal.content] if signal.signal_type == 'attractor' else [],
166                          content=signal.to_context_line(),
167                          metadata={
168                              'signal_type': signal.signal_type,
169                              'strength': signal.strength,
170                              'source_sessions': signal.source_sessions
171                          }
172                      )
173  
174              return signals
175  
176          # Replace method
177          self.membrane.update_session_topics = synced_update
178  
179      def _wire_tracker(self):
180          """Wire CrossSessionTracker to auto-sync events."""
181          # Store original method
182          original_record = self.tracker.record_activity
183  
184          def synced_record(
185              session_id: str,
186              atom_uuid: str = None,
187              content: str = None,
188              topics: List[str] = None
189          ):
190              # Call original
191              events = original_record(session_id, atom_uuid, content, topics)
192  
193              # Sync to Hypercore
194              if self.bridge and self.config.sync_events_to_hypercore:
195                  self.client.write_event(
196                      event_type='attention_activity',
197                      session_id=session_id,
198                      topics=topics,
199                      content=content[:200] if content else None,
200                      metadata={
201                          'atom_uuid': atom_uuid
202                      }
203                  )
204  
205                  # Sync any cross-session events
206                  if events:
207                      for event in events:
208                          self.client.write_event(
209                              event_type=event.event_type,
210                              session_id=event.target_session,
211                              topics=event.shared_topics,
212                              content=event.content,
213                              metadata={
214                                  'source_session': event.source_session,
215                                  'strength': event.strength
216                              }
217                          )
218  
219                          # Notify callbacks
220                          for callback in self._on_cross_session_event:
221                              callback(event)
222  
223              return events
224  
225          # Replace method
226          self.tracker.record_activity = synced_record
227  
228      def extract_phoenix(
229          self,
230          session: Session,
231          operator: str = "rick",
232          domain: str = "Estate",
233          key_files: List[str] = None,
234          auto_store: bool = True
235      ) -> PhoenixState:
236          """
237          Extract Phoenix state from session and optionally store to Hypercore.
238  
239          Args:
240              session: Session to extract from
241              operator: Operator name
242              domain: Domain name
243              key_files: Key file paths
244              auto_store: Whether to auto-store to Hypercore
245  
246          Returns:
247              PhoenixState
248          """
249          phoenix = self.extractor.extract(
250              session=session,
251              operator=operator,
252              domain=domain,
253              key_files=key_files
254          )
255  
256          if auto_store and self.bridge:
257              success = self.bridge.sync_phoenix_state(phoenix)
258              if success:
259                  print(f"Phoenix state stored: {phoenix.session_id}")
260              else:
261                  print(f"Warning: Failed to store Phoenix state")
262  
263          return phoenix
264  
265      def register_session(
266          self,
267          session_id: str,
268          source: str = "python",
269          topics: List[str] = None
270      ):
271          """
272          Register a session with all components.
273  
274          Args:
275              session_id: Session identifier
276              source: Session source (claude-code, claude-app, etc.)
277              topics: Initial topics
278          """
279          # Register with membrane
280          self.membrane.register_session(session_id)
281  
282          # Register with tracker
283          self.tracker.register_session(session_id, source=source)
284  
285          # Register with Hypercore
286          if self.client:
287              self.client.register_session(session_id, topics=topics)
288  
289          self._synced_sessions.add(session_id)
290  
291      def update_topics(self, session_id: str, topics: List[str]):
292          """
293          Update topics for a session (syncs to all components).
294  
295          Args:
296              session_id: Session identifier
297              topics: Topics to update
298          """
299          # Ensure session is registered
300          if session_id not in self._synced_sessions:
301              self.register_session(session_id)
302  
303          # Update membrane (auto-syncs to Hypercore via wiring)
304          self.membrane.update_session_topics(session_id, topics)
305  
306      def record_activity(
307          self,
308          session_id: str,
309          content: str = None,
310          topics: List[str] = None
311      ):
312          """
313          Record activity in a session (syncs to all components).
314  
315          Args:
316              session_id: Session identifier
317              content: Activity content
318              topics: Related topics
319          """
320          # Ensure session is registered
321          if session_id not in self._synced_sessions:
322              self.register_session(session_id)
323  
324          # Record with tracker (auto-syncs to Hypercore via wiring)
325          import uuid
326          atom_uuid = str(uuid.uuid4())[:8]
327          self.tracker.record_activity(
328              session_id=session_id,
329              atom_uuid=atom_uuid,
330              content=content,
331              topics=topics
332          )
333  
334      def get_cross_session_context(self, session_id: str) -> str:
335          """
336          Get cross-session context for a session.
337  
338          Combines local membrane state with Hypercore attractors.
339  
340          Args:
341              session_id: Session to get context for
342  
343          Returns:
344              Formatted context string
345          """
346          # Get local membrane context
347          from ..attention.membrane import CrossSessionContextInjector
348          injector = CrossSessionContextInjector(self.membrane)
349          local_context = injector.format_for_claude(session_id)
350  
351          # Get Hypercore attractors
352          hypercore_context = ""
353          if self.client:
354              attractors = self.client.get_attractors()
355              multi_machine = [a for a in attractors if len(a.machines) > 1]
356  
357              if multi_machine:
358                  lines = [
359                      "<multi-machine-attractors>",
360                      "Topics active across your device fleet:",
361                      ""
362                  ]
363                  for a in multi_machine[:5]:
364                      lines.append(f"  - {a.topic} (on {', '.join(a.machines)})")
365                  lines.append("</multi-machine-attractors>")
366                  hypercore_context = "\n".join(lines)
367  
368          return f"{local_context}\n\n{hypercore_context}".strip()
369  
370      def _start_sync_thread(self):
371          """Start background thread for syncing attractors from Hypercore."""
372          self._stop_event.clear()
373          self._sync_thread = Thread(target=self._sync_loop, daemon=True)
374          self._sync_thread.start()
375  
376      def _sync_loop(self):
377          """Background loop that syncs attractors from Hypercore."""
378          while not self._stop_event.wait(timeout=self.config.auto_sync_interval):
379              try:
380                  # Get attractors from Hypercore
381                  attractors = self.client.get_attractors()
382  
383                  # Update membrane with cross-machine attractors
384                  for a in attractors:
385                      if len(a.sessions) >= 2:
386                          # This is a cross-session attractor
387                          for session_id in a.sessions:
388                              if session_id in self._synced_sessions:
389                                  # Update membrane (without re-syncing to avoid loop)
390                                  self.membrane._session_topics.setdefault(session_id, set()).add(a.topic)
391  
392                  # Notify callbacks
393                  for callback in self._on_attractor_update:
394                      callback(attractors)
395  
396                  self._last_sync = datetime.now()
397  
398              except Exception as e:
399                  print(f"Sync error: {e}")
400  
401      def on_attractor_update(self, callback: Callable[[List[Attractor]], None]):
402          """Register callback for attractor updates from Hypercore."""
403          self._on_attractor_update.append(callback)
404  
405      def on_cross_session_event(self, callback: Callable[[CrossSessionEvent], None]):
406          """Register callback for cross-session events."""
407          self._on_cross_session_event.append(callback)
408  
409      def get_status(self) -> Dict[str, Any]:
410          """Get wiring status."""
411          status = {
412              'connected': self._connected,
413              'daemon_reachable': self.client.is_connected() if self.client else False,
414              'synced_sessions': list(self._synced_sessions),
415              'last_sync': self._last_sync.isoformat() if self._last_sync else None,
416          }
417  
418          if self.client and self._connected:
419              daemon_status = self.client.status()
420              if daemon_status:
421                  status['daemon'] = {
422                      'machine': daemon_status.machine,
423                      'peers': daemon_status.peers,
424                      'attractors': daemon_status.attractor_count,
425                      'phoenix_states': daemon_status.phoenix_count
426                  }
427  
428          if self.membrane:
429              status['membrane'] = {
430                  'sessions': len(self.membrane._session_topics),
431                  'attractors': len(self.membrane._get_attractors())
432              }
433  
434          if self.tracker:
435              state = self.tracker.get_state()
436              status['tracker'] = {
437                  'sessions': len(state.sessions),
438                  'cross_events': len(state.cross_events),
439                  'attractors': state.cross_session_attractors
440              }
441  
442          return status
443  
444      def disconnect(self):
445          """Disconnect from Hypercore and stop background sync."""
446          self._stop_event.set()
447          if self._sync_thread:
448              self._sync_thread.join(timeout=2.0)
449          self._connected = False
450  
451  
# Process-wide wiring instance; created lazily by get_wiring().
_wiring: Optional[SovereignWiring] = None


def get_wiring(config: WiringConfig = None) -> SovereignWiring:
    """
    Return the global wiring instance, creating it on first use.

    Args:
        config: Configuration for the instance. It is only used by the call
            that creates the instance; once an instance exists, the argument
            is ignored and the existing instance is returned unchanged.

    Returns:
        The shared SovereignWiring instance
    """
    global _wiring
    if _wiring is not None:
        return _wiring

    _wiring = SovereignWiring(config)
    return _wiring
462  
463  
def connect(
    host: str = 'localhost',
    port: int = 7777,
    auto_sync: bool = True
) -> SovereignWiring:
    """
    Fetch the global wiring and call its connect() method.

    The settings are passed to get_wiring(), which only applies them when it
    has to create the global instance; an already existing instance keeps
    its own configuration. The result of ``wiring.connect()`` is not
    checked here, so the returned instance may be unconnected when the
    daemon is unreachable.

    Args:
        host: Daemon host
        port: Daemon port
        auto_sync: Whether to auto-sync attractors

    Returns:
        The global SovereignWiring instance
    """
    settings = WiringConfig(
        daemon_host=host,
        daemon_port=port,
        sync_attractors_from_hypercore=auto_sync,
    )
    instance = get_wiring(settings)
    instance.connect()
    return instance
488  
489  
if __name__ == "__main__":
    # Manual smoke test: exercises the wiring end to end against a running
    # Hypercore daemon and prints the results.
    print("=== Sovereign Wiring Test ===\n")

    wiring = SovereignWiring()

    if not wiring.connect():
        print("Start the Hypercore daemon first:")
        print("  node hypercore_daemon.js")
        # The bare exit() builtin is injected by the site module and may be
        # missing (e.g. python -S); raising SystemExit always works.
        raise SystemExit(1)

    # Make sure the background sync thread is stopped even if a step fails.
    try:
        print("\n--- Testing membrane sync ---")
        wiring.register_session('test-session-001', source='python-test')
        wiring.update_topics('test-session-001', ['attention', 'hypercore', 'wiring'])
        print("Topics updated and synced to Hypercore")

        print("\n--- Testing activity recording ---")
        wiring.record_activity(
            'test-session-001',
            content="Testing the wiring between Python components and Hypercore",
            topics=['testing', 'integration']
        )
        print("Activity recorded and synced")

        print("\n--- Testing cross-session context ---")
        wiring.register_session('test-session-002', source='python-test')
        wiring.update_topics('test-session-002', ['attention', 'membrane'])  # Shares 'attention'

        context = wiring.get_cross_session_context('test-session-001')
        print("Cross-session context for session-001:")
        print(context[:500] if context else "(no context)")

        print("\n--- Wiring Status ---")
        status = wiring.get_status()
        for key, value in status.items():
            print(f"  {key}: {value}")
    finally:
        wiring.disconnect()

    print("\n=== Test Complete ===")