test_gemini_cloudcode.py
"""Tests for the google-gemini-cli OAuth + Code Assist inference provider.

Covers:
- agent/google_oauth.py — PKCE, credential I/O with packed refresh format,
  token refresh dedup, invalid_grant handling, headless paste fallback
- agent/google_code_assist.py — project discovery, VPC-SC fallback, onboarding
  with LRO polling, quota retrieval
- agent/gemini_cloudcode_adapter.py — OpenAI↔Gemini translation, request
  envelope wrapping, response unwrapping, tool calls bidirectional, streaming
- Provider registration — registry entry, aliases, runtime dispatch, auth
  status, _OAUTH_CAPABLE_PROVIDERS regression guard
"""
from __future__ import annotations

import base64
import hashlib
import json
import stat
import time
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock, patch

import pytest


# =============================================================================
# Fixtures
# =============================================================================

@pytest.fixture(autouse=True)
def _isolate_env(monkeypatch, tmp_path):
    """Give every test a private HERMES_HOME and a scrubbed environment.

    Creates ``<tmp_path>/.hermes``, redirects ``Path.home()`` to ``tmp_path``,
    exports ``HERMES_HOME`` and removes the client-credential, project-id,
    SSH and headless environment variables listed below.

    Returns:
        The freshly created ``.hermes`` directory.
    """
    home = tmp_path / ".hermes"
    home.mkdir(parents=True)
    # A classmethod keeps the patched ``home`` callable both as ``Path.home()``
    # and through a Path instance; a bare zero-argument lambda would raise
    # TypeError when reached via an instance.
    monkeypatch.setattr(Path, "home", classmethod(lambda cls: tmp_path))
    monkeypatch.setenv("HERMES_HOME", str(home))
    for key in (
        "HERMES_GEMINI_CLIENT_ID",
        "HERMES_GEMINI_CLIENT_SECRET",
        "HERMES_GEMINI_PROJECT_ID",
        "GOOGLE_CLOUD_PROJECT",
        "GOOGLE_CLOUD_PROJECT_ID",
        "SSH_CONNECTION",
        "SSH_CLIENT",
        "SSH_TTY",
        "HERMES_HEADLESS",
    ):
        monkeypatch.delenv(key, raising=False)
    return home


def _install_fake_gemini_cli(root, client_id, client_secret, script="#!/bin/sh\n"):
    """Lay out a fake gemini-cli install under *root* and return the binary path.

    Writes ``bin/gemini`` containing *script* and an ``oauth2.js`` under
    ``node_modules/@google/gemini-cli-core/dist/src/code_assist`` that declares
    ``OAUTH_CLIENT_ID`` / ``OAUTH_CLIENT_SECRET`` with the given values.
    """
    fake_bin = root / "bin" / "gemini"
    fake_bin.parent.mkdir(parents=True)
    fake_bin.write_text(script)

    oauth_dir = (
        root / "node_modules" / "@google" / "gemini-cli-core"
        / "dist" / "src" / "code_assist"
    )
    oauth_dir.mkdir(parents=True)
    (oauth_dir / "oauth2.js").write_text(
        f'const OAUTH_CLIENT_ID = "{client_id}";\n'
        f'const OAUTH_CLIENT_SECRET = "{client_secret}";\n'
    )
    return fake_bin


# =============================================================================
# google_oauth.py — PKCE + packed refresh format
# =============================================================================

class TestPkce:
    """PKCE verifier/challenge generation."""

    def test_verifier_and_challenge_s256_roundtrip(self):
        from agent.google_oauth import _generate_pkce_pair

        verifier, challenge = _generate_pkce_pair()
        # S256: challenge is the unpadded urlsafe-base64 SHA-256 of the verifier.
        expected = base64.urlsafe_b64encode(
            hashlib.sha256(verifier.encode("ascii")).digest()
        ).rstrip(b"=").decode("ascii")
        assert challenge == expected
        assert 43 <= len(verifier) <= 128


class TestRefreshParts:
    """Parsing and formatting of the ``refresh|project|managed_project`` string."""

    def test_parse_bare_token(self):
        from agent.google_oauth import RefreshParts

        p = RefreshParts.parse("abc-token")
        assert p.refresh_token == "abc-token"
        assert p.project_id == ""
        assert p.managed_project_id == ""

    def test_parse_packed(self):
        from agent.google_oauth import RefreshParts

        p = RefreshParts.parse("rt|proj-123|mgr-456")
        assert p.refresh_token == "rt"
        assert p.project_id == "proj-123"
        assert p.managed_project_id == "mgr-456"

    def test_format_bare_token(self):
        from agent.google_oauth import RefreshParts

        assert RefreshParts(refresh_token="rt").format() == "rt"

    def test_format_with_project(self):
        from agent.google_oauth import RefreshParts

        packed = RefreshParts(
            refresh_token="rt", project_id="p1", managed_project_id="m1",
        ).format()
        assert packed == "rt|p1|m1"
        # Roundtrip
        parsed = RefreshParts.parse(packed)
        assert parsed.refresh_token == "rt"
        assert parsed.project_id == "p1"
        assert parsed.managed_project_id == "m1"

    def test_format_empty_refresh_token_returns_empty(self):
        from agent.google_oauth import RefreshParts

        assert RefreshParts(refresh_token="").format() == ""


