/ tools / browser_supervisor.py
browser_supervisor.py
   1  """Persistent CDP supervisor for browser dialog + frame detection.
   2  
   3  One ``CDPSupervisor`` runs per Hermes ``task_id`` that has a reachable CDP
   4  endpoint. It holds a single persistent WebSocket to the backend, subscribes
   5  to ``Page`` / ``Runtime`` / ``Target`` events on every attached session
   6  (top-level page and every OOPIF / worker target that auto-attaches), and
   7  surfaces observable state — pending dialogs and frame tree — through a
   8  thread-safe snapshot object that tool handlers consume synchronously.
   9  
  10  The supervisor is NOT in the agent's tool schema. Its output reaches the
  11  agent via two channels:
  12  
  13  1. ``browser_snapshot`` merges supervisor state into its return payload
  14     (see ``tools/browser_tool.py``).
  15  2. ``browser_dialog`` tool responds to a pending dialog by calling
  16     ``respond_to_dialog()`` on the active supervisor.
  17  
  18  Design spec: ``website/docs/developer-guide/browser-supervisor.md``.
  19  """
  20  
  21  from __future__ import annotations
  22  
  23  import asyncio
  24  import json
  25  import logging
  26  import threading
  27  import time
  28  from dataclasses import dataclass
  29  from typing import Any, Dict, List, Optional, Tuple
  30  
  31  import websockets
  32  from websockets.asyncio.client import ClientConnection
  33  
  34  logger = logging.getLogger(__name__)
  35  
  36  
# ── Config defaults ───────────────────────────────────────────────────────────

# Dialog-policy names.  ``CDPSupervisor.__init__`` accepts only the values
# collected in ``_VALID_POLICIES`` and raises ``ValueError`` for anything else.
# NOTE(review): judging by the names, "must_respond" leaves dialogs for the
# agent while the "auto_*" policies answer them automatically — the code that
# applies the policy is outside this part of the file; confirm there.
DIALOG_POLICY_MUST_RESPOND = "must_respond"
DIALOG_POLICY_AUTO_DISMISS = "auto_dismiss"
DIALOG_POLICY_AUTO_ACCEPT = "auto_accept"

_VALID_POLICIES = frozenset(
    {DIALOG_POLICY_MUST_RESPOND, DIALOG_POLICY_AUTO_DISMISS, DIALOG_POLICY_AUTO_ACCEPT}
)

# Defaults for the ``dialog_policy`` / ``dialog_timeout_s`` keyword arguments
# of ``CDPSupervisor`` (the timeout is in seconds, per the ``_S`` suffix).
DEFAULT_DIALOG_POLICY = DIALOG_POLICY_MUST_RESPOND
DEFAULT_DIALOG_TIMEOUT_S = 300.0

# Snapshot caps for frame_tree — keep payloads bounded on ad-heavy pages.
FRAME_TREE_MAX_ENTRIES = 30
FRAME_TREE_MAX_OOPIF_DEPTH = 2

# Ring buffer of recent console-level events (used later by PR 2 diagnostics).
# ``CDPSupervisor.snapshot()`` exposes at most this many trailing entries.
CONSOLE_HISTORY_MAX = 50

# Keep the last N closed dialogs in ``recent_dialogs`` so agents on backends
# that auto-dismiss server-side (e.g. Browserbase) can still observe that a
# dialog fired, even if they couldn't respond to it in time.
RECENT_DIALOGS_MAX = 20

# Magic host the injected dialog bridge XHRs to.  Intercepted via the CDP
# Fetch domain before any network resolution happens, so the hostname never
# has to exist.  Keep this ASCII + URL-safe; we also gate Fetch patterns on it.
# The injected script (``_DIALOG_BRIDGE_SCRIPT``) spells this host out
# literally in its ``ENDPOINT`` constant, so the two must be changed together.
DIALOG_BRIDGE_HOST = "hermes-dialog-bridge.invalid"
DIALOG_BRIDGE_URL_PATTERN = f"http://{DIALOG_BRIDGE_HOST}/*"
  67  
# Script injected into every frame via Page.addScriptToEvaluateOnNewDocument.
# Overrides alert/confirm/prompt to round-trip through a sync XHR that we
# intercept via Fetch.requestPaused. Works on Browserbase (whose CDP proxy
# auto-dismisses REAL native dialogs) because the native dialogs never fire
# in the first place — the overrides take precedence.
#
# Contract of the overrides, as implemented below:
#   * The XHR is a synchronous GET to ENDPOINT carrying ``kind``, ``message``
#     and ``default_prompt`` as query parameters; the reply body is parsed as
#     JSON and its ``accept`` / ``prompt_text`` keys are read.
#   * alert() returns nothing; confirm() returns a boolean; prompt() returns
#     a string, or null when the reply does not accept.
#   * A non-200 status, an unparseable body, or an exception all make ask()
#     return null, i.e. confirm() -> false and prompt() -> null.
#   * The script is guarded by ``window.__hermesDialogBridgeInstalled`` so
#     evaluating it twice in one document is a no-op.
#
# ENDPOINT is hard-coded inside the string and must stay in sync with
# ``DIALOG_BRIDGE_HOST`` — the raw string does not interpolate the constant.
#
# NOTE(review): the JS comment in the catch block talks about falling back to
# the native call, but the code returns null there, and realAlert /
# realConfirm / realPrompt are captured without ever being invoked.  Confirm
# which behavior is intended before relying on either.
_DIALOG_BRIDGE_SCRIPT = r"""
(() => {
  if (window.__hermesDialogBridgeInstalled) return;
  window.__hermesDialogBridgeInstalled = true;
  const ENDPOINT = "http://hermes-dialog-bridge.invalid/";
  function ask(kind, message, defaultPrompt) {
    try {
      const xhr = new XMLHttpRequest();
      // Use GET with query params so we don't need to worry about request
      // body encoding in the Fetch interceptor.
      const params = new URLSearchParams({
        kind: String(kind || ""),
        message: String(message == null ? "" : message),
        default_prompt: String(defaultPrompt == null ? "" : defaultPrompt),
      });
      xhr.open("GET", ENDPOINT + "?" + params.toString(), false);  // sync
      xhr.send(null);
      if (xhr.status !== 200) return null;
      const body = xhr.responseText || "";
      let parsed;
      try { parsed = JSON.parse(body); } catch (e) { return null; }
      if (kind === "alert") return undefined;
      if (kind === "confirm") return Boolean(parsed && parsed.accept);
      if (kind === "prompt") {
        if (!parsed || !parsed.accept) return null;
        return parsed.prompt_text == null ? "" : String(parsed.prompt_text);
      }
      return null;
    } catch (e) {
      // If the bridge is unreachable, fall back to the native call so the
      // page still sees *some* behavior (the backend will auto-dismiss).
      return null;
    }
  }
  const realAlert   = window.alert;
  const realConfirm = window.confirm;
  const realPrompt  = window.prompt;
  window.alert   = function(message) { ask("alert",   message, ""); };
  window.confirm = function(message) {
    const r = ask("confirm", message, "");
    return r === null ? false : Boolean(r);
  };
  window.prompt  = function(message, def) {
    const r = ask("prompt", message, def == null ? "" : def);
    return r === null ? null : String(r);
  };
  // onbeforeunload — we can't really synchronously prompt the user from this
  // event without racing navigation.  Leave native behavior for now; the
  // supervisor's native-dialog fallback path still surfaces them in
  // recent_dialogs.
})();
"""
 125  
 126  
 127  # ── Data model ────────────────────────────────────────────────────────────────
 128  
 129  
 130  @dataclass
 131  class PendingDialog:
 132      """A JS dialog currently open on some frame's session."""
 133  
 134      id: str
 135      type: str  # "alert" | "confirm" | "prompt" | "beforeunload"
 136      message: str
 137      default_prompt: str
 138      opened_at: float
 139      cdp_session_id: str  # which attached CDP session the dialog fired in
 140      frame_id: Optional[str] = None
 141      # When set, the dialog was captured via the bridge XHR path (Fetch domain).
 142      # Response must be delivered via Fetch.fulfillRequest, NOT
 143      # Page.handleJavaScriptDialog — the native dialog never fired.
 144      bridge_request_id: Optional[str] = None
 145  
 146      def to_dict(self) -> Dict[str, Any]:
 147          return {
 148              "id": self.id,
 149              "type": self.type,
 150              "message": self.message,
 151              "default_prompt": self.default_prompt,
 152              "opened_at": self.opened_at,
 153              "frame_id": self.frame_id,
 154          }
 155  
 156  
 157  @dataclass
 158  class DialogRecord:
 159      """A historical record of a dialog that was opened and then handled.
 160  
 161      Retained in ``recent_dialogs`` for a short window so agents on backends
 162      that auto-dismiss dialogs server-side (Browserbase) can still observe
 163      that a dialog fired, even though they couldn't respond to it.
 164      """
 165  
 166      id: str
 167      type: str
 168      message: str
 169      opened_at: float
 170      closed_at: float
 171      closed_by: str  # "agent" | "auto_policy" | "remote" | "watchdog"
 172      frame_id: Optional[str] = None
 173  
 174      def to_dict(self) -> Dict[str, Any]:
 175          return {
 176              "id": self.id,
 177              "type": self.type,
 178              "message": self.message,
 179              "opened_at": self.opened_at,
 180              "closed_at": self.closed_at,
 181              "closed_by": self.closed_by,
 182              "frame_id": self.frame_id,
 183          }
 184  
 185  
 186  @dataclass
 187  class FrameInfo:
 188      """One frame in the page's frame tree.
 189  
 190      ``is_oopif`` means the frame has its own CDP target (separate process,
 191      reachable via ``cdp_session_id``). Same-origin / srcdoc iframes share
 192      the parent process and have ``is_oopif=False`` + ``cdp_session_id=None``.
 193      """
 194  
 195      frame_id: str
 196      url: str
 197      origin: str
 198      parent_frame_id: Optional[str]
 199      is_oopif: bool
 200      cdp_session_id: Optional[str] = None
 201      name: str = ""
 202  
 203      def to_dict(self) -> Dict[str, Any]:
 204          d = {
 205              "frame_id": self.frame_id,
 206              "url": self.url,
 207              "origin": self.origin,
 208              "is_oopif": self.is_oopif,
 209          }
 210          if self.cdp_session_id:
 211              d["session_id"] = self.cdp_session_id
 212          if self.parent_frame_id:
 213              d["parent_frame_id"] = self.parent_frame_id
 214          if self.name:
 215              d["name"] = self.name
 216          return d
 217  
 218  
