process_manager.py
1 """Subprocess lifecycle manager for the google_meet bot. 2 3 Single active meeting at a time. Stores the running pid + out_dir in a 4 session-scoped state file under ``$HERMES_HOME/workspace/meetings/.active.json`` 5 so tool calls across turns can find the bot, and ``on_session_end`` can clean 6 it up. 7 8 The bot runs as a detached subprocess — we don't hold file descriptors open, 9 so the parent agent loop can't block on it. We communicate via files only. 10 """ 11 12 from __future__ import annotations 13 14 import json 15 import os 16 import signal 17 import subprocess 18 import sys 19 import time 20 from pathlib import Path 21 from typing import Any, Dict, Optional 22 23 from hermes_constants import get_hermes_home 24 25 # File + directory layout (under $HERMES_HOME): 26 # 27 # workspace/meetings/ 28 # .active.json # pointer to current session's bot 29 # <meeting-id>/ 30 # status.json # live bot state (written by bot each tick) 31 # transcript.txt # scraped captions 32 # 33 # .active.json holds: 34 # {"pid": 12345, "meeting_id": "abc-defg-hij", "out_dir": "...", 35 # "url": "https://meet.google.com/...", "started_at": 1714159200.0, 36 # "session_id": "optional"} 37 38 39 def _root() -> Path: 40 return Path(get_hermes_home()) / "workspace" / "meetings" 41 42 43 def _active_file() -> Path: 44 return _root() / ".active.json" 45 46 47 def _read_active() -> Optional[Dict[str, Any]]: 48 p = _active_file() 49 if not p.is_file(): 50 return None 51 try: 52 return json.loads(p.read_text(encoding="utf-8")) 53 except Exception: 54 return None 55 56 57 def _write_active(data: Dict[str, Any]) -> None: 58 p = _active_file() 59 p.parent.mkdir(parents=True, exist_ok=True) 60 tmp = p.with_suffix(".json.tmp") 61 tmp.write_text(json.dumps(data, indent=2), encoding="utf-8") 62 tmp.replace(p) 63 64 65 def _clear_active() -> None: 66 try: 67 _active_file().unlink() 68 except FileNotFoundError: 69 pass 70 71 72 def _pid_alive(pid: int) -> bool: 73 try: 74 os.kill(pid, 0) 75 except ProcessLookupError: 76 return False 77 except PermissionError: 78 # Process exists but we can't signal it — treat as alive. 79 return True 80 return True 81 82 83 # --------------------------------------------------------------------------- 84 # Public API — used by tool handlers + CLI 85 # --------------------------------------------------------------------------- 86 87 def start( 88 url: str, 89 *, 90 out_dir: Optional[Path] = None, 91 headed: bool = False, 92 auth_state: Optional[str] = None, 93 guest_name: str = "Hermes Agent", 94 duration: Optional[str] = None, 95 session_id: Optional[str] = None, 96 mode: str = "transcribe", 97 realtime_model: Optional[str] = None, 98 realtime_voice: Optional[str] = None, 99 realtime_instructions: Optional[str] = None, 100 realtime_api_key: Optional[str] = None, 101 ) -> Dict[str, Any]: 102 """Spawn the meet_bot subprocess for *url*. 103 104 If a bot is already running for this hermes install, leave it first — 105 we enforce single-active-meeting semantics. 106 107 Returns a dict summarizing the started bot. 108 """ 109 from plugins.google_meet.meet_bot import _is_safe_meet_url, _meeting_id_from_url 110 111 if not _is_safe_meet_url(url): 112 return { 113 "ok": False, 114 "error": ( 115 "refusing: only https://meet.google.com/ URLs are allowed. " 116 "got: " + repr(url) 117 ), 118 } 119 120 existing = _read_active() 121 if existing and _pid_alive(int(existing.get("pid", 0))): 122 stop(reason="replaced by new meet_join") 123 124 meeting_id = _meeting_id_from_url(url) 125 out = out_dir or (_root() / meeting_id) 126 out.mkdir(parents=True, exist_ok=True) 127 128 # Wipe any stale transcript/status files from a previous run of this 129 # meeting id so polling isn't confused. 130 for name in ("transcript.txt", "status.json"): 131 f = out / name 132 if f.exists(): 133 try: 134 f.unlink() 135 except OSError: 136 pass 137 138 env = os.environ.copy() 139 env["HERMES_MEET_URL"] = url 140 env["HERMES_MEET_OUT_DIR"] = str(out) 141 env["HERMES_MEET_GUEST_NAME"] = guest_name 142 if headed: 143 env["HERMES_MEET_HEADED"] = "1" 144 if auth_state: 145 env["HERMES_MEET_AUTH_STATE"] = auth_state 146 if duration: 147 env["HERMES_MEET_DURATION"] = duration 148 # v2: realtime mode + passthroughs. The bot defaults to transcribe 149 # mode if HERMES_MEET_MODE isn't set, matching v1 behavior. 150 if mode: 151 env["HERMES_MEET_MODE"] = mode 152 if realtime_model: 153 env["HERMES_MEET_REALTIME_MODEL"] = realtime_model 154 if realtime_voice: 155 env["HERMES_MEET_REALTIME_VOICE"] = realtime_voice 156 if realtime_instructions: 157 env["HERMES_MEET_REALTIME_INSTRUCTIONS"] = realtime_instructions 158 if realtime_api_key: 159 env["HERMES_MEET_REALTIME_KEY"] = realtime_api_key 160 161 log_path = out / "bot.log" 162 # Detach: stdin=devnull, stdout/stderr → log file, new session so parent 163 # signals don't propagate. 164 log_fh = open(log_path, "ab", buffering=0) 165 try: 166 proc = subprocess.Popen( 167 [sys.executable, "-m", "plugins.google_meet.meet_bot"], 168 stdin=subprocess.DEVNULL, 169 stdout=log_fh, 170 stderr=subprocess.STDOUT, 171 env=env, 172 start_new_session=True, 173 close_fds=True, 174 ) 175 finally: 176 # The subprocess now owns the log fd; we can close ours. 177 log_fh.close() 178 179 record = { 180 "pid": proc.pid, 181 "meeting_id": meeting_id, 182 "out_dir": str(out), 183 "url": url, 184 "started_at": time.time(), 185 "session_id": session_id, 186 "log_path": str(log_path), 187 "mode": mode, 188 } 189 _write_active(record) 190 return {"ok": True, **record} 191 192 193 def status() -> Dict[str, Any]: 194 """Return the current meeting state, or ``{"ok": False, "reason": ...}``.""" 195 active = _read_active() 196 if not active: 197 return {"ok": False, "reason": "no active meeting"} 198 199 pid = int(active.get("pid", 0)) 200 alive = _pid_alive(pid) if pid else False 201 202 status_path = Path(active.get("out_dir", "")) / "status.json" 203 bot_status: Dict[str, Any] = {} 204 if status_path.is_file(): 205 try: 206 bot_status = json.loads(status_path.read_text(encoding="utf-8")) 207 except Exception: 208 pass 209 210 return { 211 "ok": True, 212 "alive": alive, 213 "pid": pid, 214 "meetingId": active.get("meeting_id"), 215 "url": active.get("url"), 216 "startedAt": active.get("started_at"), 217 "outDir": active.get("out_dir"), 218 **bot_status, 219 } 220 221 222 def transcript(last: Optional[int] = None) -> Dict[str, Any]: 223 """Read the current transcript file. Returns ok=False if none exists.""" 224 active = _read_active() 225 if not active: 226 return {"ok": False, "reason": "no active meeting"} 227 228 tp = Path(active.get("out_dir", "")) / "transcript.txt" 229 if not tp.is_file(): 230 return { 231 "ok": True, 232 "meetingId": active.get("meeting_id"), 233 "lines": [], 234 "total": 0, 235 "path": str(tp), 236 } 237 text = tp.read_text(encoding="utf-8", errors="replace") 238 all_lines = [ln for ln in text.splitlines() if ln.strip()] 239 lines = all_lines[-last:] if last else all_lines 240 return { 241 "ok": True, 242 "meetingId": active.get("meeting_id"), 243 "lines": lines, 244 "total": len(all_lines), 245 "path": str(tp), 246 } 247 248 249 def enqueue_say(text: str) -> Dict[str, Any]: 250 """Append a ``say`` request to the active bot's JSONL queue. 251 252 Returns ``{"ok": False, "reason": ...}`` when no meeting is active or 253 the active bot is in transcribe-only mode. Otherwise writes a line to 254 ``<out_dir>/say_queue.jsonl`` that the bot's realtime speaker thread 255 will consume. 256 """ 257 import uuid 258 259 text = (text or "").strip() 260 if not text: 261 return {"ok": False, "reason": "text is required"} 262 263 active = _read_active() 264 if not active: 265 return {"ok": False, "reason": "no active meeting"} 266 if active.get("mode") != "realtime": 267 return { 268 "ok": False, 269 "reason": ( 270 "active meeting is in transcribe mode — pass mode='realtime' " 271 "to meet_join to enable agent speech" 272 ), 273 } 274 275 out_dir = Path(active.get("out_dir", "")) 276 if not out_dir.is_dir(): 277 return {"ok": False, "reason": f"out_dir missing: {out_dir}"} 278 279 queue_path = out_dir / "say_queue.jsonl" 280 entry = {"id": uuid.uuid4().hex[:12], "text": text} 281 with queue_path.open("a", encoding="utf-8") as f: 282 f.write(json.dumps(entry) + "\n") 283 return { 284 "ok": True, 285 "meetingId": active.get("meeting_id"), 286 "enqueued_id": entry["id"], 287 "queue_path": str(queue_path), 288 } 289 290 291 def stop(*, reason: str = "requested") -> Dict[str, Any]: 292 """Signal the active bot to leave cleanly, then clear the active pointer. 293 294 Sends SIGTERM and waits up to 10s for the bot to exit. Falls back to 295 SIGKILL if the bot doesn't respond. 296 """ 297 active = _read_active() 298 if not active: 299 return {"ok": False, "reason": "no active meeting"} 300 301 pid = int(active.get("pid", 0)) 302 out_dir = active.get("out_dir") 303 transcript_path = Path(out_dir) / "transcript.txt" if out_dir else None 304 305 if pid and _pid_alive(pid): 306 try: 307 os.kill(pid, signal.SIGTERM) 308 except ProcessLookupError: 309 pass 310 for _ in range(20): 311 if not _pid_alive(pid): 312 break 313 time.sleep(0.5) 314 if _pid_alive(pid): 315 try: 316 os.kill(pid, signal.SIGKILL) 317 except ProcessLookupError: 318 pass 319 320 _clear_active() 321 return { 322 "ok": True, 323 "reason": reason, 324 "meetingId": active.get("meeting_id"), 325 "transcriptPath": str(transcript_path) if transcript_path else None, 326 }