test_browser_cdp_tool.py
1 """Unit tests for browser_cdp tool. 2 3 Uses a tiny in-process ``websockets`` server to simulate a CDP endpoint — 4 gives real protocol coverage (connect, send, recv, close) without needing 5 a real Chrome instance. 6 """ 7 from __future__ import annotations 8 9 import asyncio 10 import json 11 import threading 12 import time 13 from typing import Any, Dict, List 14 15 import pytest 16 17 import websockets 18 from websockets.asyncio.server import serve 19 20 from tools import browser_cdp_tool 21 22 23 # --------------------------------------------------------------------------- 24 # In-process CDP mock server 25 # --------------------------------------------------------------------------- 26 27 28 class _CDPServer: 29 """A tiny CDP-over-WebSocket mock. 30 31 Each client gets a greeting-free stream. The server replies to each 32 inbound request whose ``id`` is set, using the registered handler for 33 that method. If no handler is registered, returns a generic CDP error. 34 """ 35 36 def __init__(self) -> None: 37 self._handlers: Dict[str, Any] = {} 38 self._responses: List[Dict[str, Any]] = [] 39 self._loop: asyncio.AbstractEventLoop | None = None 40 self._server: Any = None 41 self._thread: threading.Thread | None = None 42 self._host = "127.0.0.1" 43 self._port = 0 44 45 # --- handler registration -------------------------------------------- 46 47 def on(self, method: str, handler): 48 """Register a handler ``handler(params, session_id) -> dict or Exception``.""" 49 self._handlers[method] = handler 50 51 # --- lifecycle ------------------------------------------------------- 52 53 def start(self) -> str: 54 ready = threading.Event() 55 56 def _run() -> None: 57 self._loop = asyncio.new_event_loop() 58 asyncio.set_event_loop(self._loop) 59 60 async def _handler(ws): 61 try: 62 async for raw in ws: 63 msg = json.loads(raw) 64 call_id = msg.get("id") 65 method = msg.get("method", "") 66 params = msg.get("params", {}) or {} 67 session_id = msg.get("sessionId") 68 
self._responses.append(msg) 69 70 fn = self._handlers.get(method) 71 if fn is None: 72 reply = { 73 "id": call_id, 74 "error": { 75 "code": -32601, 76 "message": f"No handler for {method}", 77 }, 78 } 79 else: 80 try: 81 result = fn(params, session_id) 82 if isinstance(result, Exception): 83 raise result 84 reply = {"id": call_id, "result": result} 85 except Exception as exc: 86 reply = { 87 "id": call_id, 88 "error": {"code": -1, "message": str(exc)}, 89 } 90 if session_id: 91 reply["sessionId"] = session_id 92 await ws.send(json.dumps(reply)) 93 except websockets.exceptions.ConnectionClosed: 94 pass 95 96 async def _serve() -> None: 97 self._server = await serve(_handler, self._host, 0) 98 sock = next(iter(self._server.sockets)) 99 self._port = sock.getsockname()[1] 100 ready.set() 101 await self._server.wait_closed() 102 103 try: 104 self._loop.run_until_complete(_serve()) 105 finally: 106 self._loop.close() 107 108 self._thread = threading.Thread(target=_run, daemon=True) 109 self._thread.start() 110 if not ready.wait(timeout=5.0): 111 raise RuntimeError("CDP mock server failed to start within 5s") 112 return f"ws://{self._host}:{self._port}/devtools/browser/mock" 113 114 def stop(self) -> None: 115 if self._loop and self._server: 116 def _close() -> None: 117 self._server.close() 118 119 self._loop.call_soon_threadsafe(_close) 120 if self._thread: 121 self._thread.join(timeout=3.0) 122 123 def received(self) -> List[Dict[str, Any]]: 124 return list(self._responses) 125 126 127 # --------------------------------------------------------------------------- 128 # Fixtures 129 # --------------------------------------------------------------------------- 130 131 132 @pytest.fixture 133 def cdp_server(monkeypatch): 134 """Start a CDP mock and route tool resolution to it.""" 135 server = _CDPServer() 136 ws_url = server.start() 137 monkeypatch.setattr( 138 browser_cdp_tool, "_resolve_cdp_endpoint", lambda: ws_url 139 ) 140 try: 141 yield server 142 finally: 143 
server.stop() 144 145 146 # --------------------------------------------------------------------------- 147 # Input validation 148 # --------------------------------------------------------------------------- 149 150 151 def test_missing_method_returns_error(): 152 result = json.loads(browser_cdp_tool.browser_cdp(method="")) 153 assert "error" in result 154 assert "method" in result["error"].lower() 155 assert result.get("cdp_docs") == browser_cdp_tool.CDP_DOCS_URL 156 157 158 def test_non_string_method_returns_error(): 159 result = json.loads(browser_cdp_tool.browser_cdp(method=123)) # type: ignore[arg-type] 160 assert "error" in result 161 assert "method" in result["error"].lower() 162 163 164 def test_non_dict_params_returns_error(monkeypatch): 165 monkeypatch.setattr( 166 browser_cdp_tool, "_resolve_cdp_endpoint", lambda: "ws://localhost:9999" 167 ) 168 result = json.loads( 169 browser_cdp_tool.browser_cdp(method="Target.getTargets", params="not-a-dict") # type: ignore[arg-type] 170 ) 171 assert "error" in result 172 assert "object" in result["error"].lower() or "dict" in result["error"].lower() 173 174 175 # --------------------------------------------------------------------------- 176 # Endpoint resolution 177 # --------------------------------------------------------------------------- 178 179 180 def test_no_endpoint_returns_helpful_error(monkeypatch): 181 monkeypatch.setattr(browser_cdp_tool, "_resolve_cdp_endpoint", lambda: "") 182 result = json.loads(browser_cdp_tool.browser_cdp(method="Target.getTargets")) 183 assert "error" in result 184 assert "/browser connect" in result["error"] 185 assert result.get("cdp_docs") == browser_cdp_tool.CDP_DOCS_URL 186 187 188 def test_non_ws_endpoint_returns_error(monkeypatch): 189 monkeypatch.setattr( 190 browser_cdp_tool, "_resolve_cdp_endpoint", lambda: "http://localhost:9222" 191 ) 192 result = json.loads(browser_cdp_tool.browser_cdp(method="Target.getTargets")) 193 assert "error" in result 194 assert "WebSocket" 
in result["error"] 195 196 197 def test_websockets_missing_returns_error(monkeypatch): 198 monkeypatch.setattr(browser_cdp_tool, "_WS_AVAILABLE", False) 199 result = json.loads(browser_cdp_tool.browser_cdp(method="Target.getTargets")) 200 assert "error" in result 201 assert "websockets" in result["error"].lower() 202 203 204 # --------------------------------------------------------------------------- 205 # Happy-path: browser-level call 206 # --------------------------------------------------------------------------- 207 208 209 def test_browser_level_success(cdp_server): 210 cdp_server.on( 211 "Target.getTargets", 212 lambda params, sid: { 213 "targetInfos": [ 214 {"targetId": "A", "type": "page", "title": "Tab 1", "url": "about:blank"}, 215 {"targetId": "B", "type": "page", "title": "Tab 2", "url": "https://a.test"}, 216 ] 217 }, 218 ) 219 result = json.loads(browser_cdp_tool.browser_cdp(method="Target.getTargets")) 220 assert result["success"] is True 221 assert result["method"] == "Target.getTargets" 222 assert "target_id" not in result 223 assert len(result["result"]["targetInfos"]) == 2 224 # Verify the server actually received exactly one call (no extra traffic) 225 calls = cdp_server.received() 226 assert len(calls) == 1 227 assert calls[0]["method"] == "Target.getTargets" 228 assert "sessionId" not in calls[0] 229 230 231 def test_empty_params_sends_empty_object(cdp_server): 232 cdp_server.on("Browser.getVersion", lambda params, sid: {"product": "Mock/1.0"}) 233 json.loads(browser_cdp_tool.browser_cdp(method="Browser.getVersion")) 234 assert cdp_server.received()[0]["params"] == {} 235 236 237 # --------------------------------------------------------------------------- 238 # Happy-path: target-attached call 239 # --------------------------------------------------------------------------- 240 241 242 def test_target_attach_then_call(cdp_server): 243 cdp_server.on( 244 "Target.attachToTarget", 245 lambda params, sid: {"sessionId": 
f"sess-{params['targetId']}"}, 246 ) 247 cdp_server.on( 248 "Runtime.evaluate", 249 lambda params, sid: { 250 "result": {"type": "string", "value": f"evaluated[{sid}]"}, 251 }, 252 ) 253 result = json.loads( 254 browser_cdp_tool.browser_cdp( 255 method="Runtime.evaluate", 256 params={"expression": "document.title", "returnByValue": True}, 257 target_id="tab-A", 258 ) 259 ) 260 assert result["success"] is True 261 assert result["target_id"] == "tab-A" 262 assert result["result"]["result"]["value"] == "evaluated[sess-tab-A]" 263 264 calls = cdp_server.received() 265 # First call: attach 266 assert calls[0]["method"] == "Target.attachToTarget" 267 assert calls[0]["params"] == {"targetId": "tab-A", "flatten": True} 268 # Second call: dispatched method on the session 269 assert calls[1]["method"] == "Runtime.evaluate" 270 assert calls[1]["sessionId"] == "sess-tab-A" 271 272 273 # --------------------------------------------------------------------------- 274 # CDP error responses 275 # --------------------------------------------------------------------------- 276 277 278 def test_cdp_method_error_returns_tool_error(cdp_server): 279 # No handler registered -> server returns CDP error 280 result = json.loads( 281 browser_cdp_tool.browser_cdp(method="NonExistent.method") 282 ) 283 assert "error" in result 284 assert "CDP error" in result["error"] 285 assert result.get("method") == "NonExistent.method" 286 287 288 def test_attach_failure_returns_tool_error(cdp_server): 289 # Target.attachToTarget has no handler -> server errors on attach 290 result = json.loads( 291 browser_cdp_tool.browser_cdp( 292 method="Runtime.evaluate", 293 params={"expression": "1+1"}, 294 target_id="missing", 295 ) 296 ) 297 assert "error" in result 298 assert "Target.attachToTarget" in result["error"] 299 300 301 # --------------------------------------------------------------------------- 302 # Timeouts 303 # --------------------------------------------------------------------------- 304 305 306 
def test_timeout_when_server_never_replies(cdp_server):
    """A call whose reply does not arrive within ``timeout`` reports a timeout."""
    # The mock runs handlers on its event-loop thread, so an unconditional
    # sleep would keep that loop busy long after the tool has given up and
    # make the fixture's stop() wait out its join timeout.  Block on an Event
    # instead: still far longer than the 0.5s tool timeout, but released as
    # soon as the call under test has returned.
    release = threading.Event()

    def slow(params, sid):
        release.wait(timeout=10)
        return {}

    cdp_server.on("Page.slowMethod", slow)
    try:
        result = json.loads(
            browser_cdp_tool.browser_cdp(
                method="Page.slowMethod", timeout=0.5
            )
        )
    finally:
        release.set()
    assert "error" in result
    assert "tim" in result["error"].lower()