class TestClientCredResolution:
    """OAuth client id/secret lookup: env override, shipped default, CLI scrape."""

    def test_env_override(self, monkeypatch):
        from agent.google_oauth import _get_client_id

        monkeypatch.setenv("HERMES_GEMINI_CLIENT_ID", "custom-id.apps.googleusercontent.com")
        assert _get_client_id() == "custom-id.apps.googleusercontent.com"

    def test_shipped_default_used_when_no_env(self):
        """Out of the box, the public gemini-cli desktop client is used."""
        from agent.google_oauth import _get_client_id, _DEFAULT_CLIENT_ID

        # Confirmed PUBLIC: baked into Google's open-source gemini-cli
        assert _DEFAULT_CLIENT_ID.endswith(".apps.googleusercontent.com")
        assert _DEFAULT_CLIENT_ID.startswith("681255809395-")
        assert _get_client_id() == _DEFAULT_CLIENT_ID

    def test_shipped_default_secret_present(self):
        from agent.google_oauth import _DEFAULT_CLIENT_SECRET, _get_client_secret

        assert _DEFAULT_CLIENT_SECRET.startswith("GOCSPX-")
        assert len(_DEFAULT_CLIENT_SECRET) >= 20
        assert _get_client_secret() == _DEFAULT_CLIENT_SECRET

    def test_falls_back_to_scrape_when_defaults_wiped(self, tmp_path, monkeypatch):
        """Forks that wipe the shipped defaults should still work with gemini-cli."""
        from agent import google_oauth

        monkeypatch.setattr(google_oauth, "_DEFAULT_CLIENT_ID", "")
        monkeypatch.setattr(google_oauth, "_DEFAULT_CLIENT_SECRET", "")

        fake_bin = _install_fake_gemini_cli(
            tmp_path,
            "99999-fakescrapedxyz.apps.googleusercontent.com",
            "GOCSPX-scraped-test-value-placeholder",
        )

        monkeypatch.setattr("shutil.which", lambda _: str(fake_bin))
        google_oauth._scraped_creds_cache.clear()

        assert google_oauth._get_client_id().startswith("99999-")

    def test_missing_everything_raises_with_install_hint(self, monkeypatch):
        """When env + defaults + scrape all fail, raise with install instructions."""
        from agent import google_oauth

        monkeypatch.setattr(google_oauth, "_DEFAULT_CLIENT_ID", "")
        monkeypatch.setattr(google_oauth, "_DEFAULT_CLIENT_SECRET", "")
        google_oauth._scraped_creds_cache.clear()
        monkeypatch.setattr("shutil.which", lambda _: None)

        with pytest.raises(google_oauth.GoogleOAuthError) as exc_info:
            google_oauth._require_client_id()
        assert exc_info.value.code == "google_oauth_client_id_missing"

    def test_locate_gemini_cli_oauth_js_when_absent(self, monkeypatch):
        from agent import google_oauth

        monkeypatch.setattr("shutil.which", lambda _: None)
        assert google_oauth._locate_gemini_cli_oauth_js() is None

    def test_scrape_client_credentials_parses_id_and_secret(self, tmp_path, monkeypatch):
        from agent import google_oauth

        # Fake gemini binary plus an oauth2.js carrying a harmless test
        # fingerprint (valid shape, obvious test values).
        fake_gemini_bin = _install_fake_gemini_cli(
            tmp_path,
            "12345678-testfakenotrealxyz.apps.googleusercontent.com",
            "GOCSPX-aaaaaaaaaaaaaaaaaaaaaaaa",
            script="#!/bin/sh\necho gemini\n",
        )

        monkeypatch.setattr("shutil.which", lambda _: str(fake_gemini_bin))
        google_oauth._scraped_creds_cache.clear()

        cid, cs = google_oauth._scrape_client_credentials()
        assert cid == "12345678-testfakenotrealxyz.apps.googleusercontent.com"
        assert cs.startswith("GOCSPX-")


class TestCredentialIo:
    """Saving, loading and updating the on-disk credential file."""

    def _make(self):
        """Build credentials that expire one hour from now."""
        from agent.google_oauth import GoogleCredentials

        return GoogleCredentials(
            access_token="at-1",
            refresh_token="rt-1",
            expires_ms=int((time.time() + 3600) * 1000),
            email="user@example.com",
            project_id="proj-abc",
        )

    def test_save_and_load_packed_refresh(self):
        from agent.google_oauth import load_credentials, save_credentials

        creds = self._make()
        save_credentials(creds)
        loaded = load_credentials()
        assert loaded is not None
        assert loaded.refresh_token == "rt-1"
        assert loaded.project_id == "proj-abc"

    def test_save_uses_0600_permissions(self):
        from agent.google_oauth import _credentials_path, save_credentials

        save_credentials(self._make())
        mode = stat.S_IMODE(_credentials_path().stat().st_mode)
        assert mode == 0o600

    def test_disk_format_is_packed(self):
        from agent.google_oauth import _credentials_path, save_credentials

        save_credentials(self._make())
        data = json.loads(_credentials_path().read_text())
        # The refresh field on disk is the packed string, not a dict
        assert data["refresh"] == "rt-1|proj-abc|"

    def test_update_project_ids(self):
        from agent.google_oauth import (
            GoogleCredentials,
            load_credentials,
            save_credentials,
            update_project_ids,
        )

        save_credentials(GoogleCredentials(
            access_token="at", refresh_token="rt",
            expires_ms=int((time.time() + 3600) * 1000),
        ))
        update_project_ids(project_id="new-proj", managed_project_id="mgr-xyz")

        loaded = load_credentials()
        # Fail with a clear assertion instead of an AttributeError on None.
        assert loaded is not None
        assert loaded.project_id == "new-proj"
        assert loaded.managed_project_id == "mgr-xyz"


class TestAccessTokenExpired:
    """``GoogleCredentials.access_token_expired`` edge cases."""

    def test_fresh_token_not_expired(self):
        from agent.google_oauth import GoogleCredentials

        creds = GoogleCredentials(
            access_token="at", refresh_token="rt",
            expires_ms=int((time.time() + 3600) * 1000),
        )
        assert creds.access_token_expired() is False

    def test_near_expiry_considered_expired(self):
        """60s skew — a token with 30s left is considered expired."""
        from agent.google_oauth import GoogleCredentials

        creds = GoogleCredentials(
            access_token="at", refresh_token="rt",
            expires_ms=int((time.time() + 30) * 1000),
        )
        assert creds.access_token_expired() is True

    def test_no_token_is_expired(self):
        from agent.google_oauth import GoogleCredentials

        # Expiry is an hour in the future so the empty access token is the
        # only thing that can make this report expired. The previous value
        # (999999999 ms) is a January-1970 timestamp, which made the test
        # pass on time-based expiry alone.
        creds = GoogleCredentials(
            access_token="", refresh_token="rt",
            expires_ms=int((time.time() + 3600) * 1000),
        )
        assert creds.access_token_expired() is True


