# keet_bridge.py
"""
Keet Context Bridge - P2P transport for context synchronization

The operator is the pilot. The mnemonic is their license number.

This bridge enables Claude instances to share context via Keet P2P rooms,
regardless of whether they have file system access. All keys derive from
the operator's 2-word BIP-39 mnemonic (e.g., "amber-falcon").

See: docs/architecture/OPERATOR_ARCHITECTURE.md

KEY INSIGHT: The transport layer IS the sandbox escape.
- Phone Claude can't write files, but CAN send P2P messages
- Desktop Claude receives messages, persists to local store
- Phone Claude reads context via P2P, not files

Architecture:
┌─────────────────────────────────────────────────────────┐
│                    KEET CONTEXT ROOM                    │
│              "sovereign-os-context-{user}"              │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  Phone Claude ←──────────────────────→ Desktop Claude   │
│  (no files)         P2P messages         (has files)    │
│                                                         │
│  ContextPacket:                                         │
│  - type: "resonance" | "phoenix" | "attractor" | "ping" │
│  - payload: context data                                │
│  - machine_id: source machine ID                        │
│  - timestamp: ISO8601                                   │
│                                                         │
└─────────────────────────────────────────────────────────┘

Transport modes:
1. WEBSOCKET: Connect to local Keet CLI daemon (ws://localhost:7778)
2. HTTP: Connect to local Keet HTTP bridge (http://localhost:7778)
3. SUBPROCESS: Spawn keet-cli directly (fallback)

NOTE: this module currently implements only the HTTP mode (polling the
local bridge with urllib); the other modes are listed as design intent.
"""

import asyncio
import hashlib
import http.client
import json
import logging
import os
import queue
import socket
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set

logger = logging.getLogger(__name__)

# Try to import async HTTP client
try:
    import aiohttp
    HAS_AIOHTTP = True
except ImportError:
    HAS_AIOHTTP = False

# Try to import websockets
try:
    import websockets
    HAS_WEBSOCKETS = True
except ImportError:
    HAS_WEBSOCKETS = False

# Errors that mean "the HTTP exchange with the daemon failed": socket and
# urllib errors (OSError), malformed URLs / bad JSON (ValueError), and
# protocol-level failures raised by http.client.
_TRANSPORT_ERRORS = (OSError, ValueError, http.client.HTTPException)


# =============================================================================
# PROTOCOL DEFINITIONS
# =============================================================================

class PacketType(Enum):
    """Types of context packets."""
    RESONANCE = "resonance"    # Resonance state update
    PHOENIX = "phoenix"        # Phoenix state checkpoint
    ATTRACTOR = "attractor"    # Attractor update
    PING = "ping"              # Presence ping
    PONG = "pong"              # Presence response
    REQUEST = "request"        # Request context from peers
    RESPONSE = "response"      # Response to request


@dataclass
class ContextPacket:
    """
    A packet of context for P2P transport.

    Designed to be compact and self-describing.
    Includes operator mnemonic prefix for routing.
    """
    type: PacketType
    machine_id: str
    timestamp: datetime
    payload: Dict = field(default_factory=dict)
    request_id: Optional[str] = None  # For request/response correlation
    operator: Optional[str] = None    # Operator mnemonic prefix (e.g., "amber-falcon")

    def to_dict(self) -> Dict:
        """Return a JSON-serializable dict representation of the packet."""
        return {
            "type": self.type.value,
            "machine_id": self.machine_id,
            "timestamp": self.timestamp.isoformat(),
            "payload": self.payload,
            "request_id": self.request_id,
            "operator": self.operator,
        }

    def to_json(self) -> str:
        """Serialize the packet to a JSON string."""
        return json.dumps(self.to_dict())

    @classmethod
    def from_dict(cls, data: Dict) -> "ContextPacket":
        """
        Build a packet from its dict form.

        Raises:
            KeyError: if "type", "machine_id" or "timestamp" is missing
            ValueError: if the type or timestamp cannot be parsed
        """
        return cls(
            type=PacketType(data["type"]),
            machine_id=data["machine_id"],
            timestamp=datetime.fromisoformat(data["timestamp"]),
            # A peer may send "payload": null; normalize to an empty dict so
            # consumers can always call .get() on it.
            payload=data.get("payload") or {},
            request_id=data.get("request_id"),
            operator=data.get("operator"),
        )

    @classmethod
    def from_json(cls, json_str: str) -> "ContextPacket":
        """Build a packet from the JSON produced by to_json()."""
        return cls.from_dict(json.loads(json_str))