# ---------------------------------------------------------------------------
# Timeout clamping
# ---------------------------------------------------------------------------


def test_timeout_clamped_above_max(cdp_server):
    """An over-large ``timeout`` does not prevent a successful call."""
    cdp_server.on("Browser.getVersion", lambda p, s: {"product": "ok"})
    # timeout=10_000 should be clamped to 300 but still succeed
    result = json.loads(
        browser_cdp_tool.browser_cdp(method="Browser.getVersion", timeout=10_000)
    )
    assert result["success"] is True


def test_invalid_timeout_falls_back_to_default(cdp_server):
    """A non-numeric ``timeout`` does not prevent a successful call."""
    cdp_server.on("Browser.getVersion", lambda p, s: {"product": "ok"})
    result = json.loads(
        browser_cdp_tool.browser_cdp(method="Browser.getVersion", timeout="nope")  # type: ignore[arg-type]
    )
    assert result["success"] is True


# ---------------------------------------------------------------------------
# Registry integration
# ---------------------------------------------------------------------------


def test_registered_in_browser_toolset():
    """``browser_cdp`` is registered with the expected toolset and schema."""
    from tools.registry import registry

    entry = registry.get_entry("browser_cdp")
    assert entry is not None
    # browser_cdp lives in its own toolset so its stricter check_fn
    # (requires reachable CDP endpoint) doesn't gate the whole browser
    # toolset — see commit 96b0f3700.
    assert entry.toolset == "browser-cdp"
    assert entry.schema["name"] == "browser_cdp"
    assert entry.schema["parameters"]["required"] == ["method"]
    assert "Chrome DevTools Protocol" in entry.schema["description"]
    assert browser_cdp_tool.CDP_DOCS_URL in entry.schema["description"]