class TestGetValidAccessToken:
    """``get_valid_access_token`` caching, refresh and invalid_grant handling."""

    def _save(self, **over):
        """Persist credentials valid for one hour, with *over* overriding fields."""
        from agent.google_oauth import GoogleCredentials, save_credentials

        defaults = {
            "access_token": "at",
            "refresh_token": "rt",
            "expires_ms": int((time.time() + 3600) * 1000),
        }
        defaults.update(over)
        save_credentials(GoogleCredentials(**defaults))

    def test_returns_cached_when_fresh(self):
        from agent.google_oauth import get_valid_access_token

        self._save(access_token="cached-token")
        assert get_valid_access_token() == "cached-token"

    def test_refreshes_when_near_expiry(self, monkeypatch):
        from agent import google_oauth

        self._save(expires_ms=int((time.time() + 30) * 1000))
        monkeypatch.setattr(
            google_oauth, "_post_form",
            lambda *a, **kw: {"access_token": "refreshed", "expires_in": 3600},
        )
        assert google_oauth.get_valid_access_token() == "refreshed"

    def test_invalid_grant_clears_credentials(self, monkeypatch):
        from agent import google_oauth

        self._save(expires_ms=int((time.time() - 10) * 1000))

        def boom(*a, **kw):
            raise google_oauth.GoogleOAuthError(
                "invalid_grant", code="google_oauth_invalid_grant",
            )

        monkeypatch.setattr(google_oauth, "_post_form", boom)

        with pytest.raises(google_oauth.GoogleOAuthError) as exc_info:
            google_oauth.get_valid_access_token()
        assert exc_info.value.code == "google_oauth_invalid_grant"
        # Credentials should be wiped
        assert google_oauth.load_credentials() is None

    def test_preserves_refresh_when_google_omits(self, monkeypatch):
        from agent import google_oauth

        self._save(expires_ms=int((time.time() + 30) * 1000), refresh_token="original-rt")
        # The stubbed token response carries no refresh_token field.
        monkeypatch.setattr(
            google_oauth, "_post_form",
            lambda *a, **kw: {"access_token": "new", "expires_in": 3600},
        )
        google_oauth.get_valid_access_token()

        loaded = google_oauth.load_credentials()
        assert loaded is not None
        assert loaded.refresh_token == "original-rt"


class TestProjectIdResolution:
    """Project-id lookup from environment variables."""

    @pytest.mark.parametrize("env_var", [
        "HERMES_GEMINI_PROJECT_ID",
        "GOOGLE_CLOUD_PROJECT",
        "GOOGLE_CLOUD_PROJECT_ID",
    ])
    def test_env_vars_checked(self, monkeypatch, env_var):
        from agent.google_oauth import resolve_project_id_from_env

        monkeypatch.setenv(env_var, "test-proj")
        assert resolve_project_id_from_env() == "test-proj"

    def test_priority_order(self, monkeypatch):
        from agent.google_oauth import resolve_project_id_from_env

        monkeypatch.setenv("GOOGLE_CLOUD_PROJECT", "lower-priority")
        monkeypatch.setenv("HERMES_GEMINI_PROJECT_ID", "higher-priority")
        assert resolve_project_id_from_env() == "higher-priority"

    def test_no_env_returns_empty(self):
        from agent.google_oauth import resolve_project_id_from_env

        assert resolve_project_id_from_env() == ""


class TestHeadlessDetection:
    """``_is_headless`` with SSH / HERMES_HEADLESS variables set or absent."""

    def test_detects_ssh(self, monkeypatch):
        from agent.google_oauth import _is_headless

        monkeypatch.setenv("SSH_CONNECTION", "1.2.3.4 22 5.6.7.8 9876")
        assert _is_headless() is True

    def test_detects_hermes_headless(self, monkeypatch):
        from agent.google_oauth import _is_headless

        monkeypatch.setenv("HERMES_HEADLESS", "1")
        assert _is_headless() is True

    def test_default_not_headless(self):
        from agent.google_oauth import _is_headless

        assert _is_headless() is False


# =============================================================================
# google_code_assist.py — project discovery, onboarding, quota, VPC-SC
# =============================================================================

class TestCodeAssistVpcScDetection:
    """``_is_vpc_sc_violation`` against JSON error bodies."""

    def test_detects_vpc_sc_in_json(self):
        from agent.google_code_assist import _is_vpc_sc_violation

        body = json.dumps({
            "error": {
                "details": [{"reason": "SECURITY_POLICY_VIOLATED"}],
                "message": "blocked by policy",
            }
        })
        assert _is_vpc_sc_violation(body) is True

    def test_detects_vpc_sc_in_message(self):
        from agent.google_code_assist import _is_vpc_sc_violation

        body = '{"error": {"message": "SECURITY_POLICY_VIOLATED"}}'
        assert _is_vpc_sc_violation(body) is True

    def test_non_vpc_sc_returns_false(self):
        from agent.google_code_assist import _is_vpc_sc_violation

        assert _is_vpc_sc_violation('{"error": {"message": "not found"}}') is False
        assert _is_vpc_sc_violation("") is False


class TestLoadCodeAssist:
    """``load_code_assist`` response parsing and VPC-SC fallback."""

    def test_parses_response(self, monkeypatch):
        from agent import google_code_assist

        fake = {
            "currentTier": {"id": "free-tier"},
            "cloudaicompanionProject": "proj-123",
            "allowedTiers": [{"id": "free-tier"}, {"id": "standard-tier"}],
        }
        monkeypatch.setattr(google_code_assist, "_post_json", lambda *a, **kw: fake)

        info = google_code_assist.load_code_assist("access-token")
        assert info.current_tier_id == "free-tier"
        assert info.cloudaicompanion_project == "proj-123"
        assert "free-tier" in info.allowed_tiers
        assert "standard-tier" in info.allowed_tiers

    def test_vpc_sc_forces_standard_tier(self, monkeypatch):
        from agent import google_code_assist

        def boom(*a, **kw):
            raise google_code_assist.CodeAssistError(
                "VPC-SC policy violation", code="code_assist_vpc_sc",
            )

        monkeypatch.setattr(google_code_assist, "_post_json", boom)

        info = google_code_assist.load_code_assist("access-token", project_id="corp-proj")
        assert info.current_tier_id == "standard-tier"
        assert info.cloudaicompanion_project == "corp-proj"