@dataclass
class ConsoleEvent:
    """Ring buffer entry for console + exception traffic.

    Instances are stored in ``CDPSupervisor._console_events`` and exposed to
    readers as ``SupervisorSnapshot.console_errors``.
    """

    # When the event was recorded.  NOTE(review): presumably epoch seconds —
    # the code that fills this in is outside this section; confirm there.
    ts: float
    level: str  # "log" | "error" | "warning" | "exception"
    # Message text of the console call / exception.
    text: str
    # Source URL associated with the event, when one is available.
    url: Optional[str] = None
 227  
 228  
 229  @dataclass(frozen=True)
 230  class SupervisorSnapshot:
 231      """Read-only snapshot of supervisor state.
 232  
 233      Frozen dataclass so tool handlers can freely dereference without
 234      worrying about mutation under their feet.
 235      """
 236  
 237      pending_dialogs: Tuple[PendingDialog, ...]
 238      recent_dialogs: Tuple[DialogRecord, ...]
 239      frame_tree: Dict[str, Any]
 240      console_errors: Tuple[ConsoleEvent, ...]
 241      active: bool  # False if supervisor is detached/stopped
 242      cdp_url: str
 243      task_id: str
 244  
 245      def to_dict(self) -> Dict[str, Any]:
 246          """Serialize for inclusion in ``browser_snapshot`` output."""
 247          out: Dict[str, Any] = {
 248              "pending_dialogs": [d.to_dict() for d in self.pending_dialogs],
 249              "frame_tree": self.frame_tree,
 250          }
 251          if self.recent_dialogs:
 252              out["recent_dialogs"] = [d.to_dict() for d in self.recent_dialogs]
 253          return out
 254  
 255  
 256  # ── Supervisor core ───────────────────────────────────────────────────────────
 257  
 258  
 259  class CDPSupervisor:
 260      """One supervisor per (task_id, cdp_url) pair.
 261  
 262      Lifecycle:
 263        * ``start()`` — kicked off by ``SupervisorRegistry.get_or_start``; spawns
 264          a daemon thread running its own asyncio loop, connects the WebSocket,
 265          attaches to the first page target, enables domains, starts
 266          auto-attaching to child targets.
 267        * ``snapshot()`` — sync, thread-safe, called from tool handlers.
 268        * ``respond_to_dialog(action, ...)`` — sync bridge; schedules a coroutine
 269          on the supervisor's loop and waits (with timeout) for the CDP ack.
 270        * ``stop()`` — cancels task, closes WebSocket, joins thread.
 271  
 272      All CDP I/O lives on the supervisor's own loop. External callers never
 273      touch the loop directly; they go through the sync API above.
 274      """
 275  
 276      def __init__(
 277          self,
 278          task_id: str,
 279          cdp_url: str,
 280          *,
 281          dialog_policy: str = DEFAULT_DIALOG_POLICY,
 282          dialog_timeout_s: float = DEFAULT_DIALOG_TIMEOUT_S,
 283      ) -> None:
 284          if dialog_policy not in _VALID_POLICIES:
 285              raise ValueError(
 286                  f"Invalid dialog_policy {dialog_policy!r}; "
 287                  f"must be one of {sorted(_VALID_POLICIES)}"
 288              )
 289          self.task_id = task_id
 290          self.cdp_url = cdp_url
 291          self.dialog_policy = dialog_policy
 292          self.dialog_timeout_s = float(dialog_timeout_s)
 293  
 294          # State protected by ``_state_lock`` for cross-thread reads.
 295          self._state_lock = threading.Lock()
 296          self._pending_dialogs: Dict[str, PendingDialog] = {}
 297          self._recent_dialogs: List[DialogRecord] = []
 298          self._frames: Dict[str, FrameInfo] = {}
 299          self._console_events: List[ConsoleEvent] = []
 300          self._active = False
 301  
 302          # Supervisor loop machinery — populated in start().
 303          self._loop: Optional[asyncio.AbstractEventLoop] = None
 304          self._thread: Optional[threading.Thread] = None
 305          self._ready_event = threading.Event()
 306          self._start_error: Optional[BaseException] = None
 307          self._stop_requested = False
 308  
 309          # CDP call tracking (runs on supervisor loop only).
 310          self._next_call_id = 1
 311          self._pending_calls: Dict[int, asyncio.Future] = {}
 312          self._ws: Optional[ClientConnection] = None
 313          self._page_session_id: Optional[str] = None
 314          self._child_sessions: Dict[str, Dict[str, Any]] = {}  # session_id -> info
 315  
 316          # Dialog auto-dismiss watchdog handles (per dialog id).
 317          self._dialog_watchdogs: Dict[str, asyncio.TimerHandle] = {}
 318          # Monotonic id generator for dialogs (human-readable in snapshots).
 319          self._dialog_seq = 0
 320  
 321      # ── Public sync API ──────────────────────────────────────────────────────
 322  
 323      def start(self, timeout: float = 15.0) -> None:
 324          """Launch the background loop and wait until attachment is complete.
 325  
 326          Raises whatever exception attach failed with (connect error, bad
 327          WebSocket URL, CDP domain enable failure, etc.). On success, the
 328          supervisor is fully wired up — pending-dialog events will be captured
 329          as of the moment ``start()`` returns.
 330          """
 331          if self._thread and self._thread.is_alive():
 332              return
 333          self._ready_event.clear()
 334          self._start_error = None
 335          self._stop_requested = False
 336          self._thread = threading.Thread(
 337              target=self._thread_main,
 338              name=f"cdp-supervisor-{self.task_id}",
 339              daemon=True,
 340          )
 341          self._thread.start()
 342          if not self._ready_event.wait(timeout=timeout):
 343              self.stop()
 344              raise TimeoutError(
 345                  f"CDP supervisor did not attach within {timeout}s "
 346                  f"(cdp_url={self.cdp_url[:80]}...)"
 347              )
 348          if self._start_error is not None:
 349              err = self._start_error
 350              self.stop()
 351              raise err
 352  
 353      def stop(self, timeout: float = 5.0) -> None:
 354          """Cancel the supervisor task and join the thread."""
 355          self._stop_requested = True
 356          loop = self._loop
 357          if loop is not None and loop.is_running():
 358              # Close the WebSocket from inside the loop — this makes ``async for
 359              # raw in self._ws`` return cleanly, ``_run`` hits its ``finally``,
 360              # pending tasks get cancelled in order, THEN the thread exits.
 361              async def _close_ws():
 362                  ws = self._ws
 363                  self._ws = None
 364                  if ws is not None:
 365                      try:
 366                          await ws.close()
 367                      except Exception:
 368                          pass
 369  
 370              try:
 371                  fut = asyncio.run_coroutine_threadsafe(_close_ws(), loop)
 372                  try:
 373                      fut.result(timeout=2.0)
 374                  except Exception:
 375                      pass
 376              except RuntimeError:
 377                  pass  # loop already shutting down
 378          if self._thread is not None:
 379              self._thread.join(timeout=timeout)
 380          with self._state_lock:
 381              self._active = False
 382  
 383      def snapshot(self) -> SupervisorSnapshot:
 384          """Return an immutable snapshot of current state."""
 385          with self._state_lock:
 386              dialogs = tuple(self._pending_dialogs.values())
 387              recent = tuple(self._recent_dialogs[-RECENT_DIALOGS_MAX:])
 388              frames_tree = self._build_frame_tree_locked()
 389              console = tuple(self._console_events[-CONSOLE_HISTORY_MAX:])
 390              active = self._active
 391          return SupervisorSnapshot(
 392              pending_dialogs=dialogs,
 393              recent_dialogs=recent,
 394              frame_tree=frames_tree,
 395              console_errors=console,
 396              active=active,
 397              cdp_url=self.cdp_url,
 398              task_id=self.task_id,
 399          )
 400  
 401      def respond_to_dialog(
 402          self,
 403          action: str,
 404          *,
 405          prompt_text: Optional[str] = None,
 406          dialog_id: Optional[str] = None,
 407          timeout: float = 10.0,
 408      ) -> Dict[str, Any]:
 409          """Accept/dismiss a pending dialog. Sync bridge onto the supervisor loop.
 410  
 411          Returns ``{"ok": True, "dialog": {...}}`` on success,
 412          ``{"ok": False, "error": "..."}`` on a recoverable error (no dialog,
 413          ambiguous dialog_id, supervisor inactive).
 414          """
 415          if action not in ("accept", "dismiss"):
 416              return {"ok": False, "error": f"action must be 'accept' or 'dismiss', got {action!r}"}
 417  
 418          with self._state_lock:
 419              if not self._active:
 420                  return {"ok": False, "error": "supervisor is not active"}
 421              pending = list(self._pending_dialogs.values())
 422              if not pending:
 423                  return {"ok": False, "error": "no dialog is currently open"}
 424              if dialog_id:
 425                  dialog = self._pending_dialogs.get(dialog_id)
 426                  if dialog is None:
 427                      return {
 428                          "ok": False,
 429                          "error": f"dialog_id {dialog_id!r} not found "
 430                          f"(known: {sorted(self._pending_dialogs)})",
 431                      }
 432              elif len(pending) > 1:
 433                  return {
 434                      "ok": False,
 435                      "error": (
 436                          f"{len(pending)} pending dialogs; specify dialog_id. "
 437                          f"Candidates: {[d.id for d in pending]}"
 438                      ),
 439                  }
 440              else:
 441                  dialog = pending[0]
 442              snapshot_copy = dialog
 443  
 444          loop = self._loop
 445          if loop is None:
 446              return {"ok": False, "error": "supervisor loop is not running"}
 447  
 448          async def _do_respond():
 449              return await self._handle_dialog_cdp(
 450                  snapshot_copy, accept=(action == "accept"), prompt_text=prompt_text or ""
 451              )
 452  
 453          try:
 454              fut = asyncio.run_coroutine_threadsafe(_do_respond(), loop)
 455              fut.result(timeout=timeout)
 456          except Exception as e:
 457              return {"ok": False, "error": f"{type(e).__name__}: {e}"}
 458          return {"ok": True, "dialog": snapshot_copy.to_dict()}
 459  
 460      # ── Supervisor loop internals ────────────────────────────────────────────
 461  
 462      def _thread_main(self) -> None:
 463          """Entry point for the supervisor's dedicated thread."""
 464          loop = asyncio.new_event_loop()
 465          self._loop = loop
 466          try:
 467              asyncio.set_event_loop(loop)
 468              loop.run_until_complete(self._run())
 469          except BaseException as e:  # noqa: BLE001 — propagate via _start_error
 470              if not self._ready_event.is_set():
 471                  self._start_error = e
 472                  self._ready_event.set()
 473              else:
 474                  logger.warning("CDP supervisor %s crashed: %s", self.task_id, e)
 475          finally:
 476              # Flush any remaining tasks before closing the loop so we don't
 477              # emit "Task was destroyed but it is pending" warnings.
 478              try:
 479                  pending = [t for t in asyncio.all_tasks(loop) if not t.done()]
 480                  for t in pending:
 481                      t.cancel()
 482                  if pending:
 483                      loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
 484              except Exception:
 485                  pass
 486              try:
 487                  loop.close()
 488              except Exception:
 489                  pass
 490              with self._state_lock:
 491                  self._active = False
 492  
    async def _run(self) -> None:
        """Top-level supervisor coroutine.

        Holds a reconnecting loop so we survive the remote closing the
        WebSocket — Browserbase in particular tears down the CDP socket
        every time a short-lived client (e.g. agent-browser's per-command
        CDP client) disconnects.  We drop our state snapshot keys that
        depend on specific CDP session ids, re-attach, and keep going.

        Each iteration: connect, start the reader task, attach to a page,
        mark the supervisor active, then wait for the reader to finish.

        Readiness handshake with ``start()``:
          * A connect failure before readiness was ever signalled stores the
            exception in ``_start_error``, signals readiness and returns.
          * An attach failure before readiness stores the exception, signals
            readiness and re-raises.
          * After readiness has been signalled once, failures are logged and
            the loop retries.

        Retry delay starts at 0.5s, doubles per retry up to 10s, and is reset
        to 0.5s after every successful attach.  The loop ends when
        ``_stop_requested`` is set.
        """
        attempt = 0  # cumulative count of failed connects; never reset
        last_success_at = 0.0  # wall-clock time of the latest successful attach
        backoff = 0.5
        while not self._stop_requested:
            try:
                self._ws = await asyncio.wait_for(
                    websockets.connect(self.cdp_url, max_size=50 * 1024 * 1024),
                    timeout=10.0,
                )
            except Exception as e:
                attempt += 1
                if not self._ready_event.is_set():
                    # Never connected once — fatal for start().
                    self._start_error = e
                    self._ready_event.set()
                    return
                logger.warning(
                    "CDP supervisor %s: connect failed (attempt %s): %s",
                    self.task_id, attempt, e,
                )
                await asyncio.sleep(min(backoff, 10.0))
                backoff = min(backoff * 2, 10.0)
                continue

            # The reader must already be running while we attach: the attach
            # sequence awaits CDP responses that only the reader delivers.
            reader_task = asyncio.create_task(self._read_loop(), name="cdp-reader")
            try:
                # Reset per-connection session state so stale ids don't hang
                # around after a reconnect.
                self._page_session_id = None
                self._child_sessions.clear()
                # We deliberately keep `_pending_dialogs` and `_frames` —
                # they're reconciled as the supervisor resubscribes and
                # receives fresh events.  Worst case: an agent sees a stale
                # dialog entry that the new session's handleJavaScriptDialog
                # call rejects with "no dialog is showing" (logged, not
                # surfaced).
                await self._attach_initial_page()
                with self._state_lock:
                    self._active = True
                last_success_at = time.time()
                backoff = 0.5  # reset after a successful attach
                if not self._ready_event.is_set():
                    self._ready_event.set()
                # Run until the reader returns.
                await reader_task
            except BaseException as e:
                # NOTE(review): BaseException also covers asyncio.CancelledError
                # and KeyboardInterrupt; once readiness has been signalled they
                # are logged here and the loop carries on — confirm intended.
                if not self._ready_event.is_set():
                    # Never got to ready — propagate to start().
                    self._start_error = e
                    self._ready_event.set()
                    raise
                # If the attach itself failed on a reconnect, the elapsed time
                # below is measured from the previous successful attach.
                logger.warning(
                    "CDP supervisor %s: session dropped after %.1fs: %s",
                    self.task_id,
                    time.time() - last_success_at,
                    e,
                )
            finally:
                # Per-connection teardown, in order: mark inactive, stop the
                # reader, cancel dialog watchdog timers, close the socket.
                with self._state_lock:
                    self._active = False
                if not reader_task.done():
                    reader_task.cancel()
                    try:
                        await reader_task
                    except (asyncio.CancelledError, Exception):
                        pass
                for handle in list(self._dialog_watchdogs.values()):
                    handle.cancel()
                self._dialog_watchdogs.clear()
                ws = self._ws
                self._ws = None
                if ws is not None:
                    try:
                        await ws.close()
                    except Exception:
                        pass

            if self._stop_requested:
                return

            # Reconnect: brief backoff, then reattach.
            logger.debug(
                "CDP supervisor %s: reconnecting in %.1fs...", self.task_id, backoff,
            )
            await asyncio.sleep(backoff)
            backoff = min(backoff * 2, 10.0)
 588  
 589      async def _attach_initial_page(self) -> None:
 590          """Find a page target, attach flattened session, enable domains, install dialog bridge."""
 591          resp = await self._cdp("Target.getTargets")
 592          targets = resp.get("result", {}).get("targetInfos", [])
 593          page_target = next((t for t in targets if t.get("type") == "page"), None)
 594          if page_target is None:
 595              created = await self._cdp("Target.createTarget", {"url": "about:blank"})
 596              target_id = created["result"]["targetId"]
 597          else:
 598              target_id = page_target["targetId"]
 599  
 600          attach = await self._cdp(
 601              "Target.attachToTarget",
 602              {"targetId": target_id, "flatten": True},
 603          )
 604          self._page_session_id = attach["result"]["sessionId"]
 605          await self._cdp("Page.enable", session_id=self._page_session_id)
 606          await self._cdp("Runtime.enable", session_id=self._page_session_id)
 607          await self._cdp(
 608              "Target.setAutoAttach",
 609              {"autoAttach": True, "waitForDebuggerOnStart": False, "flatten": True},
 610              session_id=self._page_session_id,
 611          )
 612          # Install the dialog bridge — overrides native alert/confirm/prompt with
 613          # a synchronous XHR we intercept via Fetch domain. This is how we make
 614          # dialog response work on Browserbase (whose CDP proxy auto-dismisses
 615          # real native dialogs before we can call handleJavaScriptDialog).
 616          await self._install_dialog_bridge(self._page_session_id)
 617  
 618      async def _install_dialog_bridge(self, session_id: str) -> None:
 619          """Install the dialog-bridge init script + Fetch interceptor on a session.
 620  
 621          Two CDP calls:
 622            1. ``Page.addScriptToEvaluateOnNewDocument`` — the JS override runs
 623               in every frame before any page script. Replaces alert/confirm/
 624               prompt with a sync XHR to our bridge URL.
 625            2. ``Fetch.enable`` scoped to the bridge URL — we catch those XHRs,
 626               surface them as pending dialogs, then fulfill once the agent
 627               responds.
 628  
 629          Idempotent at the CDP level: Chromium de-duplicates identical
 630          add-script calls by source, and Fetch.enable replaces prior patterns.
 631          """
 632          try:
 633              await self._cdp(
 634                  "Page.addScriptToEvaluateOnNewDocument",
 635                  {"source": _DIALOG_BRIDGE_SCRIPT, "runImmediately": True},
 636                  session_id=session_id,
 637                  timeout=5.0,
 638              )
 639          except Exception as e:
 640              logger.debug(
 641                  "dialog bridge: addScriptToEvaluateOnNewDocument failed on sid=%s: %s",
 642                  (session_id or "")[:16], e,
 643              )
 644          try:
 645              await self._cdp(
 646                  "Fetch.enable",
 647                  {
 648                      "patterns": [
 649                          {
 650                              "urlPattern": DIALOG_BRIDGE_URL_PATTERN,
 651                              "requestStage": "Request",
 652                          }
 653                      ],
 654                      "handleAuthRequests": False,
 655                  },
 656                  session_id=session_id,
 657                  timeout=5.0,
 658              )
 659          except Exception as e:
 660              logger.debug(
 661                  "dialog bridge: Fetch.enable failed on sid=%s: %s",
 662                  (session_id or "")[:16], e,
 663              )
 664          # Also try to inject into the already-loaded document so existing
 665          # pages pick up the override on reconnect. Best-effort.
 666          try:
 667              await self._cdp(
 668                  "Runtime.evaluate",
 669                  {"expression": _DIALOG_BRIDGE_SCRIPT, "returnByValue": True},
 670                  session_id=session_id,
 671                  timeout=3.0,
 672              )
 673          except Exception:
 674              pass
 675  
 676      async def _cdp(
 677          self,
 678          method: str,
 679          params: Optional[Dict[str, Any]] = None,
 680          *,
 681          session_id: Optional[str] = None,
 682          timeout: float = 10.0,
 683      ) -> Dict[str, Any]:
 684          """Send a CDP command and await its response."""
 685          if self._ws is None:
 686              raise RuntimeError("supervisor WebSocket is not connected")
 687          call_id = self._next_call_id
 688          self._next_call_id += 1
 689          payload: Dict[str, Any] = {"id": call_id, "method": method}
 690          if params:
 691              payload["params"] = params
 692          if session_id:
 693              payload["sessionId"] = session_id
 694          fut: asyncio.Future = asyncio.get_running_loop().create_future()
 695          self._pending_calls[call_id] = fut
 696          await self._ws.send(json.dumps(payload))
 697          try:
 698              return await asyncio.wait_for(fut, timeout=timeout)
 699          finally:
 700              self._pending_calls.pop(call_id, None)
 701  
 702      async def _read_loop(self) -> None:
 703          """Continuously dispatch incoming CDP frames."""
 704          assert self._ws is not None
 705          try:
 706              async for raw in self._ws:
 707                  if self._stop_requested:
 708                      break
 709                  try:
 710                      msg = json.loads(raw)
 711                  except Exception:
 712                      logger.debug("CDP supervisor: non-JSON frame dropped")
 713                      continue
 714                  if "id" in msg:
 715                      fut = self._pending_calls.pop(msg["id"], None)
 716                      if fut is not None and not fut.done():
 717                          if "error" in msg:
 718                              fut.set_exception(
 719                                  RuntimeError(f"CDP error on id={msg['id']}: {msg['error']}")
 720                              )
 721                          else:
 722                              fut.set_result(msg)
 723                  elif "method" in msg:
 724                      await self._on_event(msg["method"], msg.get("params", {}), msg.get("sessionId"))
 725          except Exception as e:
 726              logger.debug("CDP read loop exited: %s", e)
 727  
 728      # ── Event dispatch ──────────────────────────────────────────────────────
 729  
 730      async def _on_event(
 731          self, method: str, params: Dict[str, Any], session_id: Optional[str]
 732      ) -> None:
 733          if method == "Page.javascriptDialogOpening":
 734              await self._on_dialog_opening(params, session_id)
 735          elif method == "Page.javascriptDialogClosed":
 736              await self._on_dialog_closed(params, session_id)
 737          elif method == "Fetch.requestPaused":
 738              await self._on_fetch_paused(params, session_id)
 739          elif method == "Page.frameAttached":
 740              self._on_frame_attached(params, session_id)
 741          elif method == "Page.frameNavigated":
 742              self._on_frame_navigated(params, session_id)
 743          elif method == "Page.frameDetached":
 744              self._on_frame_detached(params, session_id)
 745          elif method == "Target.attachedToTarget":
 746              await self._on_target_attached(params)
 747          elif method == "Target.detachedFromTarget":
 748              self._on_target_detached(params)
 749          elif method == "Runtime.consoleAPICalled":
 750              self._on_console(params, level_from="api")
 751          elif method == "Runtime.exceptionThrown":
 752              self._on_console(params, level_from="exception")
 753  
 754      async def _on_dialog_opening(
 755          self, params: Dict[str, Any], session_id: Optional[str]
 756      ) -> None:
 757          self._dialog_seq += 1
 758          dialog = PendingDialog(
 759              id=f"d-{self._dialog_seq}",
 760              type=str(params.get("type") or ""),
 761              message=str(params.get("message") or ""),
 762              default_prompt=str(params.get("defaultPrompt") or ""),
 763              opened_at=time.time(),
 764              cdp_session_id=session_id or self._page_session_id or "",
 765              frame_id=params.get("frameId"),
 766          )
 767  
 768          if self.dialog_policy == DIALOG_POLICY_AUTO_DISMISS:
 769              # Archive immediately with the policy tag so the ``closed`` event
 770              # arriving right after our handleJavaScriptDialog call doesn't
 771              # re-archive it as "remote".
 772              with self._state_lock:
 773                  self._archive_dialog_locked(dialog, "auto_policy")
 774              asyncio.create_task(
 775                  self._auto_handle_dialog(dialog, accept=False, prompt_text="")
 776              )
 777          elif self.dialog_policy == DIALOG_POLICY_AUTO_ACCEPT:
 778              with self._state_lock:
 779                  self._archive_dialog_locked(dialog, "auto_policy")
 780              asyncio.create_task(
 781                  self._auto_handle_dialog(
 782                      dialog, accept=True, prompt_text=dialog.default_prompt
 783                  )
 784              )
 785          else:
 786              # must_respond → add to pending and arm watchdog.
 787              with self._state_lock:
 788                  self._pending_dialogs[dialog.id] = dialog
 789              loop = asyncio.get_running_loop()
 790              handle = loop.call_later(
 791                  self.dialog_timeout_s,
 792                  lambda: asyncio.create_task(self._dialog_timeout_expired(dialog.id)),
 793              )
 794              self._dialog_watchdogs[dialog.id] = handle
 795  
 796      async def _auto_handle_dialog(
 797          self, dialog: PendingDialog, *, accept: bool, prompt_text: str
 798      ) -> None:
 799          """Send handleJavaScriptDialog for auto_dismiss/auto_accept.
 800  
 801          Dialog has already been archived by the caller (``_on_dialog_opening``);
 802          this just fires the CDP call so the page unblocks.
 803          """
 804          params: Dict[str, Any] = {"accept": accept}
 805          if dialog.type == "prompt":
 806              params["promptText"] = prompt_text
 807          try:
 808              await self._cdp(
 809                  "Page.handleJavaScriptDialog",
 810                  params,
 811                  session_id=dialog.cdp_session_id or None,
 812                  timeout=5.0,
 813              )
 814          except Exception as e:
 815              logger.debug("auto-handle CDP call failed for %s: %s", dialog.id, e)
 816  
 817      async def _dialog_timeout_expired(self, dialog_id: str) -> None:
 818          with self._state_lock:
 819              dialog = self._pending_dialogs.get(dialog_id)
 820          if dialog is None:
 821              return
 822          logger.warning(
 823              "CDP supervisor %s: dialog %s (%s) auto-dismissed after %ss timeout",
 824              self.task_id,
 825              dialog_id,
 826              dialog.type,
 827              self.dialog_timeout_s,
 828          )
 829          try:
 830              # Archive with watchdog tag BEFORE fulfilling / dismissing.
 831              with self._state_lock:
 832                  if dialog_id in self._pending_dialogs:
 833                      self._pending_dialogs.pop(dialog_id, None)
 834                      self._archive_dialog_locked(dialog, "watchdog")
 835              # Unblock the page — via bridge Fetch fulfill for bridge dialogs,
 836              # else native Page.handleJavaScriptDialog for real dialogs.
 837              if dialog.bridge_request_id:
 838                  await self._fulfill_bridge_request(dialog, accept=False, prompt_text="")
 839              else:
 840                  await self._cdp(
 841                      "Page.handleJavaScriptDialog",
 842                      {"accept": False},
 843                      session_id=dialog.cdp_session_id or None,
 844                      timeout=5.0,
 845                  )
 846          except Exception as e:
 847              logger.debug("auto-dismiss failed for %s: %s", dialog_id, e)
 848  
 849      def _archive_dialog_locked(self, dialog: PendingDialog, closed_by: str) -> None:
 850          """Move a pending dialog to the recent_dialogs ring buffer. Must hold state_lock."""
 851          record = DialogRecord(
 852              id=dialog.id,
 853              type=dialog.type,
 854              message=dialog.message,
 855              opened_at=dialog.opened_at,
 856              closed_at=time.time(),
 857              closed_by=closed_by,
 858              frame_id=dialog.frame_id,
 859          )
 860          self._recent_dialogs.append(record)
 861          if len(self._recent_dialogs) > RECENT_DIALOGS_MAX * 2:
 862              self._recent_dialogs = self._recent_dialogs[-RECENT_DIALOGS_MAX:]
 863  
 864      async def _handle_dialog_cdp(
 865          self, dialog: PendingDialog, *, accept: bool, prompt_text: str
 866      ) -> None:
 867          """Send the Page.handleJavaScriptDialog CDP command (agent path only).
 868  
 869          Routes to the bridge-fulfill path when the dialog was captured via
 870          the injected XHR override (see ``_on_fetch_paused``).
 871          """
 872          if dialog.bridge_request_id:
 873              try:
 874                  await self._fulfill_bridge_request(
 875                      dialog, accept=accept, prompt_text=prompt_text
 876                  )
 877              finally:
 878                  with self._state_lock:
 879                      if dialog.id in self._pending_dialogs:
 880                          self._pending_dialogs.pop(dialog.id, None)
 881                          self._archive_dialog_locked(dialog, "agent")
 882                  handle = self._dialog_watchdogs.pop(dialog.id, None)
 883                  if handle is not None:
 884                      handle.cancel()
 885              return
 886  
 887          params: Dict[str, Any] = {"accept": accept}
 888          if dialog.type == "prompt":
 889              params["promptText"] = prompt_text
 890          try:
 891              await self._cdp(
 892                  "Page.handleJavaScriptDialog",
 893                  params,
 894                  session_id=dialog.cdp_session_id or None,
 895                  timeout=5.0,
 896              )
 897          finally:
 898              # Clear regardless — the CDP error path usually means the dialog
 899              # already closed (browser auto-dismissed after navigation, etc.).
 900              with self._state_lock:
 901                  if dialog.id in self._pending_dialogs:
 902                      self._pending_dialogs.pop(dialog.id, None)
 903                      self._archive_dialog_locked(dialog, "agent")
 904              handle = self._dialog_watchdogs.pop(dialog.id, None)
 905              if handle is not None:
 906                  handle.cancel()
 907  
 908      async def _on_dialog_closed(
 909          self, params: Dict[str, Any], session_id: Optional[str]
 910      ) -> None:
 911          # ``Page.javascriptDialogClosed`` spec has only ``result`` (bool) and
 912          # ``userInput`` (string), not the original ``message``.  Match by
 913          # session id and clear the oldest dialog on that session — if Chrome
 914          # closed one on us (e.g. our disconnect auto-dismissed it, or the
 915          # browser navigated, or Browserbase's CDP proxy auto-dismissed), there
 916          # shouldn't be more than one in flight per session anyway because the
 917          # JS thread is blocked while a dialog is up.
 918          with self._state_lock:
 919              candidate_ids = [
 920                  d.id
 921                  for d in self._pending_dialogs.values()
 922                  if d.cdp_session_id == session_id
 923                  # Bridge-captured dialogs aren't cleared by native close events;
 924                  # they're resolved via Fetch.fulfillRequest instead. Only the
 925                  # real-native-dialog path uses Page.javascriptDialogClosed.
 926                  and d.bridge_request_id is None
 927              ]
 928              if candidate_ids:
 929                  did = candidate_ids[0]
 930                  dialog = self._pending_dialogs.pop(did, None)
 931                  if dialog is not None:
 932                      self._archive_dialog_locked(dialog, "remote")
 933                  handle = self._dialog_watchdogs.pop(did, None)
 934                  if handle is not None:
 935                      handle.cancel()
 936  
 937      async def _on_fetch_paused(
 938          self, params: Dict[str, Any], session_id: Optional[str]
 939      ) -> None:
 940          """Bridge XHR captured mid-flight — materialize as a pending dialog.
 941  
 942          The injected script (``_DIALOG_BRIDGE_SCRIPT``) fires a synchronous
 943          XHR to ``DIALOG_BRIDGE_HOST`` whenever page code calls alert/confirm/
 944          prompt. We catch it via Fetch.enable pattern; the page's JS thread
 945          is blocked on the XHR's response until we call Fetch.fulfillRequest
 946          (which happens from ``respond_to_dialog``) or until the watchdog
 947          fires (at which point we fulfill with a cancel response).
 948          """
 949          url = str(params.get("request", {}).get("url") or "")
 950          request_id = params.get("requestId")
 951          if not request_id:
 952              return
 953          # Only care about our bridge URLs. Fetch can still deliver other
 954          # intercepted requests if patterns were ever broadened.
 955          if DIALOG_BRIDGE_HOST not in url:
 956              # Not ours — forward unchanged so the page sees its own request.
 957              try:
 958                  await self._cdp(
 959                      "Fetch.continueRequest", {"requestId": request_id},
 960                      session_id=session_id, timeout=3.0,
 961                  )
 962              except Exception:
 963                  pass
 964              return
 965  
 966          # Parse query string for dialog metadata. Use urllib to be robust.
 967          from urllib.parse import urlparse, parse_qs
 968          q = parse_qs(urlparse(url).query)
 969  
 970          def _q(name: str) -> str:
 971              v = q.get(name, [""])
 972              return v[0] if v else ""
 973  
 974          kind = _q("kind") or "alert"
 975          message = _q("message")
 976          default_prompt = _q("default_prompt")
 977  
 978          self._dialog_seq += 1
 979          dialog = PendingDialog(
 980              id=f"d-{self._dialog_seq}",
 981              type=kind,
 982              message=message,
 983              default_prompt=default_prompt,
 984              opened_at=time.time(),
 985              cdp_session_id=session_id or self._page_session_id or "",
 986              frame_id=params.get("frameId"),
 987              bridge_request_id=str(request_id),
 988          )
 989  
 990          # Apply policy exactly as for native dialogs.
 991          if self.dialog_policy == DIALOG_POLICY_AUTO_DISMISS:
 992              with self._state_lock:
 993                  self._archive_dialog_locked(dialog, "auto_policy")
 994              asyncio.create_task(
 995                  self._fulfill_bridge_request(dialog, accept=False, prompt_text="")
 996              )
 997          elif self.dialog_policy == DIALOG_POLICY_AUTO_ACCEPT:
 998              with self._state_lock:
 999                  self._archive_dialog_locked(dialog, "auto_policy")
