/ plugins / google_meet / node / server.py
server.py
  1  """Remote node server.
  2  
  3  Runs on the machine that will host the Meet bot (typically the user's
  4  Mac laptop with a signed-in Chrome). Exposes a WebSocket endpoint that
  5  accepts signed RPC requests and dispatches them to the existing
  6  ``plugins.google_meet.process_manager`` module.
  7  
  8  Launched by ``hermes meet node run``.
  9  
 10  Token handling
 11  --------------
 12  On first boot we mint 32 hex chars of entropy and persist them at
 13  ``$HERMES_HOME/workspace/meetings/node_token.json``. Subsequent boots
 14  reuse the same token so previously-approved gateways don't need to be
 15  re-paired. The operator copies this token out-of-band to the gateway
 16  via ``hermes meet node approve <name> <url> <token>``.
 17  
 18  Dependencies
 19  ------------
 20  ``websockets`` is an optional dep. We import it lazily inside
 21  :meth:`serve` so installing the plugin doesn't require it unless you
 22  actually host a node.
 23  """
 24  
 25  from __future__ import annotations
 26  
 27  import json
 28  import secrets
 29  import time
 30  from pathlib import Path
 31  from typing import Any, Dict, Optional
 32  
 33  from hermes_constants import get_hermes_home
 34  from plugins.google_meet.node import protocol as _proto
 35  
 36  
 37  def _default_token_path() -> Path:
 38      return Path(get_hermes_home()) / "workspace" / "meetings" / "node_token.json"
 39  
 40  
 41  class NodeServer:
 42      """WebSocket server that executes meet bot RPCs locally."""
 43  
 44      def __init__(
 45          self,
 46          host: str = "127.0.0.1",
 47          port: int = 18789,
 48          token_path: Optional[Path] = None,
 49          display_name: str = "hermes-meet-node",
 50      ) -> None:
 51          self.host = host
 52          self.port = port
 53          self.display_name = display_name
 54          self.token_path = Path(token_path) if token_path is not None else _default_token_path()
 55          self._token: Optional[str] = None
 56  
 57      # ----- token management --------------------------------------------
 58  
 59      def ensure_token(self) -> str:
 60          """Return the persisted shared secret, generating one on first use."""
 61          if self._token:
 62              return self._token
 63          if self.token_path.is_file():
 64              try:
 65                  data = json.loads(self.token_path.read_text(encoding="utf-8"))
 66                  tok = data.get("token")
 67                  if isinstance(tok, str) and tok:
 68                      self._token = tok
 69                      return tok
 70              except (OSError, json.JSONDecodeError):
 71                  pass
 72          tok = secrets.token_hex(16)  # 32 hex chars
 73          self.token_path.parent.mkdir(parents=True, exist_ok=True)
 74          tmp = self.token_path.with_suffix(".json.tmp")
 75          tmp.write_text(
 76              json.dumps({"token": tok, "generated_at": time.time()}, indent=2),
 77              encoding="utf-8",
 78          )
 79          # Restrict to owner-read-write only — the token grants full RPC
 80          # access to the meet bot (start, transcribe, speak in meetings).
 81          try:
 82              tmp.chmod(0o600)
 83          except (OSError, NotImplementedError):
 84              # Best-effort on non-POSIX filesystems; mode is set on POSIX.
 85              pass
 86          tmp.replace(self.token_path)
 87          self._token = tok
 88          return tok
 89  
 90      def get_token(self) -> str:
 91          """Alias for :meth:`ensure_token`; does not mutate on subsequent calls."""
 92          return self.ensure_token()
 93  
 94      # ----- dispatch -----------------------------------------------------
 95  
 96      async def _handle_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
 97          """Validate + dispatch a single decoded request envelope.
 98  
 99          Always returns a response envelope (success or error); never
100          raises. Errors from inside the process_manager are wrapped into
101          the response payload's ``ok``/``error`` keys (which pm already
102          does) rather than being re-encoded as error envelopes — the
103          envelope-level error channel is reserved for auth / protocol
104          failures.
105          """
106          expected = self.ensure_token()
107          ok, reason = _proto.validate_request(msg, expected)
108          if not ok:
109              return _proto.make_error(str(msg.get("id") or ""), reason)
110  
111          req_id = msg["id"]
112          t = msg["type"]
113          payload = msg["payload"]
114  
115          # Import lazily so test mocks can monkeypatch freely.
116          from plugins.google_meet import process_manager as pm
117  
118          try:
119              if t == "ping":
120                  return {"type": "pong", "id": req_id,
121                          "payload": {"display_name": self.display_name,
122                                      "ts": time.time()}}
123              if t == "start_bot":
124                  # Whitelist kwargs we pass through to pm.start.
125                  kwargs = {
126                      k: payload[k]
127                      for k in ("url", "guest_name", "duration", "headed",
128                                "auth_state", "session_id", "out_dir")
129                      if k in payload
130                  }
131                  if "url" not in kwargs:
132                      return _proto.make_error(req_id, "missing 'url' in payload")
133                  result = pm.start(**kwargs)
134                  return _proto.make_response(req_id, result)
135              if t == "stop":
136                  reason_arg = payload.get("reason", "requested")
137                  result = pm.stop(reason=reason_arg)
138                  return _proto.make_response(req_id, result)
139              if t == "status":
140                  return _proto.make_response(req_id, pm.status())
141              if t == "transcript":
142                  last = payload.get("last")
143                  result = pm.transcript(last=last)
144                  return _proto.make_response(req_id, result)
145              if t == "say":
146                  # v2 wiring: enqueue into say_queue.jsonl inside the
147                  # active meeting's out_dir when present. The bot-side
148                  # consumer is v3+ (for v1 this is a stub returning ok).
149                  text = payload.get("text", "")
150                  active = pm._read_active()  # type: ignore[attr-defined]
151                  enqueued = False
152                  if active and active.get("out_dir"):
153                      queue = Path(active["out_dir"]) / "say_queue.jsonl"
154                      try:
155                          queue.parent.mkdir(parents=True, exist_ok=True)
156                          with queue.open("a", encoding="utf-8") as fh:
157                              fh.write(json.dumps({"text": text, "ts": time.time()}) + "\n")
158                          enqueued = True
159                      except OSError:
160                          enqueued = False
161                  return _proto.make_response(
162                      req_id,
163                      {"ok": True, "enqueued": enqueued, "text": text},
164                  )
165          except Exception as exc:  # noqa: BLE001 — surface any pm crash to client
166              return _proto.make_error(req_id, f"{type(exc).__name__}: {exc}")
167  
168          return _proto.make_error(req_id, f"unhandled type: {t!r}")
169  
170      # ----- server loop --------------------------------------------------
171  
172      async def serve(self) -> None:
173          """Run the WebSocket server until cancelled.
174  
175          Blocks forever. Callers typically wrap this in ``asyncio.run``.
176          """
177          try:
178              import websockets  # type: ignore
179          except ImportError as exc:
180              raise RuntimeError(
181                  "NodeServer.serve requires the 'websockets' package. "
182                  "Install it with: pip install websockets"
183              ) from exc
184  
185          self.ensure_token()
186  
187          async def _handler(ws):
188              async for raw in ws:
189                  try:
190                      msg = _proto.decode(raw if isinstance(raw, str) else raw.decode("utf-8"))
191                  except ValueError as exc:
192                      await ws.send(_proto.encode(_proto.make_error("", f"decode: {exc}")))
193                      continue
194                  reply = await self._handle_request(msg)
195                  await ws.send(_proto.encode(reply))
196  
197          async with websockets.serve(_handler, self.host, self.port):
198              # Run until cancelled.
199              import asyncio
200              await asyncio.Future()