class TestOnboardUser:
    """``onboard_user`` tier validation and long-running-operation polling."""

    def test_paid_tier_requires_project_id(self):
        from agent import google_code_assist

        with pytest.raises(google_code_assist.ProjectIdRequiredError):
            google_code_assist.onboard_user(
                "at", tier_id="standard-tier", project_id="",
            )

    def test_free_tier_no_project_required(self, monkeypatch):
        from agent import google_code_assist

        monkeypatch.setattr(
            google_code_assist, "_post_json",
            lambda *a, **kw: {"done": True, "response": {"cloudaicompanionProject": "gen-123"}},
        )
        resp = google_code_assist.onboard_user("at", tier_id="free-tier")
        assert resp["done"] is True

    def test_lro_polling(self, monkeypatch):
        """Simulate a long-running operation that completes on the second poll."""
        from agent import google_code_assist

        call_count = {"n": 0}

        def fake_post(url, body, token, **kw):
            call_count["n"] += 1
            if call_count["n"] == 1:
                return {"name": "operations/op-abc", "done": False}
            return {"name": "operations/op-abc", "done": True, "response": {}}

        monkeypatch.setattr(google_code_assist, "_post_json", fake_post)
        # Stub out sleep so polling does not slow the test down.
        monkeypatch.setattr(google_code_assist.time, "sleep", lambda *_: None)

        resp = google_code_assist.onboard_user(
            "at", tier_id="free-tier",
        )
        assert resp["done"] is True
        assert call_count["n"] >= 2


class TestRetrieveUserQuota:
    """``retrieve_user_quota`` bucket parsing."""

    def test_parses_buckets(self, monkeypatch):
        from agent import google_code_assist

        fake = {
            "buckets": [
                {
                    "modelId": "gemini-2.5-pro",
                    "tokenType": "input",
                    "remainingFraction": 0.75,
                    "resetTime": "2026-04-17T00:00:00Z",
                },
                {
                    "modelId": "gemini-2.5-flash",
                    "remainingFraction": 0.9,
                },
            ]
        }
        monkeypatch.setattr(google_code_assist, "_post_json", lambda *a, **kw: fake)

        buckets = google_code_assist.retrieve_user_quota("at", project_id="p1")
        assert len(buckets) == 2
        assert buckets[0].model_id == "gemini-2.5-pro"
        assert buckets[0].remaining_fraction == 0.75
        assert buckets[1].remaining_fraction == 0.9


