# browser_supervisor.py
1 """Persistent CDP supervisor for browser dialog + frame detection. 2 3 One ``CDPSupervisor`` runs per Hermes ``task_id`` that has a reachable CDP 4 endpoint. It holds a single persistent WebSocket to the backend, subscribes 5 to ``Page`` / ``Runtime`` / ``Target`` events on every attached session 6 (top-level page and every OOPIF / worker target that auto-attaches), and 7 surfaces observable state — pending dialogs and frame tree — through a 8 thread-safe snapshot object that tool handlers consume synchronously. 9 10 The supervisor is NOT in the agent's tool schema. Its output reaches the 11 agent via two channels: 12 13 1. ``browser_snapshot`` merges supervisor state into its return payload 14 (see ``tools/browser_tool.py``). 15 2. ``browser_dialog`` tool responds to a pending dialog by calling 16 ``respond_to_dialog()`` on the active supervisor. 17 18 Design spec: ``website/docs/developer-guide/browser-supervisor.md``. 19 """ 20 21 from __future__ import annotations 22 23 import asyncio 24 import json 25 import logging 26 import threading 27 import time 28 from dataclasses import dataclass 29 from typing import Any, Dict, List, Optional, Tuple 30 31 import websockets 32 from websockets.asyncio.client import ClientConnection 33 34 logger = logging.getLogger(__name__) 35 36 37 # ── Config defaults ─────────────────────────────────────────────────────────── 38 39 DIALOG_POLICY_MUST_RESPOND = "must_respond" 40 DIALOG_POLICY_AUTO_DISMISS = "auto_dismiss" 41 DIALOG_POLICY_AUTO_ACCEPT = "auto_accept" 42 43 _VALID_POLICIES = frozenset( 44 {DIALOG_POLICY_MUST_RESPOND, DIALOG_POLICY_AUTO_DISMISS, DIALOG_POLICY_AUTO_ACCEPT} 45 ) 46 47 DEFAULT_DIALOG_POLICY = DIALOG_POLICY_MUST_RESPOND 48 DEFAULT_DIALOG_TIMEOUT_S = 300.0 49 50 # Snapshot caps for frame_tree — keep payloads bounded on ad-heavy pages. 51 FRAME_TREE_MAX_ENTRIES = 30 52 FRAME_TREE_MAX_OOPIF_DEPTH = 2 53 54 # Ring buffer of recent console-level events (used later by PR 2 diagnostics). 
55 CONSOLE_HISTORY_MAX = 50 56 57 # Keep the last N closed dialogs in ``recent_dialogs`` so agents on backends 58 # that auto-dismiss server-side (e.g. Browserbase) can still observe that a 59 # dialog fired, even if they couldn't respond to it in time. 60 RECENT_DIALOGS_MAX = 20 61 62 # Magic host the injected dialog bridge XHRs to. Intercepted via the CDP 63 # Fetch domain before any network resolution happens, so the hostname never 64 # has to exist. Keep this ASCII + URL-safe; we also gate Fetch patterns on it. 65 DIALOG_BRIDGE_HOST = "hermes-dialog-bridge.invalid" 66 DIALOG_BRIDGE_URL_PATTERN = f"http://{DIALOG_BRIDGE_HOST}/*" 67 68 # Script injected into every frame via Page.addScriptToEvaluateOnNewDocument. 69 # Overrides alert/confirm/prompt to round-trip through a sync XHR that we 70 # intercept via Fetch.requestPaused. Works on Browserbase (whose CDP proxy 71 # auto-dismisses REAL native dialogs) because the native dialogs never fire 72 # in the first place — the overrides take precedence. 73 _DIALOG_BRIDGE_SCRIPT = r""" 74 (() => { 75 if (window.__hermesDialogBridgeInstalled) return; 76 window.__hermesDialogBridgeInstalled = true; 77 const ENDPOINT = "http://hermes-dialog-bridge.invalid/"; 78 function ask(kind, message, defaultPrompt) { 79 try { 80 const xhr = new XMLHttpRequest(); 81 // Use GET with query params so we don't need to worry about request 82 // body encoding in the Fetch interceptor. 83 const params = new URLSearchParams({ 84 kind: String(kind || ""), 85 message: String(message == null ? "" : message), 86 default_prompt: String(defaultPrompt == null ? "" : defaultPrompt), 87 }); 88 xhr.open("GET", ENDPOINT + "?" 
+ params.toString(), false); // sync 89 xhr.send(null); 90 if (xhr.status !== 200) return null; 91 const body = xhr.responseText || ""; 92 let parsed; 93 try { parsed = JSON.parse(body); } catch (e) { return null; } 94 if (kind === "alert") return undefined; 95 if (kind === "confirm") return Boolean(parsed && parsed.accept); 96 if (kind === "prompt") { 97 if (!parsed || !parsed.accept) return null; 98 return parsed.prompt_text == null ? "" : String(parsed.prompt_text); 99 } 100 return null; 101 } catch (e) { 102 // If the bridge is unreachable, fall back to the native call so the 103 // page still sees *some* behavior (the backend will auto-dismiss). 104 return null; 105 } 106 } 107 const realAlert = window.alert; 108 const realConfirm = window.confirm; 109 const realPrompt = window.prompt; 110 window.alert = function(message) { ask("alert", message, ""); }; 111 window.confirm = function(message) { 112 const r = ask("confirm", message, ""); 113 return r === null ? false : Boolean(r); 114 }; 115 window.prompt = function(message, def) { 116 const r = ask("prompt", message, def == null ? "" : def); 117 return r === null ? null : String(r); 118 }; 119 // onbeforeunload — we can't really synchronously prompt the user from this 120 // event without racing navigation. Leave native behavior for now; the 121 // supervisor's native-dialog fallback path still surfaces them in 122 // recent_dialogs. 123 })(); 124 """ 125 126 127 # ── Data model ──────────────────────────────────────────────────────────────── 128 129 130 @dataclass 131 class PendingDialog: 132 """A JS dialog currently open on some frame's session.""" 133 134 id: str 135 type: str # "alert" | "confirm" | "prompt" | "beforeunload" 136 message: str 137 default_prompt: str 138 opened_at: float 139 cdp_session_id: str # which attached CDP session the dialog fired in 140 frame_id: Optional[str] = None 141 # When set, the dialog was captured via the bridge XHR path (Fetch domain). 
142 # Response must be delivered via Fetch.fulfillRequest, NOT 143 # Page.handleJavaScriptDialog — the native dialog never fired. 144 bridge_request_id: Optional[str] = None 145 146 def to_dict(self) -> Dict[str, Any]: 147 return { 148 "id": self.id, 149 "type": self.type, 150 "message": self.message, 151 "default_prompt": self.default_prompt, 152 "opened_at": self.opened_at, 153 "frame_id": self.frame_id, 154 } 155 156 157 @dataclass 158 class DialogRecord: 159 """A historical record of a dialog that was opened and then handled. 160 161 Retained in ``recent_dialogs`` for a short window so agents on backends 162 that auto-dismiss dialogs server-side (Browserbase) can still observe 163 that a dialog fired, even though they couldn't respond to it. 164 """ 165 166 id: str 167 type: str 168 message: str 169 opened_at: float 170 closed_at: float 171 closed_by: str # "agent" | "auto_policy" | "remote" | "watchdog" 172 frame_id: Optional[str] = None 173 174 def to_dict(self) -> Dict[str, Any]: 175 return { 176 "id": self.id, 177 "type": self.type, 178 "message": self.message, 179 "opened_at": self.opened_at, 180 "closed_at": self.closed_at, 181 "closed_by": self.closed_by, 182 "frame_id": self.frame_id, 183 } 184 185 186 @dataclass 187 class FrameInfo: 188 """One frame in the page's frame tree. 189 190 ``is_oopif`` means the frame has its own CDP target (separate process, 191 reachable via ``cdp_session_id``). Same-origin / srcdoc iframes share 192 the parent process and have ``is_oopif=False`` + ``cdp_session_id=None``. 
193 """ 194 195 frame_id: str 196 url: str 197 origin: str 198 parent_frame_id: Optional[str] 199 is_oopif: bool 200 cdp_session_id: Optional[str] = None 201 name: str = "" 202 203 def to_dict(self) -> Dict[str, Any]: 204 d = { 205 "frame_id": self.frame_id, 206 "url": self.url, 207 "origin": self.origin, 208 "is_oopif": self.is_oopif, 209 } 210 if self.cdp_session_id: 211 d["session_id"] = self.cdp_session_id 212 if self.parent_frame_id: 213 d["parent_frame_id"] = self.parent_frame_id 214 if self.name: 215 d["name"] = self.name 216 return d 217 218 219 @dataclass 220 class ConsoleEvent: 221 """Ring buffer entry for console + exception traffic.""" 222 223 ts: float 224 level: str # "log" | "error" | "warning" | "exception" 225 text: str 226 url: Optional[str] = None 227 228 229 @dataclass(frozen=True) 230 class SupervisorSnapshot: 231 """Read-only snapshot of supervisor state. 232 233 Frozen dataclass so tool handlers can freely dereference without 234 worrying about mutation under their feet. 235 """ 236 237 pending_dialogs: Tuple[PendingDialog, ...] 238 recent_dialogs: Tuple[DialogRecord, ...] 239 frame_tree: Dict[str, Any] 240 console_errors: Tuple[ConsoleEvent, ...] 241 active: bool # False if supervisor is detached/stopped 242 cdp_url: str 243 task_id: str 244 245 def to_dict(self) -> Dict[str, Any]: 246 """Serialize for inclusion in ``browser_snapshot`` output.""" 247 out: Dict[str, Any] = { 248 "pending_dialogs": [d.to_dict() for d in self.pending_dialogs], 249 "frame_tree": self.frame_tree, 250 } 251 if self.recent_dialogs: 252 out["recent_dialogs"] = [d.to_dict() for d in self.recent_dialogs] 253 return out 254 255 256 # ── Supervisor core ─────────────────────────────────────────────────────────── 257 258 259 class CDPSupervisor: 260 """One supervisor per (task_id, cdp_url) pair. 
261 262 Lifecycle: 263 * ``start()`` — kicked off by ``SupervisorRegistry.get_or_start``; spawns 264 a daemon thread running its own asyncio loop, connects the WebSocket, 265 attaches to the first page target, enables domains, starts 266 auto-attaching to child targets. 267 * ``snapshot()`` — sync, thread-safe, called from tool handlers. 268 * ``respond_to_dialog(action, ...)`` — sync bridge; schedules a coroutine 269 on the supervisor's loop and waits (with timeout) for the CDP ack. 270 * ``stop()`` — cancels task, closes WebSocket, joins thread. 271 272 All CDP I/O lives on the supervisor's own loop. External callers never 273 touch the loop directly; they go through the sync API above. 274 """ 275 276 def __init__( 277 self, 278 task_id: str, 279 cdp_url: str, 280 *, 281 dialog_policy: str = DEFAULT_DIALOG_POLICY, 282 dialog_timeout_s: float = DEFAULT_DIALOG_TIMEOUT_S, 283 ) -> None: 284 if dialog_policy not in _VALID_POLICIES: 285 raise ValueError( 286 f"Invalid dialog_policy {dialog_policy!r}; " 287 f"must be one of {sorted(_VALID_POLICIES)}" 288 ) 289 self.task_id = task_id 290 self.cdp_url = cdp_url 291 self.dialog_policy = dialog_policy 292 self.dialog_timeout_s = float(dialog_timeout_s) 293 294 # State protected by ``_state_lock`` for cross-thread reads. 295 self._state_lock = threading.Lock() 296 self._pending_dialogs: Dict[str, PendingDialog] = {} 297 self._recent_dialogs: List[DialogRecord] = [] 298 self._frames: Dict[str, FrameInfo] = {} 299 self._console_events: List[ConsoleEvent] = [] 300 self._active = False 301 302 # Supervisor loop machinery — populated in start(). 303 self._loop: Optional[asyncio.AbstractEventLoop] = None 304 self._thread: Optional[threading.Thread] = None 305 self._ready_event = threading.Event() 306 self._start_error: Optional[BaseException] = None 307 self._stop_requested = False 308 309 # CDP call tracking (runs on supervisor loop only). 
310 self._next_call_id = 1 311 self._pending_calls: Dict[int, asyncio.Future] = {} 312 self._ws: Optional[ClientConnection] = None 313 self._page_session_id: Optional[str] = None 314 self._child_sessions: Dict[str, Dict[str, Any]] = {} # session_id -> info 315 316 # Dialog auto-dismiss watchdog handles (per dialog id). 317 self._dialog_watchdogs: Dict[str, asyncio.TimerHandle] = {} 318 # Monotonic id generator for dialogs (human-readable in snapshots). 319 self._dialog_seq = 0 320 321 # ── Public sync API ────────────────────────────────────────────────────── 322 323 def start(self, timeout: float = 15.0) -> None: 324 """Launch the background loop and wait until attachment is complete. 325 326 Raises whatever exception attach failed with (connect error, bad 327 WebSocket URL, CDP domain enable failure, etc.). On success, the 328 supervisor is fully wired up — pending-dialog events will be captured 329 as of the moment ``start()`` returns. 330 """ 331 if self._thread and self._thread.is_alive(): 332 return 333 self._ready_event.clear() 334 self._start_error = None 335 self._stop_requested = False 336 self._thread = threading.Thread( 337 target=self._thread_main, 338 name=f"cdp-supervisor-{self.task_id}", 339 daemon=True, 340 ) 341 self._thread.start() 342 if not self._ready_event.wait(timeout=timeout): 343 self.stop() 344 raise TimeoutError( 345 f"CDP supervisor did not attach within {timeout}s " 346 f"(cdp_url={self.cdp_url[:80]}...)" 347 ) 348 if self._start_error is not None: 349 err = self._start_error 350 self.stop() 351 raise err 352 353 def stop(self, timeout: float = 5.0) -> None: 354 """Cancel the supervisor task and join the thread.""" 355 self._stop_requested = True 356 loop = self._loop 357 if loop is not None and loop.is_running(): 358 # Close the WebSocket from inside the loop — this makes ``async for 359 # raw in self._ws`` return cleanly, ``_run`` hits its ``finally``, 360 # pending tasks get cancelled in order, THEN the thread exits. 
361 async def _close_ws(): 362 ws = self._ws 363 self._ws = None 364 if ws is not None: 365 try: 366 await ws.close() 367 except Exception: 368 pass 369 370 try: 371 fut = asyncio.run_coroutine_threadsafe(_close_ws(), loop) 372 try: 373 fut.result(timeout=2.0) 374 except Exception: 375 pass 376 except RuntimeError: 377 pass # loop already shutting down 378 if self._thread is not None: 379 self._thread.join(timeout=timeout) 380 with self._state_lock: 381 self._active = False 382 383 def snapshot(self) -> SupervisorSnapshot: 384 """Return an immutable snapshot of current state.""" 385 with self._state_lock: 386 dialogs = tuple(self._pending_dialogs.values()) 387 recent = tuple(self._recent_dialogs[-RECENT_DIALOGS_MAX:]) 388 frames_tree = self._build_frame_tree_locked() 389 console = tuple(self._console_events[-CONSOLE_HISTORY_MAX:]) 390 active = self._active 391 return SupervisorSnapshot( 392 pending_dialogs=dialogs, 393 recent_dialogs=recent, 394 frame_tree=frames_tree, 395 console_errors=console, 396 active=active, 397 cdp_url=self.cdp_url, 398 task_id=self.task_id, 399 ) 400 401 def respond_to_dialog( 402 self, 403 action: str, 404 *, 405 prompt_text: Optional[str] = None, 406 dialog_id: Optional[str] = None, 407 timeout: float = 10.0, 408 ) -> Dict[str, Any]: 409 """Accept/dismiss a pending dialog. Sync bridge onto the supervisor loop. 410 411 Returns ``{"ok": True, "dialog": {...}}`` on success, 412 ``{"ok": False, "error": "..."}`` on a recoverable error (no dialog, 413 ambiguous dialog_id, supervisor inactive). 
414 """ 415 if action not in ("accept", "dismiss"): 416 return {"ok": False, "error": f"action must be 'accept' or 'dismiss', got {action!r}"} 417 418 with self._state_lock: 419 if not self._active: 420 return {"ok": False, "error": "supervisor is not active"} 421 pending = list(self._pending_dialogs.values()) 422 if not pending: 423 return {"ok": False, "error": "no dialog is currently open"} 424 if dialog_id: 425 dialog = self._pending_dialogs.get(dialog_id) 426 if dialog is None: 427 return { 428 "ok": False, 429 "error": f"dialog_id {dialog_id!r} not found " 430 f"(known: {sorted(self._pending_dialogs)})", 431 } 432 elif len(pending) > 1: 433 return { 434 "ok": False, 435 "error": ( 436 f"{len(pending)} pending dialogs; specify dialog_id. " 437 f"Candidates: {[d.id for d in pending]}" 438 ), 439 } 440 else: 441 dialog = pending[0] 442 snapshot_copy = dialog 443 444 loop = self._loop 445 if loop is None: 446 return {"ok": False, "error": "supervisor loop is not running"} 447 448 async def _do_respond(): 449 return await self._handle_dialog_cdp( 450 snapshot_copy, accept=(action == "accept"), prompt_text=prompt_text or "" 451 ) 452 453 try: 454 fut = asyncio.run_coroutine_threadsafe(_do_respond(), loop) 455 fut.result(timeout=timeout) 456 except Exception as e: 457 return {"ok": False, "error": f"{type(e).__name__}: {e}"} 458 return {"ok": True, "dialog": snapshot_copy.to_dict()} 459 460 # ── Supervisor loop internals ──────────────────────────────────────────── 461 462 def _thread_main(self) -> None: 463 """Entry point for the supervisor's dedicated thread.""" 464 loop = asyncio.new_event_loop() 465 self._loop = loop 466 try: 467 asyncio.set_event_loop(loop) 468 loop.run_until_complete(self._run()) 469 except BaseException as e: # noqa: BLE001 — propagate via _start_error 470 if not self._ready_event.is_set(): 471 self._start_error = e 472 self._ready_event.set() 473 else: 474 logger.warning("CDP supervisor %s crashed: %s", self.task_id, e) 475 finally: 476 # 
Flush any remaining tasks before closing the loop so we don't 477 # emit "Task was destroyed but it is pending" warnings. 478 try: 479 pending = [t for t in asyncio.all_tasks(loop) if not t.done()] 480 for t in pending: 481 t.cancel() 482 if pending: 483 loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) 484 except Exception: 485 pass 486 try: 487 loop.close() 488 except Exception: 489 pass 490 with self._state_lock: 491 self._active = False 492 493 async def _run(self) -> None: 494 """Top-level supervisor coroutine. 495 496 Holds a reconnecting loop so we survive the remote closing the 497 WebSocket — Browserbase in particular tears down the CDP socket 498 every time a short-lived client (e.g. agent-browser's per-command 499 CDP client) disconnects. We drop our state snapshot keys that 500 depend on specific CDP session ids, re-attach, and keep going. 501 """ 502 attempt = 0 503 last_success_at = 0.0 504 backoff = 0.5 505 while not self._stop_requested: 506 try: 507 self._ws = await asyncio.wait_for( 508 websockets.connect(self.cdp_url, max_size=50 * 1024 * 1024), 509 timeout=10.0, 510 ) 511 except Exception as e: 512 attempt += 1 513 if not self._ready_event.is_set(): 514 # Never connected once — fatal for start(). 515 self._start_error = e 516 self._ready_event.set() 517 return 518 logger.warning( 519 "CDP supervisor %s: connect failed (attempt %s): %s", 520 self.task_id, attempt, e, 521 ) 522 await asyncio.sleep(min(backoff, 10.0)) 523 backoff = min(backoff * 2, 10.0) 524 continue 525 526 reader_task = asyncio.create_task(self._read_loop(), name="cdp-reader") 527 try: 528 # Reset per-connection session state so stale ids don't hang 529 # around after a reconnect. 530 self._page_session_id = None 531 self._child_sessions.clear() 532 # We deliberately keep `_pending_dialogs` and `_frames` — 533 # they're reconciled as the supervisor resubscribes and 534 # receives fresh events. 
Worst case: an agent sees a stale 535 # dialog entry that the new session's handleJavaScriptDialog 536 # call rejects with "no dialog is showing" (logged, not 537 # surfaced). 538 await self._attach_initial_page() 539 with self._state_lock: 540 self._active = True 541 last_success_at = time.time() 542 backoff = 0.5 # reset after a successful attach 543 if not self._ready_event.is_set(): 544 self._ready_event.set() 545 # Run until the reader returns. 546 await reader_task 547 except BaseException as e: 548 if not self._ready_event.is_set(): 549 # Never got to ready — propagate to start(). 550 self._start_error = e 551 self._ready_event.set() 552 raise 553 logger.warning( 554 "CDP supervisor %s: session dropped after %.1fs: %s", 555 self.task_id, 556 time.time() - last_success_at, 557 e, 558 ) 559 finally: 560 with self._state_lock: 561 self._active = False 562 if not reader_task.done(): 563 reader_task.cancel() 564 try: 565 await reader_task 566 except (asyncio.CancelledError, Exception): 567 pass 568 for handle in list(self._dialog_watchdogs.values()): 569 handle.cancel() 570 self._dialog_watchdogs.clear() 571 ws = self._ws 572 self._ws = None 573 if ws is not None: 574 try: 575 await ws.close() 576 except Exception: 577 pass 578 579 if self._stop_requested: 580 return 581 582 # Reconnect: brief backoff, then reattach. 
583 logger.debug( 584 "CDP supervisor %s: reconnecting in %.1fs...", self.task_id, backoff, 585 ) 586 await asyncio.sleep(backoff) 587 backoff = min(backoff * 2, 10.0) 588 589 async def _attach_initial_page(self) -> None: 590 """Find a page target, attach flattened session, enable domains, install dialog bridge.""" 591 resp = await self._cdp("Target.getTargets") 592 targets = resp.get("result", {}).get("targetInfos", []) 593 page_target = next((t for t in targets if t.get("type") == "page"), None) 594 if page_target is None: 595 created = await self._cdp("Target.createTarget", {"url": "about:blank"}) 596 target_id = created["result"]["targetId"] 597 else: 598 target_id = page_target["targetId"] 599 600 attach = await self._cdp( 601 "Target.attachToTarget", 602 {"targetId": target_id, "flatten": True}, 603 ) 604 self._page_session_id = attach["result"]["sessionId"] 605 await self._cdp("Page.enable", session_id=self._page_session_id) 606 await self._cdp("Runtime.enable", session_id=self._page_session_id) 607 await self._cdp( 608 "Target.setAutoAttach", 609 {"autoAttach": True, "waitForDebuggerOnStart": False, "flatten": True}, 610 session_id=self._page_session_id, 611 ) 612 # Install the dialog bridge — overrides native alert/confirm/prompt with 613 # a synchronous XHR we intercept via Fetch domain. This is how we make 614 # dialog response work on Browserbase (whose CDP proxy auto-dismisses 615 # real native dialogs before we can call handleJavaScriptDialog). 616 await self._install_dialog_bridge(self._page_session_id) 617 618 async def _install_dialog_bridge(self, session_id: str) -> None: 619 """Install the dialog-bridge init script + Fetch interceptor on a session. 620 621 Two CDP calls: 622 1. ``Page.addScriptToEvaluateOnNewDocument`` — the JS override runs 623 in every frame before any page script. Replaces alert/confirm/ 624 prompt with a sync XHR to our bridge URL. 625 2. 
``Fetch.enable`` scoped to the bridge URL — we catch those XHRs, 626 surface them as pending dialogs, then fulfill once the agent 627 responds. 628 629 Idempotent at the CDP level: Chromium de-duplicates identical 630 add-script calls by source, and Fetch.enable replaces prior patterns. 631 """ 632 try: 633 await self._cdp( 634 "Page.addScriptToEvaluateOnNewDocument", 635 {"source": _DIALOG_BRIDGE_SCRIPT, "runImmediately": True}, 636 session_id=session_id, 637 timeout=5.0, 638 ) 639 except Exception as e: 640 logger.debug( 641 "dialog bridge: addScriptToEvaluateOnNewDocument failed on sid=%s: %s", 642 (session_id or "")[:16], e, 643 ) 644 try: 645 await self._cdp( 646 "Fetch.enable", 647 { 648 "patterns": [ 649 { 650 "urlPattern": DIALOG_BRIDGE_URL_PATTERN, 651 "requestStage": "Request", 652 } 653 ], 654 "handleAuthRequests": False, 655 }, 656 session_id=session_id, 657 timeout=5.0, 658 ) 659 except Exception as e: 660 logger.debug( 661 "dialog bridge: Fetch.enable failed on sid=%s: %s", 662 (session_id or "")[:16], e, 663 ) 664 # Also try to inject into the already-loaded document so existing 665 # pages pick up the override on reconnect. Best-effort. 
666 try: 667 await self._cdp( 668 "Runtime.evaluate", 669 {"expression": _DIALOG_BRIDGE_SCRIPT, "returnByValue": True}, 670 session_id=session_id, 671 timeout=3.0, 672 ) 673 except Exception: 674 pass 675 676 async def _cdp( 677 self, 678 method: str, 679 params: Optional[Dict[str, Any]] = None, 680 *, 681 session_id: Optional[str] = None, 682 timeout: float = 10.0, 683 ) -> Dict[str, Any]: 684 """Send a CDP command and await its response.""" 685 if self._ws is None: 686 raise RuntimeError("supervisor WebSocket is not connected") 687 call_id = self._next_call_id 688 self._next_call_id += 1 689 payload: Dict[str, Any] = {"id": call_id, "method": method} 690 if params: 691 payload["params"] = params 692 if session_id: 693 payload["sessionId"] = session_id 694 fut: asyncio.Future = asyncio.get_running_loop().create_future() 695 self._pending_calls[call_id] = fut 696 await self._ws.send(json.dumps(payload)) 697 try: 698 return await asyncio.wait_for(fut, timeout=timeout) 699 finally: 700 self._pending_calls.pop(call_id, None) 701 702 async def _read_loop(self) -> None: 703 """Continuously dispatch incoming CDP frames.""" 704 assert self._ws is not None 705 try: 706 async for raw in self._ws: 707 if self._stop_requested: 708 break 709 try: 710 msg = json.loads(raw) 711 except Exception: 712 logger.debug("CDP supervisor: non-JSON frame dropped") 713 continue 714 if "id" in msg: 715 fut = self._pending_calls.pop(msg["id"], None) 716 if fut is not None and not fut.done(): 717 if "error" in msg: 718 fut.set_exception( 719 RuntimeError(f"CDP error on id={msg['id']}: {msg['error']}") 720 ) 721 else: 722 fut.set_result(msg) 723 elif "method" in msg: 724 await self._on_event(msg["method"], msg.get("params", {}), msg.get("sessionId")) 725 except Exception as e: 726 logger.debug("CDP read loop exited: %s", e) 727 728 # ── Event dispatch ────────────────────────────────────────────────────── 729 730 async def _on_event( 731 self, method: str, params: Dict[str, Any], session_id: 
Optional[str] 732 ) -> None: 733 if method == "Page.javascriptDialogOpening": 734 await self._on_dialog_opening(params, session_id) 735 elif method == "Page.javascriptDialogClosed": 736 await self._on_dialog_closed(params, session_id) 737 elif method == "Fetch.requestPaused": 738 await self._on_fetch_paused(params, session_id) 739 elif method == "Page.frameAttached": 740 self._on_frame_attached(params, session_id) 741 elif method == "Page.frameNavigated": 742 self._on_frame_navigated(params, session_id) 743 elif method == "Page.frameDetached": 744 self._on_frame_detached(params, session_id) 745 elif method == "Target.attachedToTarget": 746 await self._on_target_attached(params) 747 elif method == "Target.detachedFromTarget": 748 self._on_target_detached(params) 749 elif method == "Runtime.consoleAPICalled": 750 self._on_console(params, level_from="api") 751 elif method == "Runtime.exceptionThrown": 752 self._on_console(params, level_from="exception") 753 754 async def _on_dialog_opening( 755 self, params: Dict[str, Any], session_id: Optional[str] 756 ) -> None: 757 self._dialog_seq += 1 758 dialog = PendingDialog( 759 id=f"d-{self._dialog_seq}", 760 type=str(params.get("type") or ""), 761 message=str(params.get("message") or ""), 762 default_prompt=str(params.get("defaultPrompt") or ""), 763 opened_at=time.time(), 764 cdp_session_id=session_id or self._page_session_id or "", 765 frame_id=params.get("frameId"), 766 ) 767 768 if self.dialog_policy == DIALOG_POLICY_AUTO_DISMISS: 769 # Archive immediately with the policy tag so the ``closed`` event 770 # arriving right after our handleJavaScriptDialog call doesn't 771 # re-archive it as "remote". 
772 with self._state_lock: 773 self._archive_dialog_locked(dialog, "auto_policy") 774 asyncio.create_task( 775 self._auto_handle_dialog(dialog, accept=False, prompt_text="") 776 ) 777 elif self.dialog_policy == DIALOG_POLICY_AUTO_ACCEPT: 778 with self._state_lock: 779 self._archive_dialog_locked(dialog, "auto_policy") 780 asyncio.create_task( 781 self._auto_handle_dialog( 782 dialog, accept=True, prompt_text=dialog.default_prompt 783 ) 784 ) 785 else: 786 # must_respond → add to pending and arm watchdog. 787 with self._state_lock: 788 self._pending_dialogs[dialog.id] = dialog 789 loop = asyncio.get_running_loop() 790 handle = loop.call_later( 791 self.dialog_timeout_s, 792 lambda: asyncio.create_task(self._dialog_timeout_expired(dialog.id)), 793 ) 794 self._dialog_watchdogs[dialog.id] = handle 795 796 async def _auto_handle_dialog( 797 self, dialog: PendingDialog, *, accept: bool, prompt_text: str 798 ) -> None: 799 """Send handleJavaScriptDialog for auto_dismiss/auto_accept. 800 801 Dialog has already been archived by the caller (``_on_dialog_opening``); 802 this just fires the CDP call so the page unblocks. 803 """ 804 params: Dict[str, Any] = {"accept": accept} 805 if dialog.type == "prompt": 806 params["promptText"] = prompt_text 807 try: 808 await self._cdp( 809 "Page.handleJavaScriptDialog", 810 params, 811 session_id=dialog.cdp_session_id or None, 812 timeout=5.0, 813 ) 814 except Exception as e: 815 logger.debug("auto-handle CDP call failed for %s: %s", dialog.id, e) 816 817 async def _dialog_timeout_expired(self, dialog_id: str) -> None: 818 with self._state_lock: 819 dialog = self._pending_dialogs.get(dialog_id) 820 if dialog is None: 821 return 822 logger.warning( 823 "CDP supervisor %s: dialog %s (%s) auto-dismissed after %ss timeout", 824 self.task_id, 825 dialog_id, 826 dialog.type, 827 self.dialog_timeout_s, 828 ) 829 try: 830 # Archive with watchdog tag BEFORE fulfilling / dismissing. 
831 with self._state_lock: 832 if dialog_id in self._pending_dialogs: 833 self._pending_dialogs.pop(dialog_id, None) 834 self._archive_dialog_locked(dialog, "watchdog") 835 # Unblock the page — via bridge Fetch fulfill for bridge dialogs, 836 # else native Page.handleJavaScriptDialog for real dialogs. 837 if dialog.bridge_request_id: 838 await self._fulfill_bridge_request(dialog, accept=False, prompt_text="") 839 else: 840 await self._cdp( 841 "Page.handleJavaScriptDialog", 842 {"accept": False}, 843 session_id=dialog.cdp_session_id or None, 844 timeout=5.0, 845 ) 846 except Exception as e: 847 logger.debug("auto-dismiss failed for %s: %s", dialog_id, e) 848 849 def _archive_dialog_locked(self, dialog: PendingDialog, closed_by: str) -> None: 850 """Move a pending dialog to the recent_dialogs ring buffer. Must hold state_lock.""" 851 record = DialogRecord( 852 id=dialog.id, 853 type=dialog.type, 854 message=dialog.message, 855 opened_at=dialog.opened_at, 856 closed_at=time.time(), 857 closed_by=closed_by, 858 frame_id=dialog.frame_id, 859 ) 860 self._recent_dialogs.append(record) 861 if len(self._recent_dialogs) > RECENT_DIALOGS_MAX * 2: 862 self._recent_dialogs = self._recent_dialogs[-RECENT_DIALOGS_MAX:] 863 864 async def _handle_dialog_cdp( 865 self, dialog: PendingDialog, *, accept: bool, prompt_text: str 866 ) -> None: 867 """Send the Page.handleJavaScriptDialog CDP command (agent path only). 868 869 Routes to the bridge-fulfill path when the dialog was captured via 870 the injected XHR override (see ``_on_fetch_paused``). 
871 """ 872 if dialog.bridge_request_id: 873 try: 874 await self._fulfill_bridge_request( 875 dialog, accept=accept, prompt_text=prompt_text 876 ) 877 finally: 878 with self._state_lock: 879 if dialog.id in self._pending_dialogs: 880 self._pending_dialogs.pop(dialog.id, None) 881 self._archive_dialog_locked(dialog, "agent") 882 handle = self._dialog_watchdogs.pop(dialog.id, None) 883 if handle is not None: 884 handle.cancel() 885 return 886 887 params: Dict[str, Any] = {"accept": accept} 888 if dialog.type == "prompt": 889 params["promptText"] = prompt_text 890 try: 891 await self._cdp( 892 "Page.handleJavaScriptDialog", 893 params, 894 session_id=dialog.cdp_session_id or None, 895 timeout=5.0, 896 ) 897 finally: 898 # Clear regardless — the CDP error path usually means the dialog 899 # already closed (browser auto-dismissed after navigation, etc.). 900 with self._state_lock: 901 if dialog.id in self._pending_dialogs: 902 self._pending_dialogs.pop(dialog.id, None) 903 self._archive_dialog_locked(dialog, "agent") 904 handle = self._dialog_watchdogs.pop(dialog.id, None) 905 if handle is not None: 906 handle.cancel() 907 908 async def _on_dialog_closed( 909 self, params: Dict[str, Any], session_id: Optional[str] 910 ) -> None: 911 # ``Page.javascriptDialogClosed`` spec has only ``result`` (bool) and 912 # ``userInput`` (string), not the original ``message``. Match by 913 # session id and clear the oldest dialog on that session — if Chrome 914 # closed one on us (e.g. our disconnect auto-dismissed it, or the 915 # browser navigated, or Browserbase's CDP proxy auto-dismissed), there 916 # shouldn't be more than one in flight per session anyway because the 917 # JS thread is blocked while a dialog is up. 918 with self._state_lock: 919 candidate_ids = [ 920 d.id 921 for d in self._pending_dialogs.values() 922 if d.cdp_session_id == session_id 923 # Bridge-captured dialogs aren't cleared by native close events; 924 # they're resolved via Fetch.fulfillRequest instead. 
Only the 925 # real-native-dialog path uses Page.javascriptDialogClosed. 926 and d.bridge_request_id is None 927 ] 928 if candidate_ids: 929 did = candidate_ids[0] 930 dialog = self._pending_dialogs.pop(did, None) 931 if dialog is not None: 932 self._archive_dialog_locked(dialog, "remote") 933 handle = self._dialog_watchdogs.pop(did, None) 934 if handle is not None: 935 handle.cancel() 936 937 async def _on_fetch_paused( 938 self, params: Dict[str, Any], session_id: Optional[str] 939 ) -> None: 940 """Bridge XHR captured mid-flight — materialize as a pending dialog. 941 942 The injected script (``_DIALOG_BRIDGE_SCRIPT``) fires a synchronous 943 XHR to ``DIALOG_BRIDGE_HOST`` whenever page code calls alert/confirm/ 944 prompt. We catch it via Fetch.enable pattern; the page's JS thread 945 is blocked on the XHR's response until we call Fetch.fulfillRequest 946 (which happens from ``respond_to_dialog``) or until the watchdog 947 fires (at which point we fulfill with a cancel response). 948 """ 949 url = str(params.get("request", {}).get("url") or "") 950 request_id = params.get("requestId") 951 if not request_id: 952 return 953 # Only care about our bridge URLs. Fetch can still deliver other 954 # intercepted requests if patterns were ever broadened. 955 if DIALOG_BRIDGE_HOST not in url: 956 # Not ours — forward unchanged so the page sees its own request. 957 try: 958 await self._cdp( 959 "Fetch.continueRequest", {"requestId": request_id}, 960 session_id=session_id, timeout=3.0, 961 ) 962 except Exception: 963 pass 964 return 965 966 # Parse query string for dialog metadata. Use urllib to be robust. 
967 from urllib.parse import urlparse, parse_qs 968 q = parse_qs(urlparse(url).query) 969 970 def _q(name: str) -> str: 971 v = q.get(name, [""]) 972 return v[0] if v else "" 973 974 kind = _q("kind") or "alert" 975 message = _q("message") 976 default_prompt = _q("default_prompt") 977 978 self._dialog_seq += 1 979 dialog = PendingDialog( 980 id=f"d-{self._dialog_seq}", 981 type=kind, 982 message=message, 983 default_prompt=default_prompt, 984 opened_at=time.time(), 985 cdp_session_id=session_id or self._page_session_id or "", 986 frame_id=params.get("frameId"), 987 bridge_request_id=str(request_id), 988 ) 989 990 # Apply policy exactly as for native dialogs. 991 if self.dialog_policy == DIALOG_POLICY_AUTO_DISMISS: 992 with self._state_lock: 993 self._archive_dialog_locked(dialog, "auto_policy") 994 asyncio.create_task( 995 self._fulfill_bridge_request(dialog, accept=False, prompt_text="") 996 ) 997 elif self.dialog_policy == DIALOG_POLICY_AUTO_ACCEPT: 998 with self._state_lock: 999 self._archive_dialog_locked(dialog, "auto_policy") 1000 asyncio.create_task( 1001 self._fulfill_bridge_request( 1002 dialog, accept=True, prompt_text=default_prompt 1003 ) 1004 ) 1005 else: 1006 # must_respond — add to pending + arm watchdog. 
1007 with self._state_lock: 1008 self._pending_dialogs[dialog.id] = dialog 1009 loop = asyncio.get_running_loop() 1010 handle = loop.call_later( 1011 self.dialog_timeout_s, 1012 lambda: asyncio.create_task(self._dialog_timeout_expired(dialog.id)), 1013 ) 1014 self._dialog_watchdogs[dialog.id] = handle 1015 1016 async def _fulfill_bridge_request( 1017 self, dialog: PendingDialog, *, accept: bool, prompt_text: str 1018 ) -> None: 1019 """Resolve a bridge XHR via Fetch.fulfillRequest so the page unblocks.""" 1020 if not dialog.bridge_request_id: 1021 return 1022 payload = { 1023 "accept": bool(accept), 1024 "prompt_text": prompt_text if dialog.type == "prompt" else "", 1025 "dialog_id": dialog.id, 1026 } 1027 body = json.dumps(payload).encode() 1028 try: 1029 import base64 as _b64 1030 await self._cdp( 1031 "Fetch.fulfillRequest", 1032 { 1033 "requestId": dialog.bridge_request_id, 1034 "responseCode": 200, 1035 "responseHeaders": [ 1036 {"name": "Content-Type", "value": "application/json"}, 1037 {"name": "Access-Control-Allow-Origin", "value": "*"}, 1038 ], 1039 "body": _b64.b64encode(body).decode(), 1040 }, 1041 session_id=dialog.cdp_session_id or None, 1042 timeout=5.0, 1043 ) 1044 except Exception as e: 1045 logger.debug("bridge fulfill failed for %s: %s", dialog.id, e) 1046 1047 # ── Frame / target tracking ───────────────────────────────────────────── 1048 1049 def _on_frame_attached( 1050 self, params: Dict[str, Any], session_id: Optional[str] 1051 ) -> None: 1052 frame_id = params.get("frameId") 1053 if not frame_id: 1054 return 1055 with self._state_lock: 1056 self._frames[frame_id] = FrameInfo( 1057 frame_id=frame_id, 1058 url="", 1059 origin="", 1060 parent_frame_id=params.get("parentFrameId"), 1061 is_oopif=False, 1062 cdp_session_id=session_id, 1063 ) 1064 1065 def _on_frame_navigated( 1066 self, params: Dict[str, Any], session_id: Optional[str] 1067 ) -> None: 1068 frame = params.get("frame") or {} 1069 frame_id = frame.get("id") 1070 if not frame_id: 
1071 return 1072 with self._state_lock: 1073 existing = self._frames.get(frame_id) 1074 info = FrameInfo( 1075 frame_id=frame_id, 1076 url=str(frame.get("url") or ""), 1077 origin=str(frame.get("securityOrigin") or frame.get("origin") or ""), 1078 parent_frame_id=frame.get("parentId") or (existing.parent_frame_id if existing else None), 1079 is_oopif=bool(existing.is_oopif if existing else False), 1080 cdp_session_id=existing.cdp_session_id if existing else session_id, 1081 name=str(frame.get("name") or (existing.name if existing else "")), 1082 ) 1083 self._frames[frame_id] = info 1084 1085 def _on_frame_detached( 1086 self, params: Dict[str, Any], session_id: Optional[str] 1087 ) -> None: 1088 """Remove a frame from our state only when it's truly gone. 1089 1090 CDP emits ``Page.frameDetached`` with a ``reason`` of either 1091 ``"remove"`` (the frame is actually gone from the DOM) or ``"swap"`` 1092 (the frame is migrating to a new process — typical when a 1093 same-process iframe becomes an OOPIF, or when history navigates). 1094 Dropping on ``swap`` would hide OOPIFs from the agent the moment 1095 Chromium promotes them to their own process, so treat swap as a 1096 no-op. 1097 1098 Even with ``reason=remove``, the parent page's perspective is 1099 "the child frame left MY process tree" — which is what happens 1100 when a same-origin iframe gets promoted to an OOPIF. If we 1101 already have a live child CDP session attached for that frame_id, 1102 the frame is still very much alive; only drop it when we have 1103 no session record. 1104 """ 1105 frame_id = params.get("frameId") 1106 if not frame_id: 1107 return 1108 reason = str(params.get("reason") or "remove").lower() 1109 if reason == "swap": 1110 return 1111 with self._state_lock: 1112 existing = self._frames.get(frame_id) 1113 # Keep OOPIF records even when the parent says the frame was 1114 # "removed" — the iframe is still visible, just in a different 1115 # process. 
If the frame truly goes away later, Target.detached 1116 # + the next Page.frameDetached without a live session will 1117 # clear it. 1118 if existing and existing.is_oopif and existing.cdp_session_id: 1119 return 1120 self._frames.pop(frame_id, None) 1121 1122 async def _on_target_attached(self, params: Dict[str, Any]) -> None: 1123 info = params.get("targetInfo") or {} 1124 sid = params.get("sessionId") 1125 target_type = info.get("type") 1126 if not sid or target_type not in ("iframe", "worker"): 1127 return 1128 self._child_sessions[sid] = {"info": info, "type": target_type} 1129 1130 # Record the frame with its OOPIF session id for interaction routing. 1131 if target_type == "iframe": 1132 target_id = info.get("targetId") 1133 with self._state_lock: 1134 existing = self._frames.get(target_id) 1135 self._frames[target_id] = FrameInfo( 1136 frame_id=target_id, 1137 url=str(info.get("url") or ""), 1138 origin="", # filled by frameNavigated on the child session 1139 parent_frame_id=(existing.parent_frame_id if existing else None), 1140 is_oopif=True, 1141 cdp_session_id=sid, 1142 name=str(info.get("title") or (existing.name if existing else "")), 1143 ) 1144 1145 # Enable domains on the child off-loop so the reader keeps pumping. 1146 # Awaiting the CDP replies here would deadlock because only the 1147 # reader can resolve those replies' Futures. 1148 asyncio.create_task(self._enable_child_domains(sid)) 1149 1150 async def _enable_child_domains(self, sid: str) -> None: 1151 """Enable Page+Runtime (+nested setAutoAttach) on a child CDP session. 1152 1153 Also installs the dialog bridge so iframe-scoped alert/confirm/prompt 1154 calls round-trip through Fetch too. 
1155 """ 1156 try: 1157 await self._cdp("Page.enable", session_id=sid, timeout=3.0) 1158 await self._cdp("Runtime.enable", session_id=sid, timeout=3.0) 1159 await self._cdp( 1160 "Target.setAutoAttach", 1161 {"autoAttach": True, "waitForDebuggerOnStart": False, "flatten": True}, 1162 session_id=sid, 1163 timeout=3.0, 1164 ) 1165 except Exception as e: 1166 logger.debug("child session %s setup failed: %s", sid[:16], e) 1167 # Install the dialog bridge on the child so iframe dialogs are captured. 1168 await self._install_dialog_bridge(sid) 1169 1170 def _on_target_detached(self, params: Dict[str, Any]) -> None: 1171 """Handle a child CDP session detaching. 1172 1173 We deliberately DO NOT drop frames from ``_frames`` here — Browserbase 1174 fires transient detach events during page transitions even while the 1175 iframe is still visible to the user, and dropping the record hides 1176 OOPIFs from the agent between the detach and the next 1177 ``Target.attachedToTarget``. Instead, we just clear the session 1178 binding so stale ``cdp_session_id`` values aren't used for routing. 1179 If the iframe truly goes away, ``Page.frameDetached`` will clean up. 1180 """ 1181 sid = params.get("sessionId") 1182 if not sid: 1183 return 1184 self._child_sessions.pop(sid, None) 1185 with self._state_lock: 1186 for fid, frame in list(self._frames.items()): 1187 if frame.cdp_session_id == sid: 1188 # Replace with a copy that has cdp_session_id cleared so 1189 # routing falls back to top-level page session if retried. 
1190 self._frames[fid] = FrameInfo( 1191 frame_id=frame.frame_id, 1192 url=frame.url, 1193 origin=frame.origin, 1194 parent_frame_id=frame.parent_frame_id, 1195 is_oopif=frame.is_oopif, 1196 cdp_session_id=None, 1197 name=frame.name, 1198 ) 1199 1200 # ── Console / exception ring buffer ───────────────────────────────────── 1201 1202 def _on_console(self, params: Dict[str, Any], *, level_from: str) -> None: 1203 if level_from == "exception": 1204 details = params.get("exceptionDetails") or {} 1205 text = str(details.get("text") or "") 1206 url = details.get("url") 1207 event = ConsoleEvent(ts=time.time(), level="exception", text=text, url=url) 1208 else: 1209 raw_level = str(params.get("type") or "log") 1210 level = "error" if raw_level in ("error", "assert") else ( 1211 "warning" if raw_level == "warning" else "log" 1212 ) 1213 args = params.get("args") or [] 1214 parts: List[str] = [] 1215 for a in args[:4]: 1216 if isinstance(a, dict): 1217 parts.append(str(a.get("value") or a.get("description") or "")) 1218 event = ConsoleEvent(ts=time.time(), level=level, text=" ".join(parts)) 1219 with self._state_lock: 1220 self._console_events.append(event) 1221 if len(self._console_events) > CONSOLE_HISTORY_MAX * 2: 1222 # Keep last CONSOLE_HISTORY_MAX; allow 2x slack to reduce churn. 1223 self._console_events = self._console_events[-CONSOLE_HISTORY_MAX:] 1224 1225 # ── Frame tree building (bounded) ─────────────────────────────────────── 1226 1227 def _build_frame_tree_locked(self) -> Dict[str, Any]: 1228 """Build the capped frame_tree payload. Must be called under state lock.""" 1229 frames = self._frames 1230 if not frames: 1231 return {"top": None, "children": [], "truncated": False} 1232 1233 # Identify a top frame — one with no parent, preferring oopif=False. 
1234 tops = [f for f in frames.values() if not f.parent_frame_id] 1235 top = next((f for f in tops if not f.is_oopif), tops[0] if tops else None) 1236 1237 # BFS from top, capped by FRAME_TREE_MAX_ENTRIES and 1238 # FRAME_TREE_MAX_OOPIF_DEPTH for OOPIF branches. 1239 children: List[Dict[str, Any]] = [] 1240 truncated = False 1241 if top is None: 1242 return {"top": None, "children": [], "truncated": False} 1243 1244 queue: List[Tuple[FrameInfo, int]] = [ 1245 (f, 1) for f in frames.values() if f.parent_frame_id == top.frame_id 1246 ] 1247 visited: set[str] = {top.frame_id} 1248 while queue and len(children) < FRAME_TREE_MAX_ENTRIES: 1249 frame, depth = queue.pop(0) 1250 if frame.frame_id in visited: 1251 continue 1252 visited.add(frame.frame_id) 1253 if frame.is_oopif and depth > FRAME_TREE_MAX_OOPIF_DEPTH: 1254 truncated = True 1255 continue 1256 children.append(frame.to_dict()) 1257 for f in frames.values(): 1258 if f.parent_frame_id == frame.frame_id and f.frame_id not in visited: 1259 queue.append((f, depth + 1)) 1260 if queue: 1261 truncated = True 1262 1263 return { 1264 "top": top.to_dict(), 1265 "children": children, 1266 "truncated": truncated, 1267 } 1268 1269 1270 # ── Registry ───────────────────────────────────────────────────────────────── 1271 1272 1273 class _SupervisorRegistry: 1274 """Process-global (task_id → supervisor) map with idempotent start/stop. 1275 1276 One instance, exposed as ``SUPERVISOR_REGISTRY``. Safe to call from any 1277 thread — mutations go through ``_lock``. 
1278 """ 1279 1280 def __init__(self) -> None: 1281 self._lock = threading.Lock() 1282 self._by_task: Dict[str, CDPSupervisor] = {} 1283 1284 def get(self, task_id: str) -> Optional[CDPSupervisor]: 1285 """Return the supervisor for ``task_id`` if running, else ``None``.""" 1286 with self._lock: 1287 return self._by_task.get(task_id) 1288 1289 def get_or_start( 1290 self, 1291 task_id: str, 1292 cdp_url: str, 1293 *, 1294 dialog_policy: str = DEFAULT_DIALOG_POLICY, 1295 dialog_timeout_s: float = DEFAULT_DIALOG_TIMEOUT_S, 1296 start_timeout: float = 15.0, 1297 ) -> CDPSupervisor: 1298 """Idempotently ensure a supervisor is running for ``(task_id, cdp_url)``. 1299 1300 If a supervisor exists for this task but was bound to a different 1301 ``cdp_url``, the old one is stopped and a fresh one is started. 1302 """ 1303 with self._lock: 1304 existing = self._by_task.get(task_id) 1305 if existing is not None: 1306 if existing.cdp_url == cdp_url: 1307 thread_ok = existing._thread is not None and existing._thread.is_alive() 1308 loop_ok = existing._loop is not None and existing._loop.is_running() 1309 if thread_ok and loop_ok: 1310 return existing 1311 # Unhealthy — tear down and recreate. 1312 # URL changed or unhealthy — tear down, fall through to re-create. 1313 self._by_task.pop(task_id, None) 1314 if existing is not None: 1315 existing.stop() 1316 1317 supervisor = CDPSupervisor( 1318 task_id=task_id, 1319 cdp_url=cdp_url, 1320 dialog_policy=dialog_policy, 1321 dialog_timeout_s=dialog_timeout_s, 1322 ) 1323 supervisor.start(timeout=start_timeout) 1324 with self._lock: 1325 # Guard against a concurrent get_or_start from another thread. 
1326 already = self._by_task.get(task_id) 1327 if already is not None and already.cdp_url == cdp_url: 1328 supervisor.stop() 1329 return already 1330 self._by_task[task_id] = supervisor 1331 return supervisor 1332 1333 def stop(self, task_id: str) -> None: 1334 """Stop and discard the supervisor for ``task_id`` if it exists.""" 1335 with self._lock: 1336 supervisor = self._by_task.pop(task_id, None) 1337 if supervisor is not None: 1338 supervisor.stop() 1339 1340 def stop_all(self) -> None: 1341 """Stop every running supervisor. For shutdown / test teardown.""" 1342 with self._lock: 1343 items = list(self._by_task.items()) 1344 self._by_task.clear() 1345 for _, supervisor in items: 1346 supervisor.stop() 1347 1348 1349 SUPERVISOR_REGISTRY = _SupervisorRegistry() 1350 1351 1352 __all__ = [ 1353 "CDPSupervisor", 1354 "ConsoleEvent", 1355 "DEFAULT_DIALOG_POLICY", 1356 "DEFAULT_DIALOG_TIMEOUT_S", 1357 "DIALOG_POLICY_AUTO_ACCEPT", 1358 "DIALOG_POLICY_AUTO_DISMISS", 1359 "DIALOG_POLICY_MUST_RESPOND", 1360 "DialogRecord", 1361 "FrameInfo", 1362 "PendingDialog", 1363 "SUPERVISOR_REGISTRY", 1364 "SupervisorSnapshot", 1365 "_SupervisorRegistry", 1366 ]