1000              asyncio.create_task(
1001                  self._fulfill_bridge_request(
1002                      dialog, accept=True, prompt_text=default_prompt
1003                  )
1004              )
1005          else:
1006              # must_respond — add to pending + arm watchdog.
1007              with self._state_lock:
1008                  self._pending_dialogs[dialog.id] = dialog
1009              loop = asyncio.get_running_loop()
1010              handle = loop.call_later(
1011                  self.dialog_timeout_s,
1012                  lambda: asyncio.create_task(self._dialog_timeout_expired(dialog.id)),
1013              )
1014              self._dialog_watchdogs[dialog.id] = handle
1015  
1016      async def _fulfill_bridge_request(
1017          self, dialog: PendingDialog, *, accept: bool, prompt_text: str
1018      ) -> None:
1019          """Resolve a bridge XHR via Fetch.fulfillRequest so the page unblocks."""
1020          if not dialog.bridge_request_id:
1021              return
1022          payload = {
1023              "accept": bool(accept),
1024              "prompt_text": prompt_text if dialog.type == "prompt" else "",
1025              "dialog_id": dialog.id,
1026          }
1027          body = json.dumps(payload).encode()
1028          try:
1029              import base64 as _b64
1030              await self._cdp(
1031                  "Fetch.fulfillRequest",
1032                  {
1033                      "requestId": dialog.bridge_request_id,
1034                      "responseCode": 200,
1035                      "responseHeaders": [
1036                          {"name": "Content-Type", "value": "application/json"},
1037                          {"name": "Access-Control-Allow-Origin", "value": "*"},
1038                      ],
1039                      "body": _b64.b64encode(body).decode(),
1040                  },
1041                  session_id=dialog.cdp_session_id or None,
1042                  timeout=5.0,
1043              )
1044          except Exception as e:
1045              logger.debug("bridge fulfill failed for %s: %s", dialog.id, e)
1046  
1047      # ── Frame / target tracking ─────────────────────────────────────────────
1048  
1049      def _on_frame_attached(
1050          self, params: Dict[str, Any], session_id: Optional[str]
1051      ) -> None:
1052          frame_id = params.get("frameId")
1053          if not frame_id:
1054              return
1055          with self._state_lock:
1056              self._frames[frame_id] = FrameInfo(
1057                  frame_id=frame_id,
1058                  url="",
1059                  origin="",
1060                  parent_frame_id=params.get("parentFrameId"),
1061                  is_oopif=False,
1062                  cdp_session_id=session_id,
1063              )
1064  
1065      def _on_frame_navigated(
1066          self, params: Dict[str, Any], session_id: Optional[str]
1067      ) -> None:
1068          frame = params.get("frame") or {}
1069          frame_id = frame.get("id")
1070          if not frame_id:
1071              return
1072          with self._state_lock:
1073              existing = self._frames.get(frame_id)
1074              info = FrameInfo(
1075                  frame_id=frame_id,
1076                  url=str(frame.get("url") or ""),
1077                  origin=str(frame.get("securityOrigin") or frame.get("origin") or ""),
1078                  parent_frame_id=frame.get("parentId") or (existing.parent_frame_id if existing else None),
1079                  is_oopif=bool(existing.is_oopif if existing else False),
1080                  cdp_session_id=existing.cdp_session_id if existing else session_id,
1081                  name=str(frame.get("name") or (existing.name if existing else "")),
1082              )
1083              self._frames[frame_id] = info
1084  
1085      def _on_frame_detached(
1086          self, params: Dict[str, Any], session_id: Optional[str]
1087      ) -> None:
1088          """Remove a frame from our state only when it's truly gone.
1089  
1090          CDP emits ``Page.frameDetached`` with a ``reason`` of either
1091          ``"remove"`` (the frame is actually gone from the DOM) or ``"swap"``
1092          (the frame is migrating to a new process — typical when a
1093          same-process iframe becomes an OOPIF, or when history navigates).
1094          Dropping on ``swap`` would hide OOPIFs from the agent the moment
1095          Chromium promotes them to their own process, so treat swap as a
1096          no-op.
1097  
1098          Even with ``reason=remove``, the parent page's perspective is
1099          "the child frame left MY process tree" — which is what happens
1100          when a same-origin iframe gets promoted to an OOPIF. If we
1101          already have a live child CDP session attached for that frame_id,
1102          the frame is still very much alive; only drop it when we have
1103          no session record.
1104          """
1105          frame_id = params.get("frameId")
1106          if not frame_id:
1107              return
1108          reason = str(params.get("reason") or "remove").lower()
1109          if reason == "swap":
1110              return
1111          with self._state_lock:
1112              existing = self._frames.get(frame_id)
1113              # Keep OOPIF records even when the parent says the frame was
1114              # "removed" — the iframe is still visible, just in a different
1115              # process. If the frame truly goes away later, Target.detached
1116              # + the next Page.frameDetached without a live session will
1117              # clear it.
1118              if existing and existing.is_oopif and existing.cdp_session_id:
1119                  return
1120              self._frames.pop(frame_id, None)
1121  
1122      async def _on_target_attached(self, params: Dict[str, Any]) -> None:
1123          info = params.get("targetInfo") or {}
1124          sid = params.get("sessionId")
1125          target_type = info.get("type")
1126          if not sid or target_type not in ("iframe", "worker"):
1127              return
1128          self._child_sessions[sid] = {"info": info, "type": target_type}
1129  
1130          # Record the frame with its OOPIF session id for interaction routing.
1131          if target_type == "iframe":
1132              target_id = info.get("targetId")
1133              with self._state_lock:
1134                  existing = self._frames.get(target_id)
1135                  self._frames[target_id] = FrameInfo(
1136                      frame_id=target_id,
1137                      url=str(info.get("url") or ""),
1138                      origin="",  # filled by frameNavigated on the child session
1139                      parent_frame_id=(existing.parent_frame_id if existing else None),
1140                      is_oopif=True,
1141                      cdp_session_id=sid,
1142                      name=str(info.get("title") or (existing.name if existing else "")),
1143                  )
1144  
1145          # Enable domains on the child off-loop so the reader keeps pumping.
1146          # Awaiting the CDP replies here would deadlock because only the
1147          # reader can resolve those replies' Futures.
1148          asyncio.create_task(self._enable_child_domains(sid))
1149  
1150      async def _enable_child_domains(self, sid: str) -> None:
1151          """Enable Page+Runtime (+nested setAutoAttach) on a child CDP session.
1152  
1153          Also installs the dialog bridge so iframe-scoped alert/confirm/prompt
1154          calls round-trip through Fetch too.
1155          """
1156          try:
1157              await self._cdp("Page.enable", session_id=sid, timeout=3.0)
1158              await self._cdp("Runtime.enable", session_id=sid, timeout=3.0)
1159              await self._cdp(
1160                  "Target.setAutoAttach",
1161                  {"autoAttach": True, "waitForDebuggerOnStart": False, "flatten": True},
1162                  session_id=sid,
1163                  timeout=3.0,
1164              )
1165          except Exception as e:
1166              logger.debug("child session %s setup failed: %s", sid[:16], e)
1167          # Install the dialog bridge on the child so iframe dialogs are captured.
1168          await self._install_dialog_bridge(sid)
1169  
1170      def _on_target_detached(self, params: Dict[str, Any]) -> None:
1171          """Handle a child CDP session detaching.
1172  
1173          We deliberately DO NOT drop frames from ``_frames`` here — Browserbase
1174          fires transient detach events during page transitions even while the
1175          iframe is still visible to the user, and dropping the record hides
1176          OOPIFs from the agent between the detach and the next
1177          ``Target.attachedToTarget``. Instead, we just clear the session
1178          binding so stale ``cdp_session_id`` values aren't used for routing.
1179          If the iframe truly goes away, ``Page.frameDetached`` will clean up.
1180          """
1181          sid = params.get("sessionId")
1182          if not sid:
1183              return
1184          self._child_sessions.pop(sid, None)
1185          with self._state_lock:
1186              for fid, frame in list(self._frames.items()):
1187                  if frame.cdp_session_id == sid:
1188                      # Replace with a copy that has cdp_session_id cleared so
1189                      # routing falls back to top-level page session if retried.
1190                      self._frames[fid] = FrameInfo(
1191                          frame_id=frame.frame_id,
1192                          url=frame.url,
1193                          origin=frame.origin,
1194                          parent_frame_id=frame.parent_frame_id,
1195                          is_oopif=frame.is_oopif,
1196                          cdp_session_id=None,
1197                          name=frame.name,
1198                      )
1199  
1200      # ── Console / exception ring buffer ─────────────────────────────────────
1201  
1202      def _on_console(self, params: Dict[str, Any], *, level_from: str) -> None:
1203          if level_from == "exception":
1204              details = params.get("exceptionDetails") or {}
1205              text = str(details.get("text") or "")
1206              url = details.get("url")
1207              event = ConsoleEvent(ts=time.time(), level="exception", text=text, url=url)
1208          else:
1209              raw_level = str(params.get("type") or "log")
1210              level = "error" if raw_level in ("error", "assert") else (
1211                  "warning" if raw_level == "warning" else "log"
1212              )
1213              args = params.get("args") or []
1214              parts: List[str] = []
1215              for a in args[:4]:
1216                  if isinstance(a, dict):
1217                      parts.append(str(a.get("value") or a.get("description") or ""))
1218              event = ConsoleEvent(ts=time.time(), level=level, text=" ".join(parts))
1219          with self._state_lock:
1220              self._console_events.append(event)
1221              if len(self._console_events) > CONSOLE_HISTORY_MAX * 2:
1222                  # Keep last CONSOLE_HISTORY_MAX; allow 2x slack to reduce churn.
1223                  self._console_events = self._console_events[-CONSOLE_HISTORY_MAX:]
1224  
1225      # ── Frame tree building (bounded) ───────────────────────────────────────
1226  
1227      def _build_frame_tree_locked(self) -> Dict[str, Any]:
1228          """Build the capped frame_tree payload. Must be called under state lock."""
1229          frames = self._frames
1230          if not frames:
1231              return {"top": None, "children": [], "truncated": False}
1232  
1233          # Identify a top frame — one with no parent, preferring oopif=False.
1234          tops = [f for f in frames.values() if not f.parent_frame_id]
1235          top = next((f for f in tops if not f.is_oopif), tops[0] if tops else None)
1236  
1237          # BFS from top, capped by FRAME_TREE_MAX_ENTRIES and
1238          # FRAME_TREE_MAX_OOPIF_DEPTH for OOPIF branches.
1239          children: List[Dict[str, Any]] = []
1240          truncated = False
1241          if top is None:
1242              return {"top": None, "children": [], "truncated": False}
1243  
1244          queue: List[Tuple[FrameInfo, int]] = [
1245              (f, 1) for f in frames.values() if f.parent_frame_id == top.frame_id
1246          ]
1247          visited: set[str] = {top.frame_id}
1248          while queue and len(children) < FRAME_TREE_MAX_ENTRIES:
1249              frame, depth = queue.pop(0)
1250              if frame.frame_id in visited:
1251                  continue
1252              visited.add(frame.frame_id)
1253              if frame.is_oopif and depth > FRAME_TREE_MAX_OOPIF_DEPTH:
1254                  truncated = True
1255                  continue
1256              children.append(frame.to_dict())
1257              for f in frames.values():
1258                  if f.parent_frame_id == frame.frame_id and f.frame_id not in visited:
1259                      queue.append((f, depth + 1))
1260          if queue:
1261              truncated = True
1262  
1263          return {
1264              "top": top.to_dict(),
1265              "children": children,
1266              "truncated": truncated,
1267          }
1268  
1269  
1270  # ── Registry ─────────────────────────────────────────────────────────────────
1271  
1272  
class _SupervisorRegistry:
    """Process-global (task_id → supervisor) map with idempotent start/stop.

    One instance, exposed as ``SUPERVISOR_REGISTRY``. Safe to call from any
    thread — mutations go through ``_lock``. Supervisors are always stopped
    after ``_lock`` has been released, so a slow ``stop()`` cannot block
    other registry calls.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._by_task: Dict[str, CDPSupervisor] = {}

    @staticmethod
    def _is_healthy(supervisor: CDPSupervisor) -> bool:
        """Return True when the supervisor's thread is alive and its loop runs."""
        thread_ok = supervisor._thread is not None and supervisor._thread.is_alive()
        loop_ok = supervisor._loop is not None and supervisor._loop.is_running()
        return thread_ok and loop_ok

    def get(self, task_id: str) -> Optional[CDPSupervisor]:
        """Return the registered supervisor for ``task_id``, else ``None``.

        No health check is performed here; use ``get_or_start`` to replace
        an unhealthy supervisor.
        """
        with self._lock:
            return self._by_task.get(task_id)

    def get_or_start(
        self,
        task_id: str,
        cdp_url: str,
        *,
        dialog_policy: str = DEFAULT_DIALOG_POLICY,
        dialog_timeout_s: float = DEFAULT_DIALOG_TIMEOUT_S,
        start_timeout: float = 15.0,
    ) -> CDPSupervisor:
        """Idempotently ensure a supervisor is running for ``(task_id, cdp_url)``.

        A healthy supervisor already bound to ``cdp_url`` is returned as is.
        If the registered supervisor is bound to a different ``cdp_url`` or
        is unhealthy, it is stopped and a fresh one is started.

        Args:
            task_id: Task the supervisor belongs to.
            cdp_url: CDP endpoint the supervisor must be bound to.
            dialog_policy: Dialog policy for a newly created supervisor.
            dialog_timeout_s: Dialog watchdog timeout for a newly created
                supervisor.
            start_timeout: Passed to ``CDPSupervisor.start`` as ``timeout``.

        Returns:
            The supervisor now registered for ``task_id``.

        Raises:
            Exception: Whatever ``CDPSupervisor.start`` raises; nothing is
                registered in that case.
        """
        stale: Optional[CDPSupervisor] = None
        with self._lock:
            existing = self._by_task.get(task_id)
            if existing is not None:
                if existing.cdp_url == cdp_url and self._is_healthy(existing):
                    return existing
                # URL changed or unhealthy — unregister now, stop below.
                self._by_task.pop(task_id, None)
                stale = existing
        if stale is not None:
            stale.stop()

        supervisor = CDPSupervisor(
            task_id=task_id,
            cdp_url=cdp_url,
            dialog_policy=dialog_policy,
            dialog_timeout_s=dialog_timeout_s,
        )
        supervisor.start(timeout=start_timeout)

        # Another thread may have registered a supervisor while the lock was
        # released. Decide the winner under the lock, stop the loser outside.
        winner = supervisor
        loser: Optional[CDPSupervisor] = None
        with self._lock:
            already = self._by_task.get(task_id)
            if already is not None and already.cdp_url == cdp_url:
                winner, loser = already, supervisor
            else:
                # ``already`` (if any) is bound to a different URL; it is
                # displaced here and must be stopped so it is not leaked.
                loser = already
                self._by_task[task_id] = supervisor
        if loser is not None:
            loser.stop()
        return winner

    def stop(self, task_id: str) -> None:
        """Stop and discard the supervisor for ``task_id`` if it exists."""
        with self._lock:
            supervisor = self._by_task.pop(task_id, None)
        if supervisor is not None:
            supervisor.stop()

    def stop_all(self) -> None:
        """Stop every registered supervisor. For shutdown / test teardown."""
        with self._lock:
            supervisors = list(self._by_task.values())
            self._by_task.clear()
        for supervisor in supervisors:
            supervisor.stop()
1347  
1348  
# The single process-wide registry instance; callers look up, start and stop
# per-task supervisors through it.
SUPERVISOR_REGISTRY = _SupervisorRegistry()


# Names exported by ``from <module> import *``. ``_SupervisorRegistry`` is
# listed explicitly because its leading underscore would otherwise exclude it.
__all__ = [
    "CDPSupervisor",
    "ConsoleEvent",
    "DEFAULT_DIALOG_POLICY",
    "DEFAULT_DIALOG_TIMEOUT_S",
    "DIALOG_POLICY_AUTO_ACCEPT",
    "DIALOG_POLICY_AUTO_DISMISS",
    "DIALOG_POLICY_MUST_RESPOND",
    "DialogRecord",
    "FrameInfo",
    "PendingDialog",
    "SUPERVISOR_REGISTRY",
    "SupervisorSnapshot",
    "_SupervisorRegistry",
]