/ core / hypercore / keet_bridge.py
keet_bridge.py
  1  """
  2  Keet Context Bridge - P2P transport for context synchronization
  3  
  4  The operator is the pilot. The mnemonic is their license number.
  5  
  6  This bridge enables Claude instances to share context via Keet P2P rooms,
  7  regardless of whether they have file system access. All keys derive from
  8  the operator's 2-word BIP-39 mnemonic (e.g., "amber-falcon").
  9  
 10  See: docs/architecture/OPERATOR_ARCHITECTURE.md
 11  
 12  KEY INSIGHT: The transport layer IS the sandbox escape.
 13  - Phone Claude can't write files, but CAN send P2P messages
 14  - Desktop Claude receives messages, persists to local store
 15  - Phone Claude reads context via P2P, not files
 16  
 17  Architecture:
 18      ┌─────────────────────────────────────────────────────────┐
 19      │                 KEET CONTEXT ROOM                        │
 20      │            "sovereign-os-context-{user}"                 │
 21      ├─────────────────────────────────────────────────────────┤
 22      │                                                          │
 23      │  Phone Claude ←──────────────────────→ Desktop Claude   │
 24      │  (no files)         P2P messages         (has files)    │
 25      │                                                          │
 26      │  ContextPacket:                                          │
 27      │  - type: "resonance" | "phoenix" | "attractor" | "ping" │
 28      │  - payload: context data                                 │
 29      │  - machine: source machine ID                            │
 30      │  - timestamp: ISO8601                                    │
 31      │                                                          │
 32      └─────────────────────────────────────────────────────────┘
 33  
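Transport modes (design):
1. WEBSOCKET: Connect to the local Keet CLI daemon (ws://localhost:7778)
2. HTTP: Poll the local Keet HTTP bridge (http://localhost:7778)
3. SUBPROCESS: Spawn keet-cli directly (fallback)

NOTE: KeetContextBridge in this module currently uses only the HTTP mode
(plain urllib requests against the daemon endpoint).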
 38  """
 39  
 40  import asyncio
 41  import json
 42  import hashlib
 43  import logging
 44  import threading
 45  import time
 46  from dataclasses import dataclass, field
 47  from datetime import datetime, timedelta
 48  from pathlib import Path
 49  from typing import Dict, List, Optional, Any, Callable, Set
 50  from enum import Enum
 51  import queue
 52  
 53  logger = logging.getLogger(__name__)
 54  
 55  # Try to import async HTTP client
 56  try:
 57      import aiohttp
 58      HAS_AIOHTTP = True
 59  except ImportError:
 60      HAS_AIOHTTP = False
 61  
 62  # Try to import websockets
 63  try:
 64      import websockets
 65      HAS_WEBSOCKETS = True
 66  except ImportError:
 67      HAS_WEBSOCKETS = False
 68  
 69  
 70  # =============================================================================
 71  # PROTOCOL DEFINITIONS
 72  # =============================================================================
 73  
 74  class PacketType(Enum):
 75      """Types of context packets."""
 76      RESONANCE = "resonance"      # Resonance state update
 77      PHOENIX = "phoenix"          # Phoenix state checkpoint
 78      ATTRACTOR = "attractor"      # Attractor update
 79      PING = "ping"                # Presence ping
 80      PONG = "pong"                # Presence response
 81      REQUEST = "request"          # Request context from peers
 82      RESPONSE = "response"        # Response to request
 83  
 84  
 85  @dataclass
 86  class ContextPacket:
 87      """
 88      A packet of context for P2P transport.
 89  
 90      Designed to be compact and self-describing.
 91      Includes operator mnemonic prefix for routing.
 92      """
 93      type: PacketType
 94      machine_id: str
 95      timestamp: datetime
 96      payload: Dict = field(default_factory=dict)
 97      request_id: Optional[str] = None  # For request/response correlation
 98      operator: Optional[str] = None    # Operator mnemonic prefix (e.g., "amber-falcon")
 99  
100      def to_dict(self) -> Dict:
101          return {
102              "type": self.type.value,
103              "machine_id": self.machine_id,
104              "timestamp": self.timestamp.isoformat(),
105              "payload": self.payload,
106              "request_id": self.request_id,
107              "operator": self.operator,
108          }
109  
110      def to_json(self) -> str:
111          return json.dumps(self.to_dict())
112  
113      @classmethod
114      def from_dict(cls, data: Dict) -> "ContextPacket":
115          return cls(
116              type=PacketType(data["type"]),
117              machine_id=data["machine_id"],
118              timestamp=datetime.fromisoformat(data["timestamp"]),
119              payload=data.get("payload", {}),
120              request_id=data.get("request_id"),
121              operator=data.get("operator"),
122          )
123  
124      @classmethod
125      def from_json(cls, json_str: str) -> "ContextPacket":
126          return cls.from_dict(json.loads(json_str))
127  
128  
129  # =============================================================================
130  # KEET BRIDGE
131  # =============================================================================
132  
class KeetContextBridge:
    """
    Bridge for synchronizing context via Keet P2P rooms.

    This is the TRANSPORT LAYER that enables:
    - Sandboxed Claude instances (phone) to share context
    - Desktop Claude to persist context received from phone
    - Any Claude to request context from peers

    Threading model:
        start() spawns one daemon thread running _run_loop(). That thread
        performs all HTTP I/O against the Keet daemon and invokes the
        registered callbacks. The public broadcast_* methods only enqueue
        packets; request_context() enqueues a request and blocks the
        calling thread until a reply arrives or the timeout expires.

    Usage:
        bridge = KeetContextBridge(room_name="my-context-room")
        bridge.start()

        # Send context
        bridge.broadcast_resonance(resonance_state)
        bridge.broadcast_phoenix(phoenix_state)

        # Receive context
        bridge.on_resonance(callback)
        bridge.on_phoenix(callback)

        # Request context from peers (blocking call, not a coroutine)
        context = bridge.request_context()
    """

    # Default room name (deterministic from user)
    DEFAULT_ROOM_PREFIX = "sovereign-os-context"

    # Connection endpoints
    KEET_WS_PORT = 7778
    KEET_HTTP_PORT = 7778

    # Timeouts (seconds)
    CONNECT_TIMEOUT = 5.0
    REQUEST_TIMEOUT = 10.0
    PING_INTERVAL = 30.0
    # Minimum delay between connection attempts while the daemon is down
    RECONNECT_INTERVAL = 2.0

    # Upper bound on packets buffered while disconnected (oldest dropped first)
    MAX_OUTBOUND_QUEUE = 1000

    def __init__(
        self,
        room_name: Optional[str] = None,
        machine_id: Optional[str] = None,
        keet_endpoint: Optional[str] = None,
        operator_mnemonic: Optional[str] = None,
    ):
        """
        Initialize the Keet bridge.

        Args:
            room_name: Name of the Keet room to join (derived from mnemonic if not provided)
            machine_id: This machine's identifier
            keet_endpoint: Override Keet daemon endpoint
            operator_mnemonic: 2-word BIP-39 mnemonic (e.g., "amber-falcon")
        """
        self.operator_mnemonic = operator_mnemonic or self._load_operator_mnemonic()
        self.room_name = room_name or self._default_room_name()
        self.machine_id = machine_id or self._get_machine_id()
        self.keet_endpoint = keet_endpoint or f"http://localhost:{self.KEET_HTTP_PORT}"

        # Connection state
        self._connected = False
        self._peers: Set[str] = set()
        self._last_ping = datetime.min
        self._next_connect_attempt = 0.0            # time.monotonic() deadline
        self._first_attempt_done = threading.Event()  # set after 1st connect try

        # Message queues
        self._outbound_queue: queue.Queue = queue.Queue(maxsize=self.MAX_OUTBOUND_QUEUE)
        # Packet whose send failed because the transport dropped; it is
        # retried first after reconnecting so ordering is preserved.
        self._retry_packet: Optional["ContextPacket"] = None
        # Retained for backward compatibility; responses are now routed
        # through _pending_requests instead.
        self._inbound_queue: queue.Queue = queue.Queue()

        # Callbacks
        self._resonance_callbacks: List[Callable] = []
        self._phoenix_callbacks: List[Callable] = []
        self._attractor_callbacks: List[Callable] = []
        self._peer_callbacks: List[Callable] = []

        # Request tracking: request_id -> queue receiving matching RESPONSEs
        self._pending_requests: Dict[str, queue.Queue] = {}
        self._pending_lock = threading.Lock()

        # Background thread
        self._running = False
        self._thread: Optional[threading.Thread] = None

        logger.info(
            "Keet bridge initialized: room=%s, machine=%s",
            self.room_name, self.machine_id[:8],
        )

    def _load_operator_mnemonic(self) -> Optional[str]:
        """Load operator mnemonic from ~/.sovereign/operator-mnemonic.json, if present."""
        import os
        config_file = Path(os.path.expanduser("~/.sovereign/operator-mnemonic.json"))
        if not config_file.exists():
            return None
        try:
            data = json.loads(config_file.read_text())
        except (OSError, ValueError) as exc:
            # Unreadable or malformed config: fall back to the username-based
            # room, but tell the operator why their mnemonic was ignored.
            logger.warning("Could not read operator mnemonic from %s: %s", config_file, exc)
            return None
        if isinstance(data, dict):
            return data.get("mnemonic")
        return None

    def _default_room_name(self) -> str:
        """Generate default room name from operator mnemonic or username."""
        if self.operator_mnemonic:
            # Room derived from operator mnemonic
            return f"{self.DEFAULT_ROOM_PREFIX}-{self.operator_mnemonic}"
        import os
        username = os.environ.get("USER", "default")
        return f"{self.DEFAULT_ROOM_PREFIX}-{username}"

    def _get_machine_id(self) -> str:
        """Get stable machine identifier (first 16 hex chars of SHA-256 of the hostname)."""
        import socket
        hostname = socket.gethostname()
        return hashlib.sha256(hostname.encode()).hexdigest()[:16]

    # =========================================================================
    # CONNECTION MANAGEMENT
    # =========================================================================

    def start(self) -> bool:
        """
        Start the bridge (connect to room, begin sync).

        Blocks for at most one second while the background thread makes its
        first connection attempt.

        Returns:
            True if already running, otherwise whether the daemon was
            reached within that window. The background thread keeps
            retrying even when False is returned.
        """
        if self._running:
            return True

        self._first_attempt_done.clear()
        self._next_connect_attempt = 0.0  # attempt immediately
        self._running = True
        self._thread = threading.Thread(target=self._run_loop, daemon=True)
        self._thread.start()

        # Wait for the initial connection attempt (returns early once done)
        self._first_attempt_done.wait(timeout=1.0)
        return self._connected

    def stop(self) -> None:
        """Stop the bridge and wait briefly for the background thread to exit."""
        self._running = False
        if self._thread:
            self._thread.join(timeout=2)
            self._thread = None
        self._connected = False

    def is_connected(self) -> bool:
        """Check if connected to room."""
        return self._connected

    def get_peers(self) -> Set[str]:
        """Get set of peer machine IDs seen so far (peers are never expired)."""
        return self._peers.copy()

    def _run_loop(self) -> None:
        """Background loop for P2P sync: (re)connect, flush outbound, poll inbound, ping."""
        while self._running:
            try:
                # Try to connect if not connected, rate-limited so a missing
                # daemon is not hammered ten times a second.
                if not self._connected and time.monotonic() >= self._next_connect_attempt:
                    try:
                        self._try_connect()
                    finally:
                        self._next_connect_attempt = time.monotonic() + self.RECONNECT_INTERVAL
                        self._first_attempt_done.set()

                if self._connected:
                    # Process outbound messages
                    self._process_outbound()

                    # Process inbound messages
                    self._process_inbound()

                    # Ping peers periodically (only while online, so pings
                    # do not pile up in the queue during an outage)
                    if (datetime.now() - self._last_ping).total_seconds() > self.PING_INTERVAL:
                        self._send_ping()

            except Exception as e:
                logger.error("Error in Keet bridge loop: %s", e)
                self._connected = False

            time.sleep(0.1)

    def _try_connect(self) -> bool:
        """Probe the daemon's /status endpoint and, on HTTP 200, join the room."""
        try:
            import urllib.request
            url = f"{self.keet_endpoint}/status"
            req = urllib.request.Request(url, method="GET")

            with urllib.request.urlopen(req, timeout=self.CONNECT_TIMEOUT) as resp:
                if resp.status == 200:
                    self._connected = True
                    logger.info("Connected to Keet daemon at %s", self.keet_endpoint)

                    # Join room (best effort; failure is logged by _join_room)
                    self._join_room()
                    return True

        except Exception as e:
            logger.debug("Keet connection failed: %s", e)
            self._connected = False

        return False

    def _join_room(self) -> None:
        """Join the context room via POST /room/join; failures are logged, not raised."""
        try:
            import urllib.request
            url = f"{self.keet_endpoint}/room/join"
            data = json.dumps({"room": self.room_name}).encode()
            req = urllib.request.Request(url, data=data, method="POST")
            req.add_header("Content-Type", "application/json")

            with urllib.request.urlopen(req, timeout=self.CONNECT_TIMEOUT) as resp:
                if resp.status == 200:
                    logger.info("Joined Keet room: %s", self.room_name)

        except Exception as e:
            logger.warning("Failed to join room: %s", e)

    # =========================================================================
    # SENDING
    # =========================================================================

    def broadcast_resonance(self, resonance_state: Dict) -> bool:
        """
        Broadcast resonance state to peers.

        Args:
            resonance_state: Dict with axiom_scores, dominant_axiom, etc.

        Returns:
            True if queued for sending
        """
        packet = ContextPacket(
            type=PacketType.RESONANCE,
            machine_id=self.machine_id,
            timestamp=datetime.now(),
            payload=resonance_state,
        )
        return self._queue_outbound(packet)

    def broadcast_phoenix(self, phoenix_state: Dict) -> bool:
        """
        Broadcast Phoenix state to peers.

        Args:
            phoenix_state: Dict with checkpoint, gravity_wells, etc.

        Returns:
            True if queued for sending
        """
        packet = ContextPacket(
            type=PacketType.PHOENIX,
            machine_id=self.machine_id,
            timestamp=datetime.now(),
            payload=phoenix_state,
        )
        return self._queue_outbound(packet)

    def broadcast_attractor(self, topic: str, strength: float, sessions: List[str] = None) -> bool:
        """
        Broadcast attractor update to peers.

        Args:
            topic: The attractor topic
            strength: Attractor strength (0-1)
            sessions: Sessions where this attractor appears

        Returns:
            True if queued for sending
        """
        packet = ContextPacket(
            type=PacketType.ATTRACTOR,
            machine_id=self.machine_id,
            timestamp=datetime.now(),
            payload={
                "topic": topic,
                "strength": strength,
                "sessions": sessions or [],
            },
        )
        return self._queue_outbound(packet)

    def request_context(self, timeout: float = None) -> Optional[Dict]:
        """
        Request current context from peers (synchronous).

        Blocks the calling thread until a RESPONSE with the matching
        request id arrives. Do not call this from a bridge callback:
        callbacks run on the background thread that must deliver the reply.

        Args:
            timeout: How long to wait for a response, in seconds.
                None means REQUEST_TIMEOUT; 0 means "do not wait".

        Returns:
            Payload of the first matching response, or None if timeout
        """
        if timeout is None:
            timeout = self.REQUEST_TIMEOUT
        request_id = hashlib.sha256(f"{self.machine_id}-{time.time()}".encode()).hexdigest()[:16]

        packet = ContextPacket(
            type=PacketType.REQUEST,
            machine_id=self.machine_id,
            timestamp=datetime.now(),
            request_id=request_id,
        )

        # Register a private reply queue BEFORE sending so a fast reply
        # cannot race past us; _handle_message routes matching RESPONSEs here.
        replies: queue.Queue = queue.Queue()
        with self._pending_lock:
            self._pending_requests[request_id] = replies

        try:
            # Send request
            self._queue_outbound(packet)

            # Wait for response (Queue.get rejects negative timeouts)
            try:
                response = replies.get(timeout=max(0.0, timeout))
            except queue.Empty:
                return None
            return response.payload
        finally:
            # Late replies for this id are discarded from now on
            with self._pending_lock:
                self._pending_requests.pop(request_id, None)

    def _send_ping(self) -> None:
        """Send presence ping to room."""
        packet = ContextPacket(
            type=PacketType.PING,
            machine_id=self.machine_id,
            timestamp=datetime.now(),
        )
        self._queue_outbound(packet)
        self._last_ping = datetime.now()

    def _queue_outbound(self, packet: "ContextPacket") -> bool:
        """
        Queue a packet for sending.

        Packets queued while disconnected are kept and flushed after the
        next successful connect. When the queue is full the oldest packet
        is dropped to make room.

        Returns:
            True (the packet is always accepted)
        """
        if not self._connected:
            logger.debug("Not connected, packet queued for later")

        while True:
            try:
                self._outbound_queue.put_nowait(packet)
                return True
            except queue.Full:
                try:
                    self._outbound_queue.get_nowait()
                    logger.warning("Outbound queue full, dropped oldest packet")
                except queue.Empty:
                    # Another thread drained it in the meantime; just retry
                    pass

    def _process_outbound(self) -> None:
        """
        Flush the outbound queue while connected.

        Nothing is dequeued while disconnected. A packet whose send fails
        because the transport dropped is parked in _retry_packet and sent
        first after reconnecting; a packet the daemon actively rejects is
        dropped so it cannot block the queue forever.
        """
        while self._connected:
            packet = self._retry_packet
            if packet is None:
                try:
                    packet = self._outbound_queue.get_nowait()
                except queue.Empty:
                    return
            self._retry_packet = None

            if self._send_to_room(packet):
                continue

            if not self._connected:
                # Transport failure: keep the packet for the next connection
                self._retry_packet = packet
                return

            logger.warning("Keet daemon did not accept a packet, dropping it")

    def _send_to_room(self, packet: "ContextPacket") -> bool:
        """
        Send packet to Keet room via POST /room/message.

        Returns:
            True if the daemon answered HTTP 200. On a transport-level
            failure the bridge is marked disconnected (so the run loop
            reconnects) and False is returned.
        """
        if not self._connected:
            return False

        import urllib.error
        import urllib.request

        try:
            data = json.dumps({
                "room": self.room_name,
                "message": packet.to_json(),
            }).encode()
        except (TypeError, ValueError) as e:
            # Retrying cannot help when the payload itself is not serializable
            logger.warning("Cannot serialize packet for sending: %s", e)
            return False

        try:
            url = f"{self.keet_endpoint}/room/message"
            req = urllib.request.Request(url, data=data, method="POST")
            req.add_header("Content-Type", "application/json")

            with urllib.request.urlopen(req, timeout=self.CONNECT_TIMEOUT) as resp:
                return resp.status == 200

        except urllib.error.HTTPError as e:
            # The daemon is reachable but refused this message
            logger.debug("Failed to send to room: %s", e)
            return False

        except Exception as e:
            # Connection refused / reset / timed out: the link is gone
            logger.debug("Failed to send to room: %s", e)
            self._connected = False
            return False

    # =========================================================================
    # RECEIVING
    # =========================================================================

    def on_resonance(self, callback: Callable[[Dict, str], None]) -> None:
        """
        Register callback for resonance updates.

        Args:
            callback: Function(resonance_state, machine_id)
        """
        self._resonance_callbacks.append(callback)

    def on_phoenix(self, callback: Callable[[Dict, str], None]) -> None:
        """
        Register callback for Phoenix updates.

        Args:
            callback: Function(phoenix_state, machine_id)
        """
        self._phoenix_callbacks.append(callback)

    def on_attractor(self, callback: Callable[[str, float, str], None]) -> None:
        """
        Register callback for attractor updates.

        Args:
            callback: Function(topic, strength, machine_id)
        """
        self._attractor_callbacks.append(callback)

    def on_peer(self, callback: Callable[[str, bool], None]) -> None:
        """
        Register callback for peer presence.

        The callback is invoked with is_online=True each time a PING from
        a peer is received; this class never reports a peer as offline.

        Args:
            callback: Function(machine_id, is_online)
        """
        self._peer_callbacks.append(callback)

    def _process_inbound(self) -> None:
        """Poll GET /room/messages once and hand every message to _handle_message."""
        if not self._connected:
            return

        import urllib.parse
        import urllib.request

        # Escape the room name so characters such as spaces or '&' cannot
        # corrupt the query string.
        query = urllib.parse.urlencode({"room": self.room_name, "since": "last"})
        url = f"{self.keet_endpoint}/room/messages?{query}"

        try:
            req = urllib.request.Request(url, method="GET")
            with urllib.request.urlopen(req, timeout=1.0) as resp:
                if resp.status != 200:
                    return
                data = json.loads(resp.read().decode())
        except OSError:
            # Polling failures (timeouts, refused connections, HTTP errors)
            # are normal, don't log
            return
        except Exception as e:
            logger.debug("Ignoring malformed poll response: %s", e)
            return

        messages = data.get("messages", []) if isinstance(data, dict) else []
        for msg in messages:
            self._handle_message(msg)

    @staticmethod
    def _parse_packet(msg: object) -> Optional["ContextPacket"]:
        """Decode a raw room message (JSON string, packet dict, or wrapper dict)."""
        if isinstance(msg, str):
            return ContextPacket.from_json(msg)
        if isinstance(msg, dict) and "type" in msg:
            return ContextPacket.from_dict(msg)

        # Try extracting from message wrapper
        content = msg.get("content") or msg.get("message") or msg.get("data")
        if not content:
            return None
        if isinstance(content, str):
            return ContextPacket.from_json(content)
        return ContextPacket.from_dict(content)

    def _notify(self, callbacks: List[Callable], *args) -> None:
        """Invoke each callback, isolating failures so one bad callback cannot starve the rest."""
        for cb in list(callbacks):
            try:
                cb(*args)
            except Exception:
                logger.exception("Keet bridge callback %r failed", cb)

    def _handle_message(self, msg: Dict) -> None:
        """Handle an incoming message: decode it, track the peer, dispatch by packet type."""
        try:
            packet = self._parse_packet(msg)
            if packet is None:
                return

            # Ignore our own messages
            if packet.machine_id == self.machine_id:
                return

            # Track peer
            self._peers.add(packet.machine_id)

            # Dispatch by type (PONG needs nothing beyond peer tracking)
            if packet.type == PacketType.RESONANCE:
                self._notify(self._resonance_callbacks, packet.payload, packet.machine_id)

            elif packet.type == PacketType.PHOENIX:
                self._notify(self._phoenix_callbacks, packet.payload, packet.machine_id)

            elif packet.type == PacketType.ATTRACTOR:
                payload = packet.payload if isinstance(packet.payload, dict) else {}
                topic = payload.get("topic")
                strength = payload.get("strength", 0.5)
                self._notify(self._attractor_callbacks, topic, strength, packet.machine_id)

            elif packet.type == PacketType.PING:
                # Respond with pong
                self._send_pong(packet.machine_id)
                self._notify(self._peer_callbacks, packet.machine_id, True)

            elif packet.type == PacketType.REQUEST:
                # Respond with our current context
                self._handle_context_request(packet)

            elif packet.type == PacketType.RESPONSE:
                # Hand the reply to the request_context() call waiting on it
                with self._pending_lock:
                    waiter = self._pending_requests.get(packet.request_id)
                if waiter is not None:
                    waiter.put(packet)
                else:
                    logger.debug("Discarding response for unknown request %s", packet.request_id)

        except Exception as e:
            logger.debug("Failed to handle message: %s", e)

    def _send_pong(self, to_machine: str) -> None:
        """Send pong response addressed (via payload["to"]) to the pinging machine."""
        packet = ContextPacket(
            type=PacketType.PONG,
            machine_id=self.machine_id,
            timestamp=datetime.now(),
            payload={"to": to_machine},
        )
        self._queue_outbound(packet)

    def _handle_context_request(self, request: "ContextPacket") -> None:
        """Handle a context request from peer by queueing a RESPONSE with the same request id."""
        # Build response with our current context
        # This will be populated by the context controller
        response = ContextPacket(
            type=PacketType.RESPONSE,
            machine_id=self.machine_id,
            timestamp=datetime.now(),
            request_id=request.request_id,
            payload={
                "from_machine": self.machine_id,
                "has_context": False,  # Will be overridden
            },
        )
        self._queue_outbound(response)

    # =========================================================================
    # STATUS
    # =========================================================================

    def get_status(self) -> Dict:
        """Get bridge status as a JSON-serializable dict."""
        # Count the packet parked for retry as still outstanding
        pending = self._outbound_queue.qsize() + (1 if self._retry_packet is not None else 0)
        return {
            "connected": self._connected,
            "room_name": self.room_name,
            "machine_id": self.machine_id,
            "keet_endpoint": self.keet_endpoint,
            "peers": list(self._peers),
            "peer_count": len(self._peers),
            "outbound_queue_size": pending,
            "last_ping": self._last_ping.isoformat() if self._last_ping != datetime.min else None,
        }
653  
654  
655  # =============================================================================
656  # FACTORY
657  # =============================================================================
658  
def create_keet_bridge(
    room_name: str = None,
    auto_start: bool = True
) -> KeetContextBridge:
    """
    Build a KeetContextBridge and optionally start it.

    Args:
        room_name: Room to join; when omitted the bridge derives its
            default room name itself
        auto_start: When true, call start() on the bridge before returning

    Returns:
        The newly created KeetContextBridge (its start() result is not
        reported; check is_connected() if needed)
    """
    instance = KeetContextBridge(room_name=room_name)

    # Caller wants to start it manually later
    if not auto_start:
        return instance

    instance.start()
    return instance
679  
680  
681  # =============================================================================
682  # CLI
683  # =============================================================================
684  
if __name__ == "__main__":
    # Command-line entry point: run exactly one of the diagnostic modes
    # (--status, --ping, --request, --listen); with no mode, print usage.
    import argparse

    parser = argparse.ArgumentParser(description="Keet Context Bridge")
    parser.add_argument("--room", type=str, help="Room name to join")
    parser.add_argument("--status", action="store_true", help="Show bridge status")
    parser.add_argument("--ping", action="store_true", help="Ping peers")
    parser.add_argument("--request", action="store_true", help="Request context from peers")
    parser.add_argument("--listen", action="store_true", help="Listen for messages")

    args = parser.parse_args()

    if not (args.status or args.ping or args.request or args.listen):
        # No mode selected: show usage without creating a bridge
        parser.print_help()
    else:
        bridge = KeetContextBridge(room_name=args.room)

        # try/finally guarantees the bridge is stopped even when a mode is
        # interrupted (Ctrl+C) or fails part-way through.
        try:
            if args.status:
                bridge.start()
                time.sleep(2)  # give the background loop time to discover peers
                status = bridge.get_status()
                print(json.dumps(status, indent=2))

            elif args.ping:
                bridge.start()
                time.sleep(2)
                print(f"Connected: {bridge.is_connected()}")
                print(f"Peers: {bridge.get_peers()}")

            elif args.request:
                bridge.start()
                time.sleep(2)
                print("Requesting context from peers...")
                context = bridge.request_context(timeout=5.0)
                # An empty dict is still a reply; only None means "no answer"
                if context is not None:
                    print(json.dumps(context, indent=2))
                else:
                    print("No response from peers")

            else:  # args.listen
                print(f"Listening on room: {bridge.room_name}")
                print("Press Ctrl+C to stop\n")

                def on_resonance(state, machine):
                    print(f"[RESONANCE] from {machine[:8]}: {state.get('dominant_axiom', 'unknown')}")

                def on_phoenix(state, machine):
                    print(f"[PHOENIX] from {machine[:8]}: checkpoint {state.get('checkpoint', '?')}")

                def on_peer(machine, online):
                    status = "online" if online else "offline"
                    print(f"[PEER] {machine[:8]}: {status}")

                bridge.on_resonance(on_resonance)
                bridge.on_phoenix(on_phoenix)
                bridge.on_peer(on_peer)

                bridge.start()
                try:
                    while True:
                        time.sleep(1)
                except KeyboardInterrupt:
                    print("\nStopping...")
        finally:
            bridge.stop()