/ plugins / google_meet / process_manager.py
process_manager.py
  1  """Subprocess lifecycle manager for the google_meet bot.
  2  
  3  Single active meeting at a time. Stores the running pid + out_dir in a
  4  session-scoped state file under ``$HERMES_HOME/workspace/meetings/.active.json``
  5  so tool calls across turns can find the bot, and ``on_session_end`` can clean
  6  it up.
  7  
  8  The bot runs as a detached subprocess — we don't hold file descriptors open,
  9  so the parent agent loop can't block on it. We communicate via files only.
 10  """
 11  
 12  from __future__ import annotations
 13  
 14  import json
 15  import os
 16  import signal
 17  import subprocess
 18  import sys
 19  import time
 20  from pathlib import Path
 21  from typing import Any, Dict, Optional
 22  
 23  from hermes_constants import get_hermes_home
 24  
# File + directory layout (under $HERMES_HOME):
#
#   workspace/meetings/
#       .active.json                # pointer to current session's bot
#       <meeting-id>/
#           status.json             # live bot state (written by bot each tick)
#           transcript.txt          # scraped captions
#           bot.log                 # bot stdout/stderr (appended on each start)
#           say_queue.jsonl         # pending ``say`` requests (realtime mode)
#
# .active.json holds:
#   {"pid": 12345, "meeting_id": "abc-defg-hij", "out_dir": "...",
#    "url": "https://meet.google.com/...", "started_at": 1714159200.0,
#    "session_id": "optional", "log_path": ".../bot.log",
#    "mode": "transcribe"}
 37  
 38  
 39  def _root() -> Path:
 40      return Path(get_hermes_home()) / "workspace" / "meetings"
 41  
 42  
 43  def _active_file() -> Path:
 44      return _root() / ".active.json"
 45  
 46  
 47  def _read_active() -> Optional[Dict[str, Any]]:
 48      p = _active_file()
 49      if not p.is_file():
 50          return None
 51      try:
 52          return json.loads(p.read_text(encoding="utf-8"))
 53      except Exception:
 54          return None
 55  
 56  
 57  def _write_active(data: Dict[str, Any]) -> None:
 58      p = _active_file()
 59      p.parent.mkdir(parents=True, exist_ok=True)
 60      tmp = p.with_suffix(".json.tmp")
 61      tmp.write_text(json.dumps(data, indent=2), encoding="utf-8")
 62      tmp.replace(p)
 63  
 64  
 65  def _clear_active() -> None:
 66      try:
 67          _active_file().unlink()
 68      except FileNotFoundError:
 69          pass
 70  
 71  
 72  def _pid_alive(pid: int) -> bool:
 73      try:
 74          os.kill(pid, 0)
 75      except ProcessLookupError:
 76          return False
 77      except PermissionError:
 78          # Process exists but we can't signal it — treat as alive.
 79          return True
 80      return True
 81  
 82  
 83  # ---------------------------------------------------------------------------
 84  # Public API — used by tool handlers + CLI
 85  # ---------------------------------------------------------------------------
 86  
 87  def start(
 88      url: str,
 89      *,
 90      out_dir: Optional[Path] = None,
 91      headed: bool = False,
 92      auth_state: Optional[str] = None,
 93      guest_name: str = "Hermes Agent",
 94      duration: Optional[str] = None,
 95      session_id: Optional[str] = None,
 96      mode: str = "transcribe",
 97      realtime_model: Optional[str] = None,
 98      realtime_voice: Optional[str] = None,
 99      realtime_instructions: Optional[str] = None,
100      realtime_api_key: Optional[str] = None,
101  ) -> Dict[str, Any]:
102      """Spawn the meet_bot subprocess for *url*.
103  
104      If a bot is already running for this hermes install, leave it first —
105      we enforce single-active-meeting semantics.
106  
107      Returns a dict summarizing the started bot.
108      """
109      from plugins.google_meet.meet_bot import _is_safe_meet_url, _meeting_id_from_url
110  
111      if not _is_safe_meet_url(url):
112          return {
113              "ok": False,
114              "error": (
115                  "refusing: only https://meet.google.com/ URLs are allowed. "
116                  "got: " + repr(url)
117              ),
118          }
119  
120      existing = _read_active()
121      if existing and _pid_alive(int(existing.get("pid", 0))):
122          stop(reason="replaced by new meet_join")
123  
124      meeting_id = _meeting_id_from_url(url)
125      out = out_dir or (_root() / meeting_id)
126      out.mkdir(parents=True, exist_ok=True)
127  
128      # Wipe any stale transcript/status files from a previous run of this
129      # meeting id so polling isn't confused.
130      for name in ("transcript.txt", "status.json"):
131          f = out / name
132          if f.exists():
133              try:
134                  f.unlink()
135              except OSError:
136                  pass
137  
138      env = os.environ.copy()
139      env["HERMES_MEET_URL"] = url
140      env["HERMES_MEET_OUT_DIR"] = str(out)
141      env["HERMES_MEET_GUEST_NAME"] = guest_name
142      if headed:
143          env["HERMES_MEET_HEADED"] = "1"
144      if auth_state:
145          env["HERMES_MEET_AUTH_STATE"] = auth_state
146      if duration:
147          env["HERMES_MEET_DURATION"] = duration
148      # v2: realtime mode + passthroughs. The bot defaults to transcribe
149      # mode if HERMES_MEET_MODE isn't set, matching v1 behavior.
150      if mode:
151          env["HERMES_MEET_MODE"] = mode
152      if realtime_model:
153          env["HERMES_MEET_REALTIME_MODEL"] = realtime_model
154      if realtime_voice:
155          env["HERMES_MEET_REALTIME_VOICE"] = realtime_voice
156      if realtime_instructions:
157          env["HERMES_MEET_REALTIME_INSTRUCTIONS"] = realtime_instructions
158      if realtime_api_key:
159          env["HERMES_MEET_REALTIME_KEY"] = realtime_api_key
160  
161      log_path = out / "bot.log"
162      # Detach: stdin=devnull, stdout/stderr → log file, new session so parent
163      # signals don't propagate.
164      log_fh = open(log_path, "ab", buffering=0)
165      try:
166          proc = subprocess.Popen(
167              [sys.executable, "-m", "plugins.google_meet.meet_bot"],
168              stdin=subprocess.DEVNULL,
169              stdout=log_fh,
170              stderr=subprocess.STDOUT,
171              env=env,
172              start_new_session=True,
173              close_fds=True,
174          )
175      finally:
176          # The subprocess now owns the log fd; we can close ours.
177          log_fh.close()
178  
179      record = {
180          "pid": proc.pid,
181          "meeting_id": meeting_id,
182          "out_dir": str(out),
183          "url": url,
184          "started_at": time.time(),
185          "session_id": session_id,
186          "log_path": str(log_path),
187          "mode": mode,
188      }
189      _write_active(record)
190      return {"ok": True, **record}
191  
192  
def status() -> Dict[str, Any]:
    """Return the current meeting state, or ``{"ok": False, "reason": ...}``.

    The result combines fields from the active record (``pid``,
    ``meetingId``, ``url``, ``startedAt``, ``outDir``), an ``alive`` flag from
    probing the pid, and — when readable — the keys of the bot's
    ``status.json``. Keys from ``status.json`` are merged last and therefore
    override same-named keys from the record.
    """
    active = _read_active()
    if not active or not isinstance(active, dict):
        return {"ok": False, "reason": "no active meeting"}

    # Tolerate a missing or malformed pid instead of raising from int().
    try:
        pid = int(active.get("pid") or 0)
    except (TypeError, ValueError):
        pid = 0
    alive = _pid_alive(pid) if pid > 0 else False

    bot_status: Dict[str, Any] = {}
    out_dir = active.get("out_dir")
    # Without an out_dir there is nothing to read; do not fall back to
    # Path(""), which would resolve to ./status.json in the current directory.
    if out_dir:
        try:
            status_path = Path(out_dir) / "status.json"
            loaded = json.loads(status_path.read_text(encoding="utf-8"))
        except (OSError, ValueError, TypeError):
            # Missing/unreadable file, malformed JSON, or a non-path out_dir:
            # report the record alone.
            loaded = None
        # Only a JSON object can be merged with ``**`` below.
        if isinstance(loaded, dict):
            bot_status = loaded

    return {
        "ok": True,
        "alive": alive,
        "pid": pid,
        "meetingId": active.get("meeting_id"),
        "url": active.get("url"),
        "startedAt": active.get("started_at"),
        "outDir": active.get("out_dir"),
        **bot_status,
    }
