server.py
1 """Remote node server. 2 3 Runs on the machine that will host the Meet bot (typically the user's 4 Mac laptop with a signed-in Chrome). Exposes a WebSocket endpoint that 5 accepts signed RPC requests and dispatches them to the existing 6 ``plugins.google_meet.process_manager`` module. 7 8 Launched by ``hermes meet node run``. 9 10 Token handling 11 -------------- 12 On first boot we mint 32 hex chars of entropy and persist them at 13 ``$HERMES_HOME/workspace/meetings/node_token.json``. Subsequent boots 14 reuse the same token so previously-approved gateways don't need to be 15 re-paired. The operator copies this token out-of-band to the gateway 16 via ``hermes meet node approve <name> <url> <token>``. 17 18 Dependencies 19 ------------ 20 ``websockets`` is an optional dep. We import it lazily inside 21 :meth:`serve` so installing the plugin doesn't require it unless you 22 actually host a node. 23 """ 24 25 from __future__ import annotations 26 27 import json 28 import secrets 29 import time 30 from pathlib import Path 31 from typing import Any, Dict, Optional 32 33 from hermes_constants import get_hermes_home 34 from plugins.google_meet.node import protocol as _proto 35 36 37 def _default_token_path() -> Path: 38 return Path(get_hermes_home()) / "workspace" / "meetings" / "node_token.json" 39 40 41 class NodeServer: 42 """WebSocket server that executes meet bot RPCs locally.""" 43 44 def __init__( 45 self, 46 host: str = "127.0.0.1", 47 port: int = 18789, 48 token_path: Optional[Path] = None, 49 display_name: str = "hermes-meet-node", 50 ) -> None: 51 self.host = host 52 self.port = port 53 self.display_name = display_name 54 self.token_path = Path(token_path) if token_path is not None else _default_token_path() 55 self._token: Optional[str] = None 56 57 # ----- token management -------------------------------------------- 58 59 def ensure_token(self) -> str: 60 """Return the persisted shared secret, generating one on first use.""" 61 if self._token: 62 return self._token 63 if self.token_path.is_file(): 64 try: 65 data = json.loads(self.token_path.read_text(encoding="utf-8")) 66 tok = data.get("token") 67 if isinstance(tok, str) and tok: 68 self._token = tok 69 return tok 70 except (OSError, json.JSONDecodeError): 71 pass 72 tok = secrets.token_hex(16) # 32 hex chars 73 self.token_path.parent.mkdir(parents=True, exist_ok=True) 74 tmp = self.token_path.with_suffix(".json.tmp") 75 tmp.write_text( 76 json.dumps({"token": tok, "generated_at": time.time()}, indent=2), 77 encoding="utf-8", 78 ) 79 # Restrict to owner-read-write only — the token grants full RPC 80 # access to the meet bot (start, transcribe, speak in meetings). 81 try: 82 tmp.chmod(0o600) 83 except (OSError, NotImplementedError): 84 # Best-effort on non-POSIX filesystems; mode is set on POSIX. 85 pass 86 tmp.replace(self.token_path) 87 self._token = tok 88 return tok 89 90 def get_token(self) -> str: 91 """Alias for :meth:`ensure_token`; does not mutate on subsequent calls.""" 92 return self.ensure_token() 93 94 # ----- dispatch ----------------------------------------------------- 95 96 async def _handle_request(self, msg: Dict[str, Any]) -> Dict[str, Any]: 97 """Validate + dispatch a single decoded request envelope. 98 99 Always returns a response envelope (success or error); never 100 raises. Errors from inside the process_manager are wrapped into 101 the response payload's ``ok``/``error`` keys (which pm already 102 does) rather than being re-encoded as error envelopes — the 103 envelope-level error channel is reserved for auth / protocol 104 failures. 105 """ 106 expected = self.ensure_token() 107 ok, reason = _proto.validate_request(msg, expected) 108 if not ok: 109 return _proto.make_error(str(msg.get("id") or ""), reason) 110 111 req_id = msg["id"] 112 t = msg["type"] 113 payload = msg["payload"] 114 115 # Import lazily so test mocks can monkeypatch freely. 116 from plugins.google_meet import process_manager as pm 117 118 try: 119 if t == "ping": 120 return {"type": "pong", "id": req_id, 121 "payload": {"display_name": self.display_name, 122 "ts": time.time()}} 123 if t == "start_bot": 124 # Whitelist kwargs we pass through to pm.start. 125 kwargs = { 126 k: payload[k] 127 for k in ("url", "guest_name", "duration", "headed", 128 "auth_state", "session_id", "out_dir") 129 if k in payload 130 } 131 if "url" not in kwargs: 132 return _proto.make_error(req_id, "missing 'url' in payload") 133 result = pm.start(**kwargs) 134 return _proto.make_response(req_id, result) 135 if t == "stop": 136 reason_arg = payload.get("reason", "requested") 137 result = pm.stop(reason=reason_arg) 138 return _proto.make_response(req_id, result) 139 if t == "status": 140 return _proto.make_response(req_id, pm.status()) 141 if t == "transcript": 142 last = payload.get("last") 143 result = pm.transcript(last=last) 144 return _proto.make_response(req_id, result) 145 if t == "say": 146 # v2 wiring: enqueue into say_queue.jsonl inside the 147 # active meeting's out_dir when present. The bot-side 148 # consumer is v3+ (for v1 this is a stub returning ok). 149 text = payload.get("text", "") 150 active = pm._read_active() # type: ignore[attr-defined] 151 enqueued = False 152 if active and active.get("out_dir"): 153 queue = Path(active["out_dir"]) / "say_queue.jsonl" 154 try: 155 queue.parent.mkdir(parents=True, exist_ok=True) 156 with queue.open("a", encoding="utf-8") as fh: 157 fh.write(json.dumps({"text": text, "ts": time.time()}) + "\n") 158 enqueued = True 159 except OSError: 160 enqueued = False 161 return _proto.make_response( 162 req_id, 163 {"ok": True, "enqueued": enqueued, "text": text}, 164 ) 165 except Exception as exc: # noqa: BLE001 — surface any pm crash to client 166 return _proto.make_error(req_id, f"{type(exc).__name__}: {exc}") 167 168 return _proto.make_error(req_id, f"unhandled type: {t!r}") 169 170 # ----- server loop -------------------------------------------------- 171 172 async def serve(self) -> None: 173 """Run the WebSocket server until cancelled. 174 175 Blocks forever. Callers typically wrap this in ``asyncio.run``. 176 """ 177 try: 178 import websockets # type: ignore 179 except ImportError as exc: 180 raise RuntimeError( 181 "NodeServer.serve requires the 'websockets' package. " 182 "Install it with: pip install websockets" 183 ) from exc 184 185 self.ensure_token() 186 187 async def _handler(ws): 188 async for raw in ws: 189 try: 190 msg = _proto.decode(raw if isinstance(raw, str) else raw.decode("utf-8")) 191 except ValueError as exc: 192 await ws.send(_proto.encode(_proto.make_error("", f"decode: {exc}"))) 193 continue 194 reply = await self._handle_request(msg) 195 await ws.send(_proto.encode(reply)) 196 197 async with websockets.serve(_handler, self.host, self.port): 198 # Run until cancelled. 199 import asyncio 200 await asyncio.Future()