class TestResolveProjectContext: 513 def test_configured_shortcircuits(self, monkeypatch): 514 from agent.google_code_assist import resolve_project_context 515 516 # Should NOT call loadCodeAssist when configured_project_id is set 517 def should_not_be_called(*a, **kw): 518 raise AssertionError("should short-circuit") 519 520 monkeypatch.setattr( 521 "agent.google_code_assist._post_json", should_not_be_called, 522 ) 523 ctx = resolve_project_context("at", configured_project_id="proj-abc") 524 assert ctx.project_id == "proj-abc" 525 assert ctx.source == "config" 526 527 def test_env_shortcircuits(self, monkeypatch): 528 from agent.google_code_assist import resolve_project_context 529 530 monkeypatch.setattr( 531 "agent.google_code_assist._post_json", 532 lambda *a, **kw: (_ for _ in ()).throw(AssertionError("nope")), 533 ) 534 ctx = resolve_project_context("at", env_project_id="env-proj") 535 assert ctx.project_id == "env-proj" 536 assert ctx.source == "env" 537 538 def test_discovers_via_load_code_assist(self, monkeypatch): 539 from agent import google_code_assist 540 541 monkeypatch.setattr( 542 google_code_assist, "_post_json", 543 lambda *a, **kw: { 544 "currentTier": {"id": "free-tier"}, 545 "cloudaicompanionProject": "discovered-proj", 546 }, 547 ) 548 ctx = google_code_assist.resolve_project_context("at") 549 assert ctx.project_id == "discovered-proj" 550 assert ctx.tier_id == "free-tier" 551 assert ctx.source == "discovered" 552 553 554 # ============================================================================= 555 # gemini_cloudcode_adapter.py — request/response translation 556 # ============================================================================= 557 558 class TestBuildGeminiRequest: 559 def test_user_assistant_messages(self): 560 from agent.gemini_cloudcode_adapter import build_gemini_request 561 562 req = build_gemini_request(messages=[ 563 {"role": "user", "content": "hi"}, 564 {"role": "assistant", "content": "hello"}, 565 ]) 566 assert 
req["contents"][0] == { 567 "role": "user", "parts": [{"text": "hi"}], 568 } 569 assert req["contents"][1] == { 570 "role": "model", "parts": [{"text": "hello"}], 571 } 572 573 def test_system_instruction_separated(self): 574 from agent.gemini_cloudcode_adapter import build_gemini_request 575 576 req = build_gemini_request(messages=[ 577 {"role": "system", "content": "You are helpful"}, 578 {"role": "user", "content": "hi"}, 579 ]) 580 assert req["systemInstruction"]["parts"][0]["text"] == "You are helpful" 581 # System should NOT appear in contents 582 assert all(c["role"] != "system" for c in req["contents"]) 583 584 def test_multiple_system_messages_joined(self): 585 from agent.gemini_cloudcode_adapter import build_gemini_request 586 587 req = build_gemini_request(messages=[ 588 {"role": "system", "content": "A"}, 589 {"role": "system", "content": "B"}, 590 {"role": "user", "content": "hi"}, 591 ]) 592 assert "A\nB" in req["systemInstruction"]["parts"][0]["text"] 593 594 def test_tool_call_translation(self): 595 from agent.gemini_cloudcode_adapter import build_gemini_request 596 597 req = build_gemini_request(messages=[ 598 {"role": "user", "content": "what's the weather?"}, 599 { 600 "role": "assistant", 601 "content": None, 602 "tool_calls": [{ 603 "id": "call_1", 604 "type": "function", 605 "function": {"name": "get_weather", "arguments": '{"city": "SF"}'}, 606 }], 607 }, 608 ]) 609 # Assistant turn should have a functionCall part 610 model_turn = req["contents"][1] 611 assert model_turn["role"] == "model" 612 fc_part = next(p for p in model_turn["parts"] if "functionCall" in p) 613 assert fc_part["functionCall"]["name"] == "get_weather" 614 assert fc_part["functionCall"]["args"] == {"city": "SF"} 615 616 def test_tool_result_translation(self): 617 from agent.gemini_cloudcode_adapter import build_gemini_request 618 619 req = build_gemini_request(messages=[ 620 {"role": "user", "content": "q"}, 621 {"role": "assistant", "tool_calls": [{ 622 "id": "c1", "type": 
"function", 623 "function": {"name": "get_weather", "arguments": "{}"}, 624 }]}, 625 { 626 "role": "tool", 627 "name": "get_weather", 628 "tool_call_id": "c1", 629 "content": '{"temp": 72}', 630 }, 631 ]) 632 # Last content turn should carry functionResponse 633 last = req["contents"][-1] 634 fr_part = next(p for p in last["parts"] if "functionResponse" in p) 635 assert fr_part["functionResponse"]["name"] == "get_weather" 636 assert fr_part["functionResponse"]["response"] == {"temp": 72} 637 638 def test_tools_translated_to_function_declarations(self): 639 from agent.gemini_cloudcode_adapter import build_gemini_request 640 641 req = build_gemini_request( 642 messages=[{"role": "user", "content": "hi"}], 643 tools=[ 644 {"type": "function", "function": { 645 "name": "fn1", "description": "foo", 646 "parameters": {"type": "object"}, 647 }}, 648 ], 649 ) 650 decls = req["tools"][0]["functionDeclarations"] 651 assert decls[0]["name"] == "fn1" 652 assert decls[0]["description"] == "foo" 653 assert decls[0]["parameters"] == {"type": "object"} 654 655 def test_tools_strip_json_schema_only_fields_from_parameters(self): 656 from agent.gemini_cloudcode_adapter import build_gemini_request 657 658 req = build_gemini_request( 659 messages=[{"role": "user", "content": "hi"}], 660 tools=[ 661 {"type": "function", "function": { 662 "name": "fn1", 663 "description": "foo", 664 "parameters": { 665 "$schema": "https://json-schema.org/draft/2020-12/schema", 666 "type": "object", 667 "additionalProperties": False, 668 "properties": { 669 "city": { 670 "type": "string", 671 "$schema": "ignored", 672 "description": "City name", 673 "additionalProperties": False, 674 } 675 }, 676 "required": ["city"], 677 }, 678 }}, 679 ], 680 ) 681 params = req["tools"][0]["functionDeclarations"][0]["parameters"] 682 assert "$schema" not in params 683 assert "additionalProperties" not in params 684 assert params["type"] == "object" 685 assert params["required"] == ["city"] 686 assert 
params["properties"]["city"] == { 687 "type": "string", 688 "description": "City name", 689 } 690 691 def test_tool_choice_auto(self): 692 from agent.gemini_cloudcode_adapter import build_gemini_request 693 694 req = build_gemini_request( 695 messages=[{"role": "user", "content": "hi"}], 696 tool_choice="auto", 697 ) 698 assert req["toolConfig"]["functionCallingConfig"]["mode"] == "AUTO" 699 700 def test_tool_choice_required(self): 701 from agent.gemini_cloudcode_adapter import build_gemini_request 702 703 req = build_gemini_request( 704 messages=[{"role": "user", "content": "hi"}], 705 tool_choice="required", 706 ) 707 assert req["toolConfig"]["functionCallingConfig"]["mode"] == "ANY" 708 709 def test_tool_choice_specific_function(self): 710 from agent.gemini_cloudcode_adapter import build_gemini_request 711 712 req = build_gemini_request( 713 messages=[{"role": "user", "content": "hi"}], 714 tool_choice={"type": "function", "function": {"name": "my_fn"}}, 715 ) 716 cfg = req["toolConfig"]["functionCallingConfig"] 717 assert cfg["mode"] == "ANY" 718 assert cfg["allowedFunctionNames"] == ["my_fn"] 719 720 def test_generation_config_params(self): 721 from agent.gemini_cloudcode_adapter import build_gemini_request 722 723 req = build_gemini_request( 724 messages=[{"role": "user", "content": "hi"}], 725 temperature=0.7, 726 max_tokens=512, 727 top_p=0.9, 728 stop=["###", "END"], 729 ) 730 gc = req["generationConfig"] 731 assert gc["temperature"] == 0.7 732 assert gc["maxOutputTokens"] == 512 733 assert gc["topP"] == 0.9 734 assert gc["stopSequences"] == ["###", "END"] 735 736 def test_thinking_config_normalization(self): 737 from agent.gemini_cloudcode_adapter import build_gemini_request 738 739 req = build_gemini_request( 740 messages=[{"role": "user", "content": "hi"}], 741 thinking_config={"thinking_budget": 1024, "include_thoughts": True}, 742 ) 743 tc = req["generationConfig"]["thinkingConfig"] 744 assert tc["thinkingBudget"] == 1024 745 assert 
tc["includeThoughts"] is True 746 747 748 class TestWrapCodeAssistRequest: 749 def test_envelope_shape(self): 750 from agent.gemini_cloudcode_adapter import wrap_code_assist_request 751 752 inner = {"contents": [], "generationConfig": {}} 753 wrapped = wrap_code_assist_request( 754 project_id="p1", model="gemini-2.5-pro", inner_request=inner, 755 ) 756 assert wrapped["project"] == "p1" 757 assert wrapped["model"] == "gemini-2.5-pro" 758 assert wrapped["request"] is inner 759 assert "user_prompt_id" in wrapped 760 assert len(wrapped["user_prompt_id"]) > 10 761 762 763 class TestTranslateGeminiResponse: 764 def test_text_response(self): 765 from agent.gemini_cloudcode_adapter import _translate_gemini_response 766 767 resp = { 768 "response": { 769 "candidates": [{ 770 "content": {"parts": [{"text": "hello world"}]}, 771 "finishReason": "STOP", 772 }], 773 "usageMetadata": { 774 "promptTokenCount": 10, 775 "candidatesTokenCount": 5, 776 "totalTokenCount": 15, 777 }, 778 } 779 } 780 result = _translate_gemini_response(resp, model="gemini-2.5-flash") 781 assert result.choices[0].message.content == "hello world" 782 assert result.choices[0].message.tool_calls is None 783 assert result.choices[0].finish_reason == "stop" 784 assert result.usage.prompt_tokens == 10 785 assert result.usage.completion_tokens == 5 786 assert result.usage.total_tokens == 15 787 788 def test_function_call_response(self): 789 from agent.gemini_cloudcode_adapter import _translate_gemini_response 790 791 resp = { 792 "response": { 793 "candidates": [{ 794 "content": {"parts": [{ 795 "functionCall": {"name": "lookup", "args": {"q": "weather"}}, 796 }]}, 797 "finishReason": "STOP", 798 }], 799 } 800 } 801 result = _translate_gemini_response(resp, model="gemini-2.5-flash") 802 tc = result.choices[0].message.tool_calls[0] 803 assert tc.function.name == "lookup" 804 assert json.loads(tc.function.arguments) == {"q": "weather"} 805 assert result.choices[0].finish_reason == "tool_calls" 806 807 def 
test_thought_parts_go_to_reasoning(self): 808 from agent.gemini_cloudcode_adapter import _translate_gemini_response 809 810 resp = { 811 "response": { 812 "candidates": [{ 813 "content": {"parts": [ 814 {"thought": True, "text": "let me think"}, 815 {"text": "final answer"}, 816 ]}, 817 }], 818 } 819 } 820 result = _translate_gemini_response(resp, model="gemini-2.5-flash") 821 assert result.choices[0].message.content == "final answer" 822 assert result.choices[0].message.reasoning == "let me think" 823 824 def test_unwraps_direct_format(self): 825 """If response is already at top level (no 'response' wrapper), still parse.""" 826 from agent.gemini_cloudcode_adapter import _translate_gemini_response 827 828 resp = { 829 "candidates": [{ 830 "content": {"parts": [{"text": "hi"}]}, 831 "finishReason": "STOP", 832 }], 833 } 834 result = _translate_gemini_response(resp, model="gemini-2.5-flash") 835 assert result.choices[0].message.content == "hi" 836 837 def test_empty_candidates(self): 838 from agent.gemini_cloudcode_adapter import _translate_gemini_response 839 840 result = _translate_gemini_response({"response": {"candidates": []}}, model="gemini-2.5-flash") 841 assert result.choices[0].message.content == "" 842 assert result.choices[0].finish_reason == "stop" 843 844 def test_finish_reason_mapping(self): 845 from agent.gemini_cloudcode_adapter import _map_gemini_finish_reason 846 847 assert _map_gemini_finish_reason("STOP") == "stop" 848 assert _map_gemini_finish_reason("MAX_TOKENS") == "length" 849 assert _map_gemini_finish_reason("SAFETY") == "content_filter" 850 assert _map_gemini_finish_reason("RECITATION") == "content_filter" 851 852 853 class TestTranslateStreamEvent: 854 def test_parallel_calls_to_same_tool_get_unique_indices(self): 855 """Gemini may emit several functionCall parts with the same name in a 856 single turn (e.g. parallel file reads). Each must get its own OpenAI 857 ``index`` — otherwise downstream aggregators collapse them into one. 
858 """ 859 from agent.gemini_cloudcode_adapter import _translate_stream_event 860 861 event = { 862 "response": { 863 "candidates": [{ 864 "content": {"parts": [ 865 {"functionCall": {"name": "read_file", "args": {"path": "a"}}}, 866 {"functionCall": {"name": "read_file", "args": {"path": "b"}}}, 867 {"functionCall": {"name": "read_file", "args": {"path": "c"}}}, 868 ]}, 869 }], 870 } 871 } 872 counter = [0] 873 chunks = _translate_stream_event(event, model="gemini-2.5-flash", 874 tool_call_counter=counter) 875 indices = [c.choices[0].delta.tool_calls[0].index for c in chunks] 876 assert indices == [0, 1, 2] 877 assert counter[0] == 3 878 879 def test_counter_persists_across_events(self): 880 """Index assignment must continue across SSE events in the same stream.""" 881 from agent.gemini_cloudcode_adapter import _translate_stream_event 882 883 def _event(name): 884 return {"response": {"candidates": [{ 885 "content": {"parts": [{"functionCall": {"name": name, "args": {}}}]}, 886 }]}} 887 888 counter = [0] 889 chunks_a = _translate_stream_event(_event("foo"), model="m", tool_call_counter=counter) 890 chunks_b = _translate_stream_event(_event("bar"), model="m", tool_call_counter=counter) 891 chunks_c = _translate_stream_event(_event("foo"), model="m", tool_call_counter=counter) 892 893 assert chunks_a[0].choices[0].delta.tool_calls[0].index == 0 894 assert chunks_b[0].choices[0].delta.tool_calls[0].index == 1 895 assert chunks_c[0].choices[0].delta.tool_calls[0].index == 2 896 897 def test_finish_reason_switches_to_tool_calls_when_any_seen(self): 898 from agent.gemini_cloudcode_adapter import _translate_stream_event 899 900 counter = [0] 901 # First event emits one tool call. 902 _translate_stream_event( 903 {"response": {"candidates": [{ 904 "content": {"parts": [{"functionCall": {"name": "x", "args": {}}}]}, 905 }]}}, 906 model="m", tool_call_counter=counter, 907 ) 908 # Second event carries only the terminal finishReason. 
# NOTE(review): the statements down to the first ``class`` line are the tail of
# a test method whose ``def`` line precedes this chunk. They presumably sit
# indented inside that method — confirm against the preceding lines.
chunks = _translate_stream_event(
    {"response": {"candidates": [{"finishReason": "STOP"}]}},
    model="m", tool_call_counter=counter,
)
assert chunks[-1].choices[0].finish_reason == "tool_calls"