220  
221  
def transcript(last: Optional[int] = None) -> Dict[str, Any]:
    """Read the active meeting's transcript file.

    Args:
        last: When a positive integer, return only that many trailing
            non-blank lines. ``None``, zero, or a negative value returns
            every line.

    Returns:
        ``{"ok": False, "reason": ...}`` when there is no active meeting or
        its record has no ``out_dir``. Otherwise ``{"ok": True, ...}`` with
        ``meetingId``, ``lines``, ``total`` (count of all non-blank lines)
        and ``path``. A transcript file that does not exist yet yields empty
        ``lines`` and ``total == 0``.
    """
    active = _read_active()
    if not active or not isinstance(active, dict):
        return {"ok": False, "reason": "no active meeting"}

    out_dir = active.get("out_dir")
    if not out_dir:
        # Path("") would silently resolve to ./transcript.txt in the CWD.
        return {"ok": False, "reason": "active meeting record has no out_dir"}

    tp = Path(out_dir) / "transcript.txt"
    try:
        text = tp.read_text(encoding="utf-8", errors="replace") if tp.is_file() else ""
    except FileNotFoundError:
        # The file vanished between the check and the read.
        text = ""

    all_lines = [ln for ln in text.splitlines() if ln.strip()]
    # Slice the tail only for a positive count: a negative ``last`` would
    # turn ``[-last:]`` into a head-dropping slice.
    if last is not None and last > 0:
        lines = all_lines[-last:]
    else:
        lines = all_lines
    return {
        "ok": True,
        "meetingId": active.get("meeting_id"),
        "lines": lines,
        "total": len(all_lines),
        "path": str(tp),
    }