# =============================================================================
# KEET BRIDGE
# =============================================================================

class KeetContextBridge:
    """
    Bridge for synchronizing context via Keet P2P rooms.

    This is the TRANSPORT LAYER that enables:
    - Sandboxed Claude instances (phone) to share context
    - Desktop Claude to persist context received from phone
    - Any Claude to request context from peers

    Usage:
        bridge = KeetContextBridge(room_name="my-context-room")
        bridge.start()

        # Send context
        bridge.broadcast_resonance(resonance_state)
        bridge.broadcast_phoenix(phoenix_state)

        # Receive context
        bridge.on_resonance(callback)
        bridge.on_phoenix(callback)

        # Request context from peers (blocking call)
        context = bridge.request_context()
    """

    # Default room name (deterministic from user)
    DEFAULT_ROOM_PREFIX = "sovereign-os-context"

    # Connection endpoints
    KEET_WS_PORT = 7778
    KEET_HTTP_PORT = 7778

    # Timeouts (seconds)
    CONNECT_TIMEOUT = 5.0
    REQUEST_TIMEOUT = 10.0
    PING_INTERVAL = 30.0
    POLL_TIMEOUT = 1.0   # Timeout for one inbound poll
    START_WAIT = 1.0     # Max time start() waits for the first connection

    # Upper bound on packets buffered while the daemon is unreachable
    MAX_OUTBOUND_QUEUE = 1000

    def __init__(
        self,
        room_name: Optional[str] = None,
        machine_id: Optional[str] = None,
        keet_endpoint: Optional[str] = None,
        operator_mnemonic: Optional[str] = None,
    ):
        """
        Initialize the Keet bridge.

        Args:
            room_name: Name of the Keet room to join (derived from mnemonic if not provided)
            machine_id: This machine's identifier
            keet_endpoint: Override Keet daemon endpoint
            operator_mnemonic: 2-word BIP-39 mnemonic (e.g., "amber-falcon")
        """
        self.operator_mnemonic = operator_mnemonic or self._load_operator_mnemonic()
        self.room_name = room_name or self._default_room_name()
        self.machine_id = machine_id or self._get_machine_id()
        self.keet_endpoint = keet_endpoint or f"http://localhost:{self.KEET_HTTP_PORT}"

        # Connection state
        self._connected = False
        self._peers: Set[str] = set()
        self._last_ping = datetime.min

        # Message queues
        self._outbound_queue: queue.Queue = queue.Queue(maxsize=self.MAX_OUTBOUND_QUEUE)
        self._inbound_queue: queue.Queue = queue.Queue()

        # Callbacks
        self._resonance_callbacks: List[Callable] = []
        self._phoenix_callbacks: List[Callable] = []
        self._attractor_callbacks: List[Callable] = []
        self._peer_callbacks: List[Callable] = []

        # Request tracking
        self._pending_requests: Dict[str, asyncio.Future] = {}

        # Background thread
        self._running = False
        self._thread: Optional[threading.Thread] = None

        # NOTE(review): the default room name embeds the operator mnemonic;
        # consider redacting it from logs if the mnemonic is secret.
        logger.info(
            "Keet bridge initialized: room=%s, machine=%s",
            self.room_name,
            self.machine_id[:8],
        )

    def _load_operator_mnemonic(self) -> Optional[str]:
        """
        Load operator mnemonic from ~/.sovereign/operator-mnemonic.json.

        Returns:
            The "mnemonic" string from the file, or None when the file is
            missing, unreadable, malformed, or has no usable mnemonic.
        """
        config_file = Path(os.path.expanduser("~/.sovereign/operator-mnemonic.json"))
        try:
            data = json.loads(config_file.read_text(encoding="utf-8"))
        except FileNotFoundError:
            # No config file is the normal "no operator configured" case.
            return None
        except (OSError, ValueError) as exc:
            logger.warning("Could not read operator mnemonic from %s: %s", config_file, exc)
            return None

        if not isinstance(data, dict):
            logger.warning("Operator mnemonic file %s is not a JSON object", config_file)
            return None

        mnemonic = data.get("mnemonic")
        if isinstance(mnemonic, str) and mnemonic:
            return mnemonic
        return None

    def _default_room_name(self) -> str:
        """Generate default room name from operator mnemonic or username."""
        if self.operator_mnemonic:
            # Room derived from operator mnemonic
            return f"{self.DEFAULT_ROOM_PREFIX}-{self.operator_mnemonic}"
        # USER is set on POSIX systems, USERNAME on Windows.
        username = os.environ.get("USER") or os.environ.get("USERNAME") or "default"
        return f"{self.DEFAULT_ROOM_PREFIX}-{username}"

    def _get_machine_id(self) -> str:
        """Get stable machine identifier (first 16 hex chars of SHA-256 of the hostname)."""
        hostname = socket.gethostname()
        return hashlib.sha256(hostname.encode()).hexdigest()[:16]

    # =========================================================================
    # CONNECTION MANAGEMENT
    # =========================================================================

    def start(self) -> bool:
        """
        Start the bridge (connect to room, begin sync).

        Spawns the background sync thread and waits up to START_WAIT seconds
        for the first connection to the daemon.

        Returns:
            True if already running, otherwise whether the daemon connection
            was established within the wait window
        """
        if self._running:
            return True

        self._running = True
        self._thread = threading.Thread(target=self._run_loop, daemon=True)
        self._thread.start()

        # Wait for initial connection, returning as soon as it is up.
        deadline = time.monotonic() + self.START_WAIT
        while not self._connected and time.monotonic() < deadline:
            time.sleep(0.05)
        return self._connected

    def stop(self) -> None:
        """Stop the bridge and wait briefly for the background thread to exit."""
        self._running = False
        worker = self._thread
        # A callback running on the bridge thread may call stop(); joining
        # the current thread would raise RuntimeError.
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout=2)
        self._thread = None
        self._connected = False

    def is_connected(self) -> bool:
        """Check if connected to room."""
        return self._connected

    def get_peers(self) -> Set[str]:
        """Get set of peer machine IDs seen so far (a copy)."""
        return self._peers.copy()

    def _run_loop(self) -> None:
        """Background loop for P2P sync: connect, flush outbound, poll inbound, ping."""
        while self._running:
            try:
                # Try to connect if not connected
                if not self._connected:
                    self._try_connect()

                # Process outbound messages
                self._process_outbound()

                # Process inbound messages
                self._process_inbound()

                # Ping peers periodically; only while connected so pings do
                # not pile up in the outbound queue during an outage.
                since_ping = (datetime.now() - self._last_ping).total_seconds()
                if self._connected and since_ping > self.PING_INTERVAL:
                    self._send_ping()

            except Exception as exc:
                # Top-level boundary of the worker thread: never let it die.
                logger.exception("Error in Keet bridge loop: %s", exc)
                self._connected = False

            time.sleep(0.1)

    def _try_connect(self) -> bool:
        """
        Try to connect to the Keet daemon and join the room.

        Returns:
            True if the daemon answered /status with HTTP 200
        """
        try:
            url = f"{self.keet_endpoint}/status"
            req = urllib.request.Request(url, method="GET")
            with urllib.request.urlopen(req, timeout=self.CONNECT_TIMEOUT) as resp:
                reachable = resp.status == 200
        except _TRANSPORT_ERRORS as exc:
            logger.debug("Keet connection failed: %s", exc)
            self._connected = False
            return False

        if not reachable:
            return False

        self._connected = True
        logger.info("Connected to Keet daemon at %s", self.keet_endpoint)

        # Join room (failure is logged but does not undo the connection)
        self._join_room()
        return True

    def _post_json(self, path: str, body: Dict, timeout: float) -> bool:
        """
        POST `body` as JSON to the daemon at `path`.

        Returns:
            True if the daemon answered with HTTP 200

        Raises:
            OSError, ValueError, http.client.HTTPException: on transport failure
        """
        data = json.dumps(body).encode("utf-8")
        req = urllib.request.Request(f"{self.keet_endpoint}{path}", data=data, method="POST")
        req.add_header("Content-Type", "application/json")
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.status == 200

    def _join_room(self) -> None:
        """Join the context room."""
        try:
            if self._post_json("/room/join", {"room": self.room_name}, self.CONNECT_TIMEOUT):
                logger.info("Joined Keet room: %s", self.room_name)
        except _TRANSPORT_ERRORS as exc:
            logger.warning("Failed to join room: %s", exc)

    # =========================================================================
    # SENDING
    # =========================================================================

    def broadcast_resonance(self, resonance_state: Dict) -> bool:
        """
        Broadcast resonance state to peers.

        Args:
            resonance_state: Dict with axiom_scores, dominant_axiom, etc.

        Returns:
            True if queued for sending
        """
        packet = ContextPacket(
            type=PacketType.RESONANCE,
            machine_id=self.machine_id,
            timestamp=datetime.now(),
            payload=resonance_state,
        )
        return self._queue_outbound(packet)

    def broadcast_phoenix(self, phoenix_state: Dict) -> bool:
        """
        Broadcast Phoenix state to peers.

        Args:
            phoenix_state: Dict with checkpoint, gravity_wells, etc.

        Returns:
            True if queued for sending
        """
        packet = ContextPacket(
            type=PacketType.PHOENIX,
            machine_id=self.machine_id,
            timestamp=datetime.now(),
            payload=phoenix_state,
        )
        return self._queue_outbound(packet)

    def broadcast_attractor(
        self,
        topic: str,
        strength: float,
        sessions: Optional[List[str]] = None,
    ) -> bool:
        """
        Broadcast attractor update to peers.

        Args:
            topic: The attractor topic
            strength: Attractor strength (0-1)
            sessions: Sessions where this attractor appears

        Returns:
            True if queued for sending
        """
        packet = ContextPacket(
            type=PacketType.ATTRACTOR,
            machine_id=self.machine_id,
            timestamp=datetime.now(),
            payload={
                "topic": topic,
                "strength": strength,
                "sessions": sessions or [],
            },
        )
        return self._queue_outbound(packet)

    def request_context(self, timeout: Optional[float] = None) -> Optional[Dict]:
        """
        Request current context from peers (synchronous).

        Args:
            timeout: How long to wait for response, in seconds
                (REQUEST_TIMEOUT when None)

        Returns:
            Payload of the first matching response, or None if timeout
        """
        if timeout is None:
            timeout = self.REQUEST_TIMEOUT
        request_id = hashlib.sha256(
            f"{self.machine_id}-{time.time()}".encode()
        ).hexdigest()[:16]

        packet = ContextPacket(
            type=PacketType.REQUEST,
            machine_id=self.machine_id,
            timestamp=datetime.now(),
            request_id=request_id,
        )

        # Send request
        self._queue_outbound(packet)

        # Responses that belong to someone else are parked here and restored
        # on exit. Putting them straight back would make the next get()
        # return immediately, turning the wait into a busy loop.
        unmatched: List[ContextPacket] = []
        deadline = time.monotonic() + timeout
        try:
            while True:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return None
                try:
                    response = self._inbound_queue.get(timeout=min(0.1, remaining))
                except queue.Empty:
                    continue
                if response.type == PacketType.RESPONSE and response.request_id == request_id:
                    return response.payload
                unmatched.append(response)
        finally:
            for leftover in unmatched:
                self._inbound_queue.put(leftover)

    def _send_ping(self) -> None:
        """Send presence ping to room."""
        packet = ContextPacket(
            type=PacketType.PING,
            machine_id=self.machine_id,
            timestamp=datetime.now(),
        )
        self._queue_outbound(packet)
        self._last_ping = datetime.now()

    def _enqueue(self, packet: ContextPacket) -> bool:
        """Put a packet on the outbound queue, evicting the oldest one if full."""
        try:
            self._outbound_queue.put_nowait(packet)
            return True
        except queue.Full:
            pass

        # Newer context supersedes older context, so drop from the head.
        try:
            dropped = self._outbound_queue.get_nowait()
            logger.warning(
                "Outbound queue full; dropping oldest %s packet", dropped.type.value
            )
        except queue.Empty:
            pass

        try:
            self._outbound_queue.put_nowait(packet)
        except queue.Full:
            return False
        return True

    def _queue_outbound(self, packet: ContextPacket) -> bool:
        """
        Queue a packet for sending.

        Packets queued while disconnected stay in the queue until the
        connection is (re)established.

        Returns:
            True if the packet was queued
        """
        if not self._connected:
            logger.debug("Not connected, packet queued for later")

        return self._enqueue(packet)

    def _process_outbound(self) -> None:
        """Flush the outbound queue while the daemon connection is up."""
        # Draining while disconnected would silently discard every packet,
        # because _send_to_room refuses to send without a connection.
        while self._connected:
            try:
                packet = self._outbound_queue.get_nowait()
            except queue.Empty:
                break

            sent = self._send_to_room(packet)
            if not sent and not self._connected:
                # The link dropped mid-send: keep the packet for the retry
                # after reconnect.
                self._enqueue(packet)
                break

    def _send_to_room(self, packet: ContextPacket) -> bool:
        """
        Send packet to Keet room.

        Marks the bridge disconnected when the daemon cannot be reached, so
        the background loop reconnects.

        Returns:
            True if the daemon accepted the message with HTTP 200
        """
        if not self._connected:
            return False

        try:
            message = packet.to_json()
        except (TypeError, ValueError) as exc:
            # Retrying cannot help; report and let the caller drop it.
            logger.warning("Dropping unserializable %s packet: %s", packet.type.value, exc)
            return False

        body = {"room": self.room_name, "message": message}
        try:
            return self._post_json("/room/message", body, self.CONNECT_TIMEOUT)
        except urllib.error.HTTPError as exc:
            # The daemon answered, so the connection itself is still alive.
            logger.debug("Failed to send to room: %s", exc)
            return False
        except _TRANSPORT_ERRORS as exc:
            logger.debug("Failed to send to room: %s", exc)
            self._connected = False
            return False

    # =========================================================================
    # RECEIVING
    # =========================================================================

    def on_resonance(self, callback: Callable[[Dict, str], None]) -> None:
        """
        Register callback for resonance updates.

        Args:
            callback: Function(resonance_state, machine_id)
        """
        self._resonance_callbacks.append(callback)

    def on_phoenix(self, callback: Callable[[Dict, str], None]) -> None:
        """
        Register callback for Phoenix updates.

        Args:
            callback: Function(phoenix_state, machine_id)
        """
        self._phoenix_callbacks.append(callback)

    def on_attractor(self, callback: Callable[[str, float, str], None]) -> None:
        """
        Register callback for attractor updates.

        Args:
            callback: Function(topic, strength, machine_id)
        """
        self._attractor_callbacks.append(callback)

    def on_peer(self, callback: Callable[[str, bool], None]) -> None:
        """
        Register callback for peer presence changes.

        Args:
            callback: Function(machine_id, is_online)
        """
        self._peer_callbacks.append(callback)

    def _messages_url(self) -> str:
        """Build the polling URL, percent-encoding the room name."""
        query = urllib.parse.urlencode({"room": self.room_name, "since": "last"})
        return f"{self.keet_endpoint}/room/messages?{query}"

    def _process_inbound(self) -> None:
        """Poll the daemon for new room messages and dispatch each one."""
        if not self._connected:
            return

        try:
            req = urllib.request.Request(self._messages_url(), method="GET")
            with urllib.request.urlopen(req, timeout=self.POLL_TIMEOUT) as resp:
                if resp.status != 200:
                    return
                data = json.loads(resp.read().decode())
        except _TRANSPORT_ERRORS:
            # Polling is best-effort and runs ~10x/second; failures are
            # expected and deliberately not logged.
            return

        messages = data.get("messages") if isinstance(data, dict) else None
        for msg in messages or []:
            self._handle_message(msg)

    @staticmethod
    def _parse_packet(msg: Any) -> Optional[ContextPacket]:
        """
        Extract a ContextPacket from a raw room message.

        Accepts a JSON string, a packet dict, or a wrapper dict carrying the
        packet under "content", "message" or "data".

        Returns:
            The parsed packet, or None if the message carries no packet

        Raises:
            KeyError, ValueError, TypeError, AttributeError: if the packet
                data is malformed
        """
        if isinstance(msg, str):
            return ContextPacket.from_json(msg)
        if not isinstance(msg, dict):
            return None

        # Require machine_id as well as type: a wrapper may carry its own
        # "type" field and must not be mistaken for a packet.
        if "type" in msg and "machine_id" in msg:
            return ContextPacket.from_dict(msg)

        content = msg.get("content") or msg.get("message") or msg.get("data")
        if not content:
            return None
        if isinstance(content, str):
            return ContextPacket.from_json(content)
        return ContextPacket.from_dict(content)

    @staticmethod
    def _notify(callbacks: List[Callable], *args: Any) -> None:
        """Invoke every callback; one failing callback must not block the rest."""
        for callback in list(callbacks):
            try:
                callback(*args)
            except Exception:
                logger.exception("Keet bridge callback %r failed", callback)

    def _handle_message(self, msg: Any) -> None:
        """Parse an incoming message, track its sender, and dispatch it by type."""
        try:
            packet = self._parse_packet(msg)
        except (KeyError, ValueError, TypeError, AttributeError) as exc:
            logger.debug("Failed to handle message: %s", exc)
            return

        # Ignore non-packets and our own messages
        if packet is None or packet.machine_id == self.machine_id:
            return

        sender = packet.machine_id

        # Track peer
        self._peers.add(sender)

        # Dispatch by type
        kind = packet.type
        if kind == PacketType.RESONANCE:
            self._notify(self._resonance_callbacks, packet.payload, sender)

        elif kind == PacketType.PHOENIX:
            self._notify(self._phoenix_callbacks, packet.payload, sender)

        elif kind == PacketType.ATTRACTOR:
            payload = packet.payload if isinstance(packet.payload, dict) else {}
            self._notify(
                self._attractor_callbacks,
                payload.get("topic"),
                payload.get("strength", 0.5),
                sender,
            )

        elif kind == PacketType.PING:
            # Respond with pong
            self._send_pong(sender)
            self._notify(self._peer_callbacks, sender, True)

        elif kind == PacketType.PONG:
            # A pong is evidence of presence just like a ping.
            self._notify(self._peer_callbacks, sender, True)

        elif kind == PacketType.REQUEST:
            # Respond with our current context
            self._handle_context_request(packet)

        elif kind == PacketType.RESPONSE:
            # Queue for request handler
            self._inbound_queue.put(packet)

    def _send_pong(self, to_machine: str) -> None:
        """Send pong response."""
        packet = ContextPacket(
            type=PacketType.PONG,
            machine_id=self.machine_id,
            timestamp=datetime.now(),
            payload={"to": to_machine},
        )
        self._queue_outbound(packet)

    def _handle_context_request(self, request: ContextPacket) -> None:
        """Handle a context request from peer by queueing a RESPONSE packet."""
        # Build response with our current context
        # This will be populated by the context controller
        response = ContextPacket(
            type=PacketType.RESPONSE,
            machine_id=self.machine_id,
            timestamp=datetime.now(),
            request_id=request.request_id,
            payload={
                "from_machine": self.machine_id,
                "has_context": False,  # Will be overridden
            },
        )
        self._queue_outbound(response)

    # =========================================================================
    # STATUS
    # =========================================================================

    def get_status(self) -> Dict:
        """Get bridge status as a JSON-serializable dict."""
        return {
            "connected": self._connected,
            "room_name": self.room_name,
            "machine_id": self.machine_id,
            "keet_endpoint": self.keet_endpoint,
            "peers": list(self._peers),
            "peer_count": len(self._peers),
            "outbound_queue_size": self._outbound_queue.qsize(),
            "last_ping": self._last_ping.isoformat() if self._last_ping != datetime.min else None,
        }