class TestGeminiCloudCodeClient:
    """Smoke test for the OpenAI-shaped surface of the Code Assist client."""

    def test_client_exposes_openai_interface(self):
        """The client must offer a callable ``chat.completions.create``."""
        from agent.gemini_cloudcode_adapter import GeminiCloudCodeClient

        cloudcode_client = GeminiCloudCodeClient(api_key="dummy")
        try:
            assert hasattr(cloudcode_client, "chat")
            assert hasattr(cloudcode_client.chat, "completions")
            assert callable(cloudcode_client.chat.completions.create)
        finally:
            # Always release the client, even when an assertion above fails.
            cloudcode_client.close()


class TestGeminiHttpErrorParsing:
    """Regression coverage for how _gemini_http_error parses Google envelopes.

    These are the failure shapes users really ran into during Google-side
    throttling (April 2026: gemini-2.5-pro MODEL_CAPACITY_EXHAUSTED,
    gemma-4-26b-it returning 404). The resulting error has to carry
    status_code + response so that the main loop's error_classifier and
    Retry-After handling keep working.
    """

    @staticmethod
    def _fake_response(status: int, body: dict | str = "", headers=None):
        """Build a duck-typed httpx.Response stand-in for _gemini_http_error.

        The returned object exposes ``status_code``, ``text`` (a dict body is
        JSON-encoded, a string body is used verbatim) and ``headers`` (an
        empty dict when none are supplied).
        """
        class _FakeResponse:
            def __init__(self):
                self.status_code = status
                self.text = json.dumps(body) if isinstance(body, dict) else body
                self.headers = headers or {}

        return _FakeResponse()

    def test_model_capacity_exhausted_produces_friendly_message(self):
        """A 429 with ErrorInfo + RetryInfo yields a readable capacity error."""
        from agent.gemini_cloudcode_adapter import _gemini_http_error

        error_info = {
            "@type": "type.googleapis.com/google.rpc.ErrorInfo",
            "reason": "MODEL_CAPACITY_EXHAUSTED",
            "domain": "googleapis.com",
            "metadata": {"model": "gemini-2.5-pro"},
        }
        retry_info = {
            "@type": "type.googleapis.com/google.rpc.RetryInfo",
            "retryDelay": "30s",
        }
        payload = {
            "error": {
                "code": 429,
                "message": "Resource has been exhausted (e.g. check quota).",
                "status": "RESOURCE_EXHAUSTED",
                "details": [error_info, retry_info],
            }
        }

        failure = _gemini_http_error(self._fake_response(429, payload))

        assert failure.status_code == 429
        assert failure.code == "code_assist_capacity_exhausted"
        assert failure.retry_after == 30.0
        assert failure.details["reason"] == "MODEL_CAPACITY_EXHAUSTED"
        # The text shown to the user must read well, not be a raw JSON dump.
        rendered = str(failure)
        assert "gemini-2.5-pro" in rendered
        assert "capacity exhausted" in rendered.lower()
        assert "30s" in rendered
        # run_agent's Retry-After header path needs the response attribute.
        assert failure.response is not None

    def test_resource_exhausted_without_reason(self):
        """A 429 lacking an ErrorInfo reason is reported as plain rate limiting."""
        from agent.gemini_cloudcode_adapter import _gemini_http_error

        payload = {
            "error": {
                "code": 429,
                "message": "Quota exceeded for requests per minute.",
                "status": "RESOURCE_EXHAUSTED",
            }
        }

        failure = _gemini_http_error(self._fake_response(429, payload))

        assert failure.status_code == 429
        assert failure.code == "code_assist_rate_limited"
        rendered = str(failure)
        assert "quota" in rendered.lower()

    def test_404_model_not_found_produces_model_retired_message(self):
        """A NOT_FOUND model error mentions unavailability and the model text."""
        from agent.gemini_cloudcode_adapter import _gemini_http_error

        payload = {
            "error": {
                "code": 404,
                "message": "models/gemma-4-26b-it is not found for API version v1internal",
                "status": "NOT_FOUND",
            }
        }

        failure = _gemini_http_error(self._fake_response(404, payload))

        assert failure.status_code == 404
        rendered = str(failure)
        lowered = rendered.lower()
        assert "not available" in lowered or "retired" in lowered
        # The model text Google sent back has to show up in the message.
        assert "gemma-4-26b-it" in rendered

    def test_unauthorized_preserves_status_code(self):
        """A 401 keeps its status code and maps to the unauthorized code."""
        from agent.gemini_cloudcode_adapter import _gemini_http_error

        payload = {
            "error": {"code": 401, "message": "Invalid token", "status": "UNAUTHENTICATED"},
        }
        failure = _gemini_http_error(self._fake_response(401, payload))

        assert failure.status_code == 401
        assert failure.code == "code_assist_unauthorized"

    def test_retry_after_header_fallback(self):
        """With no RetryInfo detail in the body, the Retry-After header is used."""
        from agent.gemini_cloudcode_adapter import _gemini_http_error

        payload = {
            "error": {"code": 429, "message": "Rate limited", "status": "RESOURCE_EXHAUSTED"},
        }
        throttled = self._fake_response(429, payload, headers={"Retry-After": "45"})

        failure = _gemini_http_error(throttled)

        assert failure.retry_after == 45.0

    def test_malformed_body_still_produces_structured_error(self):
        """A non-JSON body must not lose status_code — the classifier needs it."""
        from agent.gemini_cloudcode_adapter import _gemini_http_error

        html_body = "<html>internal error</html>"
        failure = _gemini_http_error(self._fake_response(500, html_body))

        assert failure.status_code == 500
        # Raw body snippet must still be there for debugging.
        assert "500" in str(failure)

    def test_status_code_flows_through_error_classifier(self):
        """End-to-end: a CodeAssistError built from a 429 classifies as rate_limit.

        Carrying status_code on CodeAssistError exists for exactly this:
        _extract_status_code has to see it and FailoverReason.rate_limit has
        to fire, so the main loop triggers fallback_providers.
        """
        from agent.gemini_cloudcode_adapter import _gemini_http_error
        from agent.error_classifier import classify_api_error, FailoverReason

        error_info = {
            "@type": "type.googleapis.com/google.rpc.ErrorInfo",
            "reason": "MODEL_CAPACITY_EXHAUSTED",
            "metadata": {"model": "gemini-2.5-pro"},
        }
        payload = {
            "error": {
                "code": 429,
                "message": "Resource has been exhausted",
                "status": "RESOURCE_EXHAUSTED",
                "details": [error_info],
            }
        }
        failure = _gemini_http_error(self._fake_response(429, payload))

        verdict = classify_api_error(
            failure,
            provider="google-gemini-cli",
            model="gemini-2.5-pro",
        )

        assert verdict.status_code == 429
        assert verdict.reason == FailoverReason.rate_limit