247  
248  
def enqueue_say(text: str) -> Dict[str, Any]:
    """Append a ``say`` request to the active bot's JSONL queue.

    Args:
        text: What the bot should say. Leading/trailing whitespace is
            stripped; empty text is rejected.

    Returns:
        ``{"ok": False, "reason": ...}`` when *text* is empty, no meeting is
        active, the active meeting's recorded mode is not ``"realtime"``, or
        the record's ``out_dir`` is absent or not a directory. Otherwise one
        JSON line ``{"id": ..., "text": ...}`` is appended to
        ``<out_dir>/say_queue.jsonl`` and the result is ``{"ok": True,
        "meetingId": ..., "enqueued_id": ..., "queue_path": ...}``.
    """
    import uuid

    text = (text or "").strip()
    if not text:
        return {"ok": False, "reason": "text is required"}

    active = _read_active()
    if not active or not isinstance(active, dict):
        return {"ok": False, "reason": "no active meeting"}
    if active.get("mode") != "realtime":
        return {
            "ok": False,
            "reason": (
                "active meeting is in transcribe mode — pass mode='realtime' "
                "to meet_join to enable agent speech"
            ),
        }

    # A record without out_dir must not fall back to Path(""): that is the
    # current directory, which passes is_dir() and would receive the queue.
    raw_out_dir = active.get("out_dir")
    out_dir = Path(raw_out_dir) if raw_out_dir else None
    if out_dir is None or not out_dir.is_dir():
        return {"ok": False, "reason": f"out_dir missing: {out_dir}"}

    queue_path = out_dir / "say_queue.jsonl"
    # Short random id so a request can be referred to later.
    entry = {"id": uuid.uuid4().hex[:12], "text": text}
    with queue_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(entry) + "\n")
    return {
        "ok": True,
        "meetingId": active.get("meeting_id"),
        "enqueued_id": entry["id"],
        "queue_path": str(queue_path),
    }
289  
290  
def stop(*, reason: str = "requested") -> Dict[str, Any]:
    """Signal the active bot to leave cleanly, then clear the active pointer.

    Sends SIGTERM and waits up to 10s for the bot to exit. Falls back to
    SIGKILL if the bot doesn't respond. The pointer file is cleared even when
    the recorded pid is malformed or belongs to a process we may not signal.

    Args:
        reason: Free-form text echoed back in the result.

    Returns:
        ``{"ok": False, "reason": "no active meeting"}`` when nothing is
        recorded; otherwise ``{"ok": True, "reason": ..., "meetingId": ...,
        "transcriptPath": ...}`` (``transcriptPath`` is ``None`` when the
        record has no ``out_dir``).
    """
    active = _read_active()
    if not active or not isinstance(active, dict):
        return {"ok": False, "reason": "no active meeting"}

    # A damaged record must still be clearable, so never raise from int().
    try:
        pid = int(active.get("pid") or 0)
    except (TypeError, ValueError):
        pid = 0
    out_dir = active.get("out_dir")
    transcript_path = Path(out_dir) / "transcript.txt" if out_dir else None

    if pid > 0 and _pid_alive(pid):
        try:
            os.kill(pid, signal.SIGTERM)
            signalled = True
        except ProcessLookupError:
            # Exited between the liveness probe and the signal.
            signalled = False
        except PermissionError:
            # The pid exists but is not ours to signal (e.g. the id was
            # reused by another user's process). Waiting would be pointless;
            # fall through and drop the stale pointer.
            signalled = False

        if signalled:
            # Poll every 0.5s, 20 times: the 10s grace period.
            for _ in range(20):
                if not _pid_alive(pid):
                    break
                time.sleep(0.5)
            if _pid_alive(pid):
                try:
                    os.kill(pid, signal.SIGKILL)
                except (ProcessLookupError, PermissionError):
                    pass

    _clear_active()
    return {
        "ok": True,
        "reason": reason,
        "meetingId": active.get("meeting_id"),
        "transcriptPath": str(transcript_path) if transcript_path else None,
    }