# =============================================================================
# FACTORY
# =============================================================================

def create_keet_bridge(
    room_name: Optional[str] = None,
    auto_start: bool = True
) -> KeetContextBridge:
    """
    Create a Keet context bridge.

    Args:
        room_name: Override default room name
        auto_start: Start the bridge immediately

    Returns:
        KeetContextBridge instance
    """
    bridge = KeetContextBridge(room_name=room_name)

    if auto_start:
        bridge.start()

    return bridge


# =============================================================================
# CLI
# =============================================================================

def main(argv: Optional[List[str]] = None) -> None:
    """Command-line entry point: status, ping, request, or listen."""
    import argparse

    parser = argparse.ArgumentParser(description="Keet Context Bridge")
    parser.add_argument("--room", type=str, help="Room name to join")
    parser.add_argument("--status", action="store_true", help="Show bridge status")
    parser.add_argument("--ping", action="store_true", help="Ping peers")
    parser.add_argument("--request", action="store_true", help="Request context from peers")
    parser.add_argument("--listen", action="store_true", help="Listen for messages")

    args = parser.parse_args(argv)

    bridge = KeetContextBridge(room_name=args.room)

    if args.status:
        bridge.start()
        time.sleep(2)
        status = bridge.get_status()
        print(json.dumps(status, indent=2))
        bridge.stop()

    elif args.ping:
        # The background loop sends a ping as soon as it connects.
        bridge.start()
        time.sleep(2)
        print(f"Connected: {bridge.is_connected()}")
        print(f"Peers: {bridge.get_peers()}")
        bridge.stop()

    elif args.request:
        bridge.start()
        time.sleep(2)
        print("Requesting context from peers...")
        context = bridge.request_context(timeout=5.0)
        if context:
            print(json.dumps(context, indent=2))
        else:
            print("No response from peers")
        bridge.stop()

    elif args.listen:
        print(f"Listening on room: {bridge.room_name}")
        print("Press Ctrl+C to stop\n")

        def on_resonance(state, machine):
            print(f"[RESONANCE] from {machine[:8]}: {state.get('dominant_axiom', 'unknown')}")

        def on_phoenix(state, machine):
            print(f"[PHOENIX] from {machine[:8]}: checkpoint {state.get('checkpoint', '?')}")

        def on_peer(machine, online):
            status = "online" if online else "offline"
            print(f"[PEER] {machine[:8]}: {status}")

        bridge.on_resonance(on_resonance)
        bridge.on_phoenix(on_phoenix)
        bridge.on_peer(on_peer)

        bridge.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nStopping...")
        finally:
            bridge.stop()

    else:
        parser.print_help()


if __name__ == "__main__":
    main()