# =============================================================================
# Provider registration
# =============================================================================

class TestProviderRegistration:
    """Checks that google-gemini-cli is wired into the hermes_cli provider plumbing."""

    def test_registry_entry(self):
        """The provider is registered with the ``oauth_external`` auth type."""
        from hermes_cli.auth import PROVIDER_REGISTRY

        provider_id = "google-gemini-cli"
        assert provider_id in PROVIDER_REGISTRY
        assert PROVIDER_REGISTRY[provider_id].auth_type == "oauth_external"

    def test_google_gemini_alias_still_goes_to_api_key_gemini(self):
        """Regression guard: the existing google-gemini → gemini alias is not shadowed."""
        from hermes_cli.auth import resolve_provider

        resolved = resolve_provider("google-gemini")
        assert resolved == "gemini"

    def test_runtime_provider_raises_when_not_logged_in(self):
        """Resolving without stored credentials raises a coded AuthError."""
        from hermes_cli.auth import AuthError
        from hermes_cli.runtime_provider import resolve_runtime_provider

        with pytest.raises(AuthError) as exc_info:
            resolve_runtime_provider(requested="google-gemini-cli")
        # Inspect the exception only after the ``with`` block has exited.
        raised = exc_info.value
        assert raised.code == "google_oauth_not_logged_in"

    def test_runtime_provider_returns_correct_shape_when_logged_in(self):
        """With saved credentials the resolved dict has the expected fields."""
        from agent.google_oauth import GoogleCredentials, save_credentials
        from hermes_cli.runtime_provider import resolve_runtime_provider

        one_hour_from_now_ms = int((time.time() + 3600) * 1000)
        stored = GoogleCredentials(
            access_token="live-tok",
            refresh_token="rt",
            expires_ms=one_hour_from_now_ms,
            project_id="my-proj",
            email="t@e.com",
        )
        save_credentials(stored)

        result = resolve_runtime_provider(requested="google-gemini-cli")

        expected_fields = {
            "provider": "google-gemini-cli",
            "api_mode": "chat_completions",
            "api_key": "live-tok",
            "base_url": "cloudcode-pa://google",
            "project_id": "my-proj",
            "email": "t@e.com",
        }
        # Checked in the listed order, so the first mismatch is what fails.
        for field, expected in expected_fields.items():
            assert result[field] == expected

    def test_determine_api_mode(self):
        """The cloudcode-pa pseudo-URL maps to the chat_completions API mode."""
        from hermes_cli.providers import determine_api_mode

        mode = determine_api_mode("google-gemini-cli", "cloudcode-pa://google")
        assert mode == "chat_completions"

    def test_oauth_capable_set_preserves_existing(self):
        """Every listed provider id is a member of _OAUTH_CAPABLE_PROVIDERS."""
        from hermes_cli.auth_commands import _OAUTH_CAPABLE_PROVIDERS

        must_be_present = (
            "anthropic",
            "nous",
            "openai-codex",
            "qwen-oauth",
            "google-gemini-cli",
        )
        for provider_id in must_be_present:
            assert provider_id in _OAUTH_CAPABLE_PROVIDERS

    def test_config_env_vars_registered(self):
        """The three HERMES_GEMINI_* variables appear in OPTIONAL_ENV_VARS."""
        from hermes_cli.config import OPTIONAL_ENV_VARS

        gemini_env_vars = (
            "HERMES_GEMINI_CLIENT_ID",
            "HERMES_GEMINI_CLIENT_SECRET",
            "HERMES_GEMINI_PROJECT_ID",
        )
        for env_name in gemini_env_vars:
            assert env_name in OPTIONAL_ENV_VARS