def test_dispatch_through_registry(cdp_server):
    """Dispatching by name through the registry reaches the mock endpoint."""
    from tools.registry import registry

    cdp_server.on("Target.getTargets", lambda p, s: {"targetInfos": []})
    raw = registry.dispatch(
        "browser_cdp", {"method": "Target.getTargets"}, task_id="t1"
    )
    result = json.loads(raw)
    assert result["success"] is True
    assert result["method"] == "Target.getTargets"


# ---------------------------------------------------------------------------
# check_fn gating
# ---------------------------------------------------------------------------


def test_check_fn_false_when_no_cdp_url(monkeypatch):
    """Gate closes when no CDP URL is set — even if the browser toolset is
    otherwise configured."""
    import tools.browser_tool as bt

    monkeypatch.setattr(bt, "check_browser_requirements", lambda: True)
    monkeypatch.setattr(bt, "_get_cdp_override", lambda: "")
    assert browser_cdp_tool._browser_cdp_check() is False


def test_check_fn_true_when_cdp_url_set(monkeypatch):
    """Gate opens as soon as a CDP URL is resolvable."""
    import tools.browser_tool as bt

    monkeypatch.setattr(bt, "check_browser_requirements", lambda: True)
    monkeypatch.setattr(
        bt, "_get_cdp_override", lambda: "ws://localhost:9222/devtools/browser/x"
    )
    assert browser_cdp_tool._browser_cdp_check() is True


def test_check_fn_false_when_browser_requirements_fail(monkeypatch):
    """Even with a CDP URL, gate closes if the overall browser toolset is
    unavailable (e.g. agent-browser not installed)."""
    import tools.browser_tool as bt

    monkeypatch.setattr(bt, "check_browser_requirements", lambda: False)
    monkeypatch.setattr(
        bt, "_get_cdp_override", lambda: "ws://localhost:9222/devtools/browser/x"
    )
    assert browser_cdp_tool._browser_cdp_check() is False