class TestAuthStatus:
    """get_auth_status("google-gemini-cli") with and without saved credentials."""

    def test_not_logged_in(self):
        """Without saved credentials the status reports ``logged_in`` False."""
        from hermes_cli.auth import get_auth_status

        status = get_auth_status("google-gemini-cli")
        assert status["logged_in"] is False

    def test_logged_in_reports_email_and_project(self):
        """Saved credentials surface their email and project id in the status."""
        from agent.google_oauth import GoogleCredentials, save_credentials
        from hermes_cli.auth import get_auth_status

        one_hour_from_now_ms = int((time.time() + 3600) * 1000)
        save_credentials(GoogleCredentials(
            access_token="tok",
            refresh_token="rt",
            expires_ms=one_hour_from_now_ms,
            email="tek@nous.ai",
            project_id="tek-proj",
        ))

        status = get_auth_status("google-gemini-cli")

        assert status["logged_in"] is True
        assert status["email"] == "tek@nous.ai"
        assert status["project_id"] == "tek-proj"


class TestGquotaCommand:
    """Registration check for the /gquota slash command."""

    def test_gquota_registered(self):
        """``/gquota`` is a key of the COMMANDS table."""
        from hermes_cli.commands import COMMANDS

        registered = "/gquota" in COMMANDS
        assert registered


class TestRunGeminiOauthLoginPure:
    """run_gemini_oauth_login_pure with the interactive OAuth flow stubbed out."""

    def test_returns_pool_compatible_dict(self, monkeypatch):
        """The result dict mirrors the credentials returned by the OAuth flow."""
        from agent import google_oauth

        def _stub_oauth_flow(**_ignored):
            # Accepts and discards whatever keyword arguments the caller passes.
            return google_oauth.GoogleCredentials(
                access_token="at",
                refresh_token="rt",
                expires_ms=int((time.time() + 3600) * 1000),
                email="u@e.com",
                project_id="p",
            )

        monkeypatch.setattr(google_oauth, "start_oauth_flow", _stub_oauth_flow)

        login = google_oauth.run_gemini_oauth_login_pure()

        assert login["access_token"] == "at"
        assert login["refresh_token"] == "rt"
        assert login["email"] == "u@e.com"
        assert login["project_id"] == "p"
        assert isinstance(login["expires_at_ms"], int)