/ tests / agent / test_gemini_cloudcode.py
test_gemini_cloudcode.py
   1  """Tests for the google-gemini-cli OAuth + Code Assist inference provider.
   2  
   3  Covers:
   4  - agent/google_oauth.py — PKCE, credential I/O with packed refresh format,
   5    token refresh dedup, invalid_grant handling, headless paste fallback
   6  - agent/google_code_assist.py — project discovery, VPC-SC fallback, onboarding
   7    with LRO polling, quota retrieval
   8  - agent/gemini_cloudcode_adapter.py — OpenAI↔Gemini translation, request
   9    envelope wrapping, response unwrapping, tool calls bidirectional, streaming
  10  - Provider registration — registry entry, aliases, runtime dispatch, auth
  11    status, _OAUTH_CAPABLE_PROVIDERS regression guard
  12  """
  13  from __future__ import annotations
  14  
  15  import base64
  16  import hashlib
  17  import json
  18  import stat
  19  import time
  20  from pathlib import Path
  21  from types import SimpleNamespace
  22  from unittest.mock import MagicMock, patch
  23  
  24  import pytest
  25  
  26  
  27  # =============================================================================
  28  # Fixtures
  29  # =============================================================================
  30  
  31  @pytest.fixture(autouse=True)
  32  def _isolate_env(monkeypatch, tmp_path):
  33      home = tmp_path / ".hermes"
  34      home.mkdir(parents=True)
  35      monkeypatch.setattr(Path, "home", lambda: tmp_path)
  36      monkeypatch.setenv("HERMES_HOME", str(home))
  37      for key in (
  38          "HERMES_GEMINI_CLIENT_ID",
  39          "HERMES_GEMINI_CLIENT_SECRET",
  40          "HERMES_GEMINI_PROJECT_ID",
  41          "GOOGLE_CLOUD_PROJECT",
  42          "GOOGLE_CLOUD_PROJECT_ID",
  43          "SSH_CONNECTION",
  44          "SSH_CLIENT",
  45          "SSH_TTY",
  46          "HERMES_HEADLESS",
  47      ):
  48          monkeypatch.delenv(key, raising=False)
  49      return home
  50  
  51  
  52  # =============================================================================
  53  # google_oauth.py — PKCE + packed refresh format
  54  # =============================================================================
  55  
  56  class TestPkce:
  57      def test_verifier_and_challenge_s256_roundtrip(self):
  58          from agent.google_oauth import _generate_pkce_pair
  59  
  60          verifier, challenge = _generate_pkce_pair()
  61          expected = base64.urlsafe_b64encode(
  62              hashlib.sha256(verifier.encode("ascii")).digest()
  63          ).rstrip(b"=").decode("ascii")
  64          assert challenge == expected
  65          assert 43 <= len(verifier) <= 128
  66  
  67  
  68  class TestRefreshParts:
  69      def test_parse_bare_token(self):
  70          from agent.google_oauth import RefreshParts
  71  
  72          p = RefreshParts.parse("abc-token")
  73          assert p.refresh_token == "abc-token"
  74          assert p.project_id == ""
  75          assert p.managed_project_id == ""
  76  
  77      def test_parse_packed(self):
  78          from agent.google_oauth import RefreshParts
  79  
  80          p = RefreshParts.parse("rt|proj-123|mgr-456")
  81          assert p.refresh_token == "rt"
  82          assert p.project_id == "proj-123"
  83          assert p.managed_project_id == "mgr-456"
  84  
  85      def test_format_bare_token(self):
  86          from agent.google_oauth import RefreshParts
  87  
  88          assert RefreshParts(refresh_token="rt").format() == "rt"
  89  
  90      def test_format_with_project(self):
  91          from agent.google_oauth import RefreshParts
  92  
  93          packed = RefreshParts(
  94              refresh_token="rt", project_id="p1", managed_project_id="m1",
  95          ).format()
  96          assert packed == "rt|p1|m1"
  97          # Roundtrip
  98          parsed = RefreshParts.parse(packed)
  99          assert parsed.refresh_token == "rt"
 100          assert parsed.project_id == "p1"
 101          assert parsed.managed_project_id == "m1"
 102  
 103      def test_format_empty_refresh_token_returns_empty(self):
 104          from agent.google_oauth import RefreshParts
 105  
 106          assert RefreshParts(refresh_token="").format() == ""
 107  
 108  
 109  class TestClientCredResolution:
 110      def test_env_override(self, monkeypatch):
 111          from agent.google_oauth import _get_client_id
 112  
 113          monkeypatch.setenv("HERMES_GEMINI_CLIENT_ID", "custom-id.apps.googleusercontent.com")
 114          assert _get_client_id() == "custom-id.apps.googleusercontent.com"
 115  
 116      def test_shipped_default_used_when_no_env(self):
 117          """Out of the box, the public gemini-cli desktop client is used."""
 118          from agent.google_oauth import _get_client_id, _DEFAULT_CLIENT_ID
 119  
 120          # Confirmed PUBLIC: baked into Google's open-source gemini-cli
 121          assert _DEFAULT_CLIENT_ID.endswith(".apps.googleusercontent.com")
 122          assert _DEFAULT_CLIENT_ID.startswith("681255809395-")
 123          assert _get_client_id() == _DEFAULT_CLIENT_ID
 124  
 125      def test_shipped_default_secret_present(self):
 126          from agent.google_oauth import _DEFAULT_CLIENT_SECRET, _get_client_secret
 127  
 128          assert _DEFAULT_CLIENT_SECRET.startswith("GOCSPX-")
 129          assert len(_DEFAULT_CLIENT_SECRET) >= 20
 130          assert _get_client_secret() == _DEFAULT_CLIENT_SECRET
 131  
 132      def test_falls_back_to_scrape_when_defaults_wiped(self, tmp_path, monkeypatch):
 133          """Forks that wipe the shipped defaults should still work with gemini-cli."""
 134          from agent import google_oauth
 135  
 136          monkeypatch.setattr(google_oauth, "_DEFAULT_CLIENT_ID", "")
 137          monkeypatch.setattr(google_oauth, "_DEFAULT_CLIENT_SECRET", "")
 138  
 139          fake_bin = tmp_path / "bin" / "gemini"
 140          fake_bin.parent.mkdir(parents=True)
 141          fake_bin.write_text("#!/bin/sh\n")
 142          oauth_dir = tmp_path / "node_modules" / "@google" / "gemini-cli-core" / "dist" / "src" / "code_assist"
 143          oauth_dir.mkdir(parents=True)
 144          (oauth_dir / "oauth2.js").write_text(
 145              'const OAUTH_CLIENT_ID = "99999-fakescrapedxyz.apps.googleusercontent.com";\n'
 146              'const OAUTH_CLIENT_SECRET = "GOCSPX-scraped-test-value-placeholder";\n'
 147          )
 148  
 149          monkeypatch.setattr("shutil.which", lambda _: str(fake_bin))
 150          google_oauth._scraped_creds_cache.clear()
 151  
 152          assert google_oauth._get_client_id().startswith("99999-")
 153  
 154      def test_missing_everything_raises_with_install_hint(self, monkeypatch):
 155          """When env + defaults + scrape all fail, raise with install instructions."""
 156          from agent import google_oauth
 157  
 158          monkeypatch.setattr(google_oauth, "_DEFAULT_CLIENT_ID", "")
 159          monkeypatch.setattr(google_oauth, "_DEFAULT_CLIENT_SECRET", "")
 160          google_oauth._scraped_creds_cache.clear()
 161          monkeypatch.setattr("shutil.which", lambda _: None)
 162  
 163          with pytest.raises(google_oauth.GoogleOAuthError) as exc_info:
 164              google_oauth._require_client_id()
 165          assert exc_info.value.code == "google_oauth_client_id_missing"
 166  
 167      def test_locate_gemini_cli_oauth_js_when_absent(self, monkeypatch):
 168          from agent import google_oauth
 169  
 170          monkeypatch.setattr("shutil.which", lambda _: None)
 171          assert google_oauth._locate_gemini_cli_oauth_js() is None
 172  
 173      def test_scrape_client_credentials_parses_id_and_secret(self, tmp_path, monkeypatch):
 174          from agent import google_oauth
 175  
 176          # Create a fake gemini binary and oauth2.js
 177          fake_gemini_bin = tmp_path / "bin" / "gemini"
 178          fake_gemini_bin.parent.mkdir(parents=True)
 179          fake_gemini_bin.write_text("#!/bin/sh\necho gemini\n")
 180  
 181          oauth_js_dir = tmp_path / "node_modules" / "@google" / "gemini-cli-core" / "dist" / "src" / "code_assist"
 182          oauth_js_dir.mkdir(parents=True)
 183          oauth_js = oauth_js_dir / "oauth2.js"
 184          # Synthesize a harmless test fingerprint (valid shape, obvious test values)
 185          oauth_js.write_text(
 186              'const OAUTH_CLIENT_ID = "12345678-testfakenotrealxyz.apps.googleusercontent.com";\n'
 187              'const OAUTH_CLIENT_SECRET = "GOCSPX-aaaaaaaaaaaaaaaaaaaaaaaa";\n'
 188          )
 189  
 190          monkeypatch.setattr("shutil.which", lambda _: str(fake_gemini_bin))
 191          google_oauth._scraped_creds_cache.clear()
 192  
 193          cid, cs = google_oauth._scrape_client_credentials()
 194          assert cid == "12345678-testfakenotrealxyz.apps.googleusercontent.com"
 195          assert cs.startswith("GOCSPX-")
 196  
 197  
 198  class TestCredentialIo:
 199      def _make(self):
 200          from agent.google_oauth import GoogleCredentials
 201  
 202          return GoogleCredentials(
 203              access_token="at-1",
 204              refresh_token="rt-1",
 205              expires_ms=int((time.time() + 3600) * 1000),
 206              email="user@example.com",
 207              project_id="proj-abc",
 208          )
 209  
 210      def test_save_and_load_packed_refresh(self):
 211          from agent.google_oauth import load_credentials, save_credentials
 212  
 213          creds = self._make()
 214          save_credentials(creds)
 215          loaded = load_credentials()
 216          assert loaded is not None
 217          assert loaded.refresh_token == "rt-1"
 218          assert loaded.project_id == "proj-abc"
 219  
 220      def test_save_uses_0600_permissions(self):
 221          from agent.google_oauth import _credentials_path, save_credentials
 222  
 223          save_credentials(self._make())
 224          mode = stat.S_IMODE(_credentials_path().stat().st_mode)
 225          assert mode == 0o600
 226  
 227      def test_disk_format_is_packed(self):
 228          from agent.google_oauth import _credentials_path, save_credentials
 229  
 230          save_credentials(self._make())
 231          data = json.loads(_credentials_path().read_text())
 232          # The refresh field on disk is the packed string, not a dict
 233          assert data["refresh"] == "rt-1|proj-abc|"
 234  
 235      def test_update_project_ids(self):
 236          from agent.google_oauth import (
 237              load_credentials, save_credentials, update_project_ids,
 238          )
 239          from agent.google_oauth import GoogleCredentials
 240  
 241          save_credentials(GoogleCredentials(
 242              access_token="at", refresh_token="rt",
 243              expires_ms=int((time.time() + 3600) * 1000),
 244          ))
 245          update_project_ids(project_id="new-proj", managed_project_id="mgr-xyz")
 246  
 247          loaded = load_credentials()
 248          assert loaded.project_id == "new-proj"
 249          assert loaded.managed_project_id == "mgr-xyz"
 250  
 251  
 252  class TestAccessTokenExpired:
 253      def test_fresh_token_not_expired(self):
 254          from agent.google_oauth import GoogleCredentials
 255  
 256          creds = GoogleCredentials(
 257              access_token="at", refresh_token="rt",
 258              expires_ms=int((time.time() + 3600) * 1000),
 259          )
 260          assert creds.access_token_expired() is False
 261  
 262      def test_near_expiry_considered_expired(self):
 263          """60s skew — a token with 30s left is considered expired."""
 264          from agent.google_oauth import GoogleCredentials
 265  
 266          creds = GoogleCredentials(
 267              access_token="at", refresh_token="rt",
 268              expires_ms=int((time.time() + 30) * 1000),
 269          )
 270          assert creds.access_token_expired() is True
 271  
 272      def test_no_token_is_expired(self):
 273          from agent.google_oauth import GoogleCredentials
 274  
 275          creds = GoogleCredentials(
 276              access_token="", refresh_token="rt", expires_ms=999999999,
 277          )
 278          assert creds.access_token_expired() is True
 279  
 280  
 281  class TestGetValidAccessToken:
 282      def _save(self, **over):
 283          from agent.google_oauth import GoogleCredentials, save_credentials
 284  
 285          defaults = {
 286              "access_token": "at",
 287              "refresh_token": "rt",
 288              "expires_ms": int((time.time() + 3600) * 1000),
 289          }
 290          defaults.update(over)
 291          save_credentials(GoogleCredentials(**defaults))
 292  
 293      def test_returns_cached_when_fresh(self):
 294          from agent.google_oauth import get_valid_access_token
 295  
 296          self._save(access_token="cached-token")
 297          assert get_valid_access_token() == "cached-token"
 298  
 299      def test_refreshes_when_near_expiry(self, monkeypatch):
 300          from agent import google_oauth
 301  
 302          self._save(expires_ms=int((time.time() + 30) * 1000))
 303          monkeypatch.setattr(
 304              google_oauth, "_post_form",
 305              lambda *a, **kw: {"access_token": "refreshed", "expires_in": 3600},
 306          )
 307          assert google_oauth.get_valid_access_token() == "refreshed"
 308  
 309      def test_invalid_grant_clears_credentials(self, monkeypatch):
 310          from agent import google_oauth
 311  
 312          self._save(expires_ms=int((time.time() - 10) * 1000))
 313  
 314          def boom(*a, **kw):
 315              raise google_oauth.GoogleOAuthError(
 316                  "invalid_grant", code="google_oauth_invalid_grant",
 317              )
 318  
 319          monkeypatch.setattr(google_oauth, "_post_form", boom)
 320  
 321          with pytest.raises(google_oauth.GoogleOAuthError) as exc_info:
 322              google_oauth.get_valid_access_token()
 323          assert exc_info.value.code == "google_oauth_invalid_grant"
 324          # Credentials should be wiped
 325          assert google_oauth.load_credentials() is None
 326  
 327      def test_preserves_refresh_when_google_omits(self, monkeypatch):
 328          from agent import google_oauth
 329  
 330          self._save(expires_ms=int((time.time() + 30) * 1000), refresh_token="original-rt")
 331          monkeypatch.setattr(
 332              google_oauth, "_post_form",
 333              lambda *a, **kw: {"access_token": "new", "expires_in": 3600},
 334          )
 335          google_oauth.get_valid_access_token()
 336          assert google_oauth.load_credentials().refresh_token == "original-rt"
 337  
 338  
 339  class TestProjectIdResolution:
 340      @pytest.mark.parametrize("env_var", [
 341          "HERMES_GEMINI_PROJECT_ID",
 342          "GOOGLE_CLOUD_PROJECT",
 343          "GOOGLE_CLOUD_PROJECT_ID",
 344      ])
 345      def test_env_vars_checked(self, monkeypatch, env_var):
 346          from agent.google_oauth import resolve_project_id_from_env
 347  
 348          monkeypatch.setenv(env_var, "test-proj")
 349          assert resolve_project_id_from_env() == "test-proj"
 350  
 351      def test_priority_order(self, monkeypatch):
 352          from agent.google_oauth import resolve_project_id_from_env
 353  
 354          monkeypatch.setenv("GOOGLE_CLOUD_PROJECT", "lower-priority")
 355          monkeypatch.setenv("HERMES_GEMINI_PROJECT_ID", "higher-priority")
 356          assert resolve_project_id_from_env() == "higher-priority"
 357  
 358      def test_no_env_returns_empty(self):
 359          from agent.google_oauth import resolve_project_id_from_env
 360  
 361          assert resolve_project_id_from_env() == ""
 362  
 363  
 364  class TestHeadlessDetection:
 365      def test_detects_ssh(self, monkeypatch):
 366          from agent.google_oauth import _is_headless
 367  
 368          monkeypatch.setenv("SSH_CONNECTION", "1.2.3.4 22 5.6.7.8 9876")
 369          assert _is_headless() is True
 370  
 371      def test_detects_hermes_headless(self, monkeypatch):
 372          from agent.google_oauth import _is_headless
 373  
 374          monkeypatch.setenv("HERMES_HEADLESS", "1")
 375          assert _is_headless() is True
 376  
 377      def test_default_not_headless(self):
 378          from agent.google_oauth import _is_headless
 379  
 380          assert _is_headless() is False
 381  
 382  
 383  # =============================================================================
 384  # google_code_assist.py — project discovery, onboarding, quota, VPC-SC
 385  # =============================================================================
 386  
 387  class TestCodeAssistVpcScDetection:
 388      def test_detects_vpc_sc_in_json(self):
 389          from agent.google_code_assist import _is_vpc_sc_violation
 390  
 391          body = json.dumps({
 392              "error": {
 393                  "details": [{"reason": "SECURITY_POLICY_VIOLATED"}],
 394                  "message": "blocked by policy",
 395              }
 396          })
 397          assert _is_vpc_sc_violation(body) is True
 398  
 399      def test_detects_vpc_sc_in_message(self):
 400          from agent.google_code_assist import _is_vpc_sc_violation
 401  
 402          body = '{"error": {"message": "SECURITY_POLICY_VIOLATED"}}'
 403          assert _is_vpc_sc_violation(body) is True
 404  
 405      def test_non_vpc_sc_returns_false(self):
 406          from agent.google_code_assist import _is_vpc_sc_violation
 407  
 408          assert _is_vpc_sc_violation('{"error": {"message": "not found"}}') is False
 409          assert _is_vpc_sc_violation("") is False
 410  
 411  
 412  class TestLoadCodeAssist:
 413      def test_parses_response(self, monkeypatch):
 414          from agent import google_code_assist
 415  
 416          fake = {
 417              "currentTier": {"id": "free-tier"},
 418              "cloudaicompanionProject": "proj-123",
 419              "allowedTiers": [{"id": "free-tier"}, {"id": "standard-tier"}],
 420          }
 421          monkeypatch.setattr(google_code_assist, "_post_json", lambda *a, **kw: fake)
 422  
 423          info = google_code_assist.load_code_assist("access-token")
 424          assert info.current_tier_id == "free-tier"
 425          assert info.cloudaicompanion_project == "proj-123"
 426          assert "free-tier" in info.allowed_tiers
 427          assert "standard-tier" in info.allowed_tiers
 428  
 429      def test_vpc_sc_forces_standard_tier(self, monkeypatch):
 430          from agent import google_code_assist
 431  
 432          def boom(*a, **kw):
 433              raise google_code_assist.CodeAssistError(
 434                  "VPC-SC policy violation", code="code_assist_vpc_sc",
 435              )
 436  
 437          monkeypatch.setattr(google_code_assist, "_post_json", boom)
 438  
 439          info = google_code_assist.load_code_assist("access-token", project_id="corp-proj")
 440          assert info.current_tier_id == "standard-tier"
 441          assert info.cloudaicompanion_project == "corp-proj"
 442  
 443  
 444  class TestOnboardUser:
 445      def test_paid_tier_requires_project_id(self):
 446          from agent import google_code_assist
 447  
 448          with pytest.raises(google_code_assist.ProjectIdRequiredError):
 449              google_code_assist.onboard_user(
 450                  "at", tier_id="standard-tier", project_id="",
 451              )
 452  
 453      def test_free_tier_no_project_required(self, monkeypatch):
 454          from agent import google_code_assist
 455  
 456          monkeypatch.setattr(
 457              google_code_assist, "_post_json",
 458              lambda *a, **kw: {"done": True, "response": {"cloudaicompanionProject": "gen-123"}},
 459          )
 460          resp = google_code_assist.onboard_user("at", tier_id="free-tier")
 461          assert resp["done"] is True
 462  
 463      def test_lro_polling(self, monkeypatch):
 464          """Simulate a long-running operation that completes on the second poll."""
 465          from agent import google_code_assist
 466  
 467          call_count = {"n": 0}
 468  
 469          def fake_post(url, body, token, **kw):
 470              call_count["n"] += 1
 471              if call_count["n"] == 1:
 472                  return {"name": "operations/op-abc", "done": False}
 473              return {"name": "operations/op-abc", "done": True, "response": {}}
 474  
 475          monkeypatch.setattr(google_code_assist, "_post_json", fake_post)
 476          monkeypatch.setattr(google_code_assist.time, "sleep", lambda *_: None)
 477  
 478          resp = google_code_assist.onboard_user(
 479              "at", tier_id="free-tier",
 480          )
 481          assert resp["done"] is True
 482          assert call_count["n"] >= 2
 483  
 484  
 485  class TestRetrieveUserQuota:
 486      def test_parses_buckets(self, monkeypatch):
 487          from agent import google_code_assist
 488  
 489          fake = {
 490              "buckets": [
 491                  {
 492                      "modelId": "gemini-2.5-pro",
 493                      "tokenType": "input",
 494                      "remainingFraction": 0.75,
 495                      "resetTime": "2026-04-17T00:00:00Z",
 496                  },
 497                  {
 498                      "modelId": "gemini-2.5-flash",
 499                      "remainingFraction": 0.9,
 500                  },
 501              ]
 502          }
 503          monkeypatch.setattr(google_code_assist, "_post_json", lambda *a, **kw: fake)
 504  
 505          buckets = google_code_assist.retrieve_user_quota("at", project_id="p1")
 506          assert len(buckets) == 2
 507          assert buckets[0].model_id == "gemini-2.5-pro"
 508          assert buckets[0].remaining_fraction == 0.75
 509          assert buckets[1].remaining_fraction == 0.9
 510  
 511  
 512  class TestResolveProjectContext:
 513      def test_configured_shortcircuits(self, monkeypatch):
 514          from agent.google_code_assist import resolve_project_context
 515  
 516          # Should NOT call loadCodeAssist when configured_project_id is set
 517          def should_not_be_called(*a, **kw):
 518              raise AssertionError("should short-circuit")
 519  
 520          monkeypatch.setattr(
 521              "agent.google_code_assist._post_json", should_not_be_called,
 522          )
 523          ctx = resolve_project_context("at", configured_project_id="proj-abc")
 524          assert ctx.project_id == "proj-abc"
 525          assert ctx.source == "config"
 526  
 527      def test_env_shortcircuits(self, monkeypatch):
 528          from agent.google_code_assist import resolve_project_context
 529  
 530          monkeypatch.setattr(
 531              "agent.google_code_assist._post_json",
 532              lambda *a, **kw: (_ for _ in ()).throw(AssertionError("nope")),
 533          )
 534          ctx = resolve_project_context("at", env_project_id="env-proj")
 535          assert ctx.project_id == "env-proj"
 536          assert ctx.source == "env"
 537  
 538      def test_discovers_via_load_code_assist(self, monkeypatch):
 539          from agent import google_code_assist
 540  
 541          monkeypatch.setattr(
 542              google_code_assist, "_post_json",
 543              lambda *a, **kw: {
 544                  "currentTier": {"id": "free-tier"},
 545                  "cloudaicompanionProject": "discovered-proj",
 546              },
 547          )
 548          ctx = google_code_assist.resolve_project_context("at")
 549          assert ctx.project_id == "discovered-proj"
 550          assert ctx.tier_id == "free-tier"
 551          assert ctx.source == "discovered"
 552  
 553  
 554  # =============================================================================
 555  # gemini_cloudcode_adapter.py — request/response translation
 556  # =============================================================================
 557  
 558  class TestBuildGeminiRequest:
 559      def test_user_assistant_messages(self):
 560          from agent.gemini_cloudcode_adapter import build_gemini_request
 561  
 562          req = build_gemini_request(messages=[
 563              {"role": "user", "content": "hi"},
 564              {"role": "assistant", "content": "hello"},
 565          ])
 566          assert req["contents"][0] == {
 567              "role": "user", "parts": [{"text": "hi"}],
 568          }
 569          assert req["contents"][1] == {
 570              "role": "model", "parts": [{"text": "hello"}],
 571          }
 572  
 573      def test_system_instruction_separated(self):
 574          from agent.gemini_cloudcode_adapter import build_gemini_request
 575  
 576          req = build_gemini_request(messages=[
 577              {"role": "system", "content": "You are helpful"},
 578              {"role": "user", "content": "hi"},
 579          ])
 580          assert req["systemInstruction"]["parts"][0]["text"] == "You are helpful"
 581          # System should NOT appear in contents
 582          assert all(c["role"] != "system" for c in req["contents"])
 583  
 584      def test_multiple_system_messages_joined(self):
 585          from agent.gemini_cloudcode_adapter import build_gemini_request
 586  
 587          req = build_gemini_request(messages=[
 588              {"role": "system", "content": "A"},
 589              {"role": "system", "content": "B"},
 590              {"role": "user", "content": "hi"},
 591          ])
 592          assert "A\nB" in req["systemInstruction"]["parts"][0]["text"]
 593  
 594      def test_tool_call_translation(self):
 595          from agent.gemini_cloudcode_adapter import build_gemini_request
 596  
 597          req = build_gemini_request(messages=[
 598              {"role": "user", "content": "what's the weather?"},
 599              {
 600                  "role": "assistant",
 601                  "content": None,
 602                  "tool_calls": [{
 603                      "id": "call_1",
 604                      "type": "function",
 605                      "function": {"name": "get_weather", "arguments": '{"city": "SF"}'},
 606                  }],
 607              },
 608          ])
 609          # Assistant turn should have a functionCall part
 610          model_turn = req["contents"][1]
 611          assert model_turn["role"] == "model"
 612          fc_part = next(p for p in model_turn["parts"] if "functionCall" in p)
 613          assert fc_part["functionCall"]["name"] == "get_weather"
 614          assert fc_part["functionCall"]["args"] == {"city": "SF"}
 615  
 616      def test_tool_result_translation(self):
 617          from agent.gemini_cloudcode_adapter import build_gemini_request
 618  
 619          req = build_gemini_request(messages=[
 620              {"role": "user", "content": "q"},
 621              {"role": "assistant", "tool_calls": [{
 622                  "id": "c1", "type": "function",
 623                  "function": {"name": "get_weather", "arguments": "{}"},
 624              }]},
 625              {
 626                  "role": "tool",
 627                  "name": "get_weather",
 628                  "tool_call_id": "c1",
 629                  "content": '{"temp": 72}',
 630              },
 631          ])
 632          # Last content turn should carry functionResponse
 633          last = req["contents"][-1]
 634          fr_part = next(p for p in last["parts"] if "functionResponse" in p)
 635          assert fr_part["functionResponse"]["name"] == "get_weather"
 636          assert fr_part["functionResponse"]["response"] == {"temp": 72}
 637  
 638      def test_tools_translated_to_function_declarations(self):
 639          from agent.gemini_cloudcode_adapter import build_gemini_request
 640  
 641          req = build_gemini_request(
 642              messages=[{"role": "user", "content": "hi"}],
 643              tools=[
 644                  {"type": "function", "function": {
 645                      "name": "fn1", "description": "foo",
 646                      "parameters": {"type": "object"},
 647                  }},
 648              ],
 649          )
 650          decls = req["tools"][0]["functionDeclarations"]
 651          assert decls[0]["name"] == "fn1"
 652          assert decls[0]["description"] == "foo"
 653          assert decls[0]["parameters"] == {"type": "object"}
 654  
 655      def test_tools_strip_json_schema_only_fields_from_parameters(self):
 656          from agent.gemini_cloudcode_adapter import build_gemini_request
 657  
 658          req = build_gemini_request(
 659              messages=[{"role": "user", "content": "hi"}],
 660              tools=[
 661                  {"type": "function", "function": {
 662                      "name": "fn1",
 663                      "description": "foo",
 664                      "parameters": {
 665                          "$schema": "https://json-schema.org/draft/2020-12/schema",
 666                          "type": "object",
 667                          "additionalProperties": False,
 668                          "properties": {
 669                              "city": {
 670                                  "type": "string",
 671                                  "$schema": "ignored",
 672                                  "description": "City name",
 673                                  "additionalProperties": False,
 674                              }
 675                          },
 676                          "required": ["city"],
 677                      },
 678                  }},
 679              ],
 680          )
 681          params = req["tools"][0]["functionDeclarations"][0]["parameters"]
 682          assert "$schema" not in params
 683          assert "additionalProperties" not in params
 684          assert params["type"] == "object"
 685          assert params["required"] == ["city"]
 686          assert params["properties"]["city"] == {
 687              "type": "string",
 688              "description": "City name",
 689          }
 690  
 691      def test_tool_choice_auto(self):
 692          from agent.gemini_cloudcode_adapter import build_gemini_request
 693  
 694          req = build_gemini_request(
 695              messages=[{"role": "user", "content": "hi"}],
 696              tool_choice="auto",
 697          )
 698          assert req["toolConfig"]["functionCallingConfig"]["mode"] == "AUTO"
 699  
 700      def test_tool_choice_required(self):
 701          from agent.gemini_cloudcode_adapter import build_gemini_request
 702  
 703          req = build_gemini_request(
 704              messages=[{"role": "user", "content": "hi"}],
 705              tool_choice="required",
 706          )
 707          assert req["toolConfig"]["functionCallingConfig"]["mode"] == "ANY"
 708  
 709      def test_tool_choice_specific_function(self):
 710          from agent.gemini_cloudcode_adapter import build_gemini_request
 711  
 712          req = build_gemini_request(
 713              messages=[{"role": "user", "content": "hi"}],
 714              tool_choice={"type": "function", "function": {"name": "my_fn"}},
 715          )
 716          cfg = req["toolConfig"]["functionCallingConfig"]
 717          assert cfg["mode"] == "ANY"
 718          assert cfg["allowedFunctionNames"] == ["my_fn"]
 719  
 720      def test_generation_config_params(self):
 721          from agent.gemini_cloudcode_adapter import build_gemini_request
 722  
 723          req = build_gemini_request(
 724              messages=[{"role": "user", "content": "hi"}],
 725              temperature=0.7,
 726              max_tokens=512,
 727              top_p=0.9,
 728              stop=["###", "END"],
 729          )
 730          gc = req["generationConfig"]
 731          assert gc["temperature"] == 0.7
 732          assert gc["maxOutputTokens"] == 512
 733          assert gc["topP"] == 0.9
 734          assert gc["stopSequences"] == ["###", "END"]
 735  
 736      def test_thinking_config_normalization(self):
 737          from agent.gemini_cloudcode_adapter import build_gemini_request
 738  
 739          req = build_gemini_request(
 740              messages=[{"role": "user", "content": "hi"}],
 741              thinking_config={"thinking_budget": 1024, "include_thoughts": True},
 742          )
 743          tc = req["generationConfig"]["thinkingConfig"]
 744          assert tc["thinkingBudget"] == 1024
 745          assert tc["includeThoughts"] is True
 746  
 747  
 748  class TestWrapCodeAssistRequest:
 749      def test_envelope_shape(self):
 750          from agent.gemini_cloudcode_adapter import wrap_code_assist_request
 751  
 752          inner = {"contents": [], "generationConfig": {}}
 753          wrapped = wrap_code_assist_request(
 754              project_id="p1", model="gemini-2.5-pro", inner_request=inner,
 755          )
 756          assert wrapped["project"] == "p1"
 757          assert wrapped["model"] == "gemini-2.5-pro"
 758          assert wrapped["request"] is inner
 759          assert "user_prompt_id" in wrapped
 760          assert len(wrapped["user_prompt_id"]) > 10
 761  
 762  
 763  class TestTranslateGeminiResponse:
 764      def test_text_response(self):
 765          from agent.gemini_cloudcode_adapter import _translate_gemini_response
 766  
 767          resp = {
 768              "response": {
 769                  "candidates": [{
 770                      "content": {"parts": [{"text": "hello world"}]},
 771                      "finishReason": "STOP",
 772                  }],
 773                  "usageMetadata": {
 774                      "promptTokenCount": 10,
 775                      "candidatesTokenCount": 5,
 776                      "totalTokenCount": 15,
 777                  },
 778              }
 779          }
 780          result = _translate_gemini_response(resp, model="gemini-2.5-flash")
 781          assert result.choices[0].message.content == "hello world"
 782          assert result.choices[0].message.tool_calls is None
 783          assert result.choices[0].finish_reason == "stop"
 784          assert result.usage.prompt_tokens == 10
 785          assert result.usage.completion_tokens == 5
 786          assert result.usage.total_tokens == 15
 787  
 788      def test_function_call_response(self):
 789          from agent.gemini_cloudcode_adapter import _translate_gemini_response
 790  
 791          resp = {
 792              "response": {
 793                  "candidates": [{
 794                      "content": {"parts": [{
 795                          "functionCall": {"name": "lookup", "args": {"q": "weather"}},
 796                      }]},
 797                      "finishReason": "STOP",
 798                  }],
 799              }
 800          }
 801          result = _translate_gemini_response(resp, model="gemini-2.5-flash")
 802          tc = result.choices[0].message.tool_calls[0]
 803          assert tc.function.name == "lookup"
 804          assert json.loads(tc.function.arguments) == {"q": "weather"}
 805          assert result.choices[0].finish_reason == "tool_calls"
 806  
 807      def test_thought_parts_go_to_reasoning(self):
 808          from agent.gemini_cloudcode_adapter import _translate_gemini_response
 809  
 810          resp = {
 811              "response": {
 812                  "candidates": [{
 813                      "content": {"parts": [
 814                          {"thought": True, "text": "let me think"},
 815                          {"text": "final answer"},
 816                      ]},
 817                  }],
 818              }
 819          }
 820          result = _translate_gemini_response(resp, model="gemini-2.5-flash")
 821          assert result.choices[0].message.content == "final answer"
 822          assert result.choices[0].message.reasoning == "let me think"
 823  
 824      def test_unwraps_direct_format(self):
 825          """If response is already at top level (no 'response' wrapper), still parse."""
 826          from agent.gemini_cloudcode_adapter import _translate_gemini_response
 827  
 828          resp = {
 829              "candidates": [{
 830                  "content": {"parts": [{"text": "hi"}]},
 831                  "finishReason": "STOP",
 832              }],
 833          }
 834          result = _translate_gemini_response(resp, model="gemini-2.5-flash")
 835          assert result.choices[0].message.content == "hi"
 836  
 837      def test_empty_candidates(self):
 838          from agent.gemini_cloudcode_adapter import _translate_gemini_response
 839  
 840          result = _translate_gemini_response({"response": {"candidates": []}}, model="gemini-2.5-flash")
 841          assert result.choices[0].message.content == ""
 842          assert result.choices[0].finish_reason == "stop"
 843  
 844      def test_finish_reason_mapping(self):
 845          from agent.gemini_cloudcode_adapter import _map_gemini_finish_reason
 846  
 847          assert _map_gemini_finish_reason("STOP") == "stop"
 848          assert _map_gemini_finish_reason("MAX_TOKENS") == "length"
 849          assert _map_gemini_finish_reason("SAFETY") == "content_filter"
 850          assert _map_gemini_finish_reason("RECITATION") == "content_filter"
 851  
 852  
 853  class TestTranslateStreamEvent:
 854      def test_parallel_calls_to_same_tool_get_unique_indices(self):
 855          """Gemini may emit several functionCall parts with the same name in a
 856          single turn (e.g. parallel file reads). Each must get its own OpenAI
 857          ``index`` — otherwise downstream aggregators collapse them into one.
 858          """
 859          from agent.gemini_cloudcode_adapter import _translate_stream_event
 860  
 861          event = {
 862              "response": {
 863                  "candidates": [{
 864                      "content": {"parts": [
 865                          {"functionCall": {"name": "read_file", "args": {"path": "a"}}},
 866                          {"functionCall": {"name": "read_file", "args": {"path": "b"}}},
 867                          {"functionCall": {"name": "read_file", "args": {"path": "c"}}},
 868                      ]},
 869                  }],
 870              }
 871          }
 872          counter = [0]
 873          chunks = _translate_stream_event(event, model="gemini-2.5-flash",
 874                                           tool_call_counter=counter)
 875          indices = [c.choices[0].delta.tool_calls[0].index for c in chunks]
 876          assert indices == [0, 1, 2]
 877          assert counter[0] == 3
 878  
 879      def test_counter_persists_across_events(self):
 880          """Index assignment must continue across SSE events in the same stream."""
 881          from agent.gemini_cloudcode_adapter import _translate_stream_event
 882  
 883          def _event(name):
 884              return {"response": {"candidates": [{
 885                  "content": {"parts": [{"functionCall": {"name": name, "args": {}}}]},
 886              }]}}
 887  
 888          counter = [0]
 889          chunks_a = _translate_stream_event(_event("foo"), model="m", tool_call_counter=counter)
 890          chunks_b = _translate_stream_event(_event("bar"), model="m", tool_call_counter=counter)
 891          chunks_c = _translate_stream_event(_event("foo"), model="m", tool_call_counter=counter)
 892  
 893          assert chunks_a[0].choices[0].delta.tool_calls[0].index == 0
 894          assert chunks_b[0].choices[0].delta.tool_calls[0].index == 1
 895          assert chunks_c[0].choices[0].delta.tool_calls[0].index == 2
 896  
 897      def test_finish_reason_switches_to_tool_calls_when_any_seen(self):
 898          from agent.gemini_cloudcode_adapter import _translate_stream_event
 899  
 900          counter = [0]
 901          # First event emits one tool call.
 902          _translate_stream_event(
 903              {"response": {"candidates": [{
 904                  "content": {"parts": [{"functionCall": {"name": "x", "args": {}}}]},
 905              }]}},
 906              model="m", tool_call_counter=counter,
 907          )
 908          # Second event carries only the terminal finishReason.
 909          chunks = _translate_stream_event(
 910              {"response": {"candidates": [{"finishReason": "STOP"}]}},
 911              model="m", tool_call_counter=counter,
 912          )
 913          assert chunks[-1].choices[0].finish_reason == "tool_calls"
 914  
 915  
 916  class TestGeminiCloudCodeClient:
 917      def test_client_exposes_openai_interface(self):
 918          from agent.gemini_cloudcode_adapter import GeminiCloudCodeClient
 919  
 920          client = GeminiCloudCodeClient(api_key="dummy")
 921          try:
 922              assert hasattr(client, "chat")
 923              assert hasattr(client.chat, "completions")
 924              assert callable(client.chat.completions.create)
 925          finally:
 926              client.close()
 927  
 928  
 929  class TestGeminiHttpErrorParsing:
 930      """Regression coverage for _gemini_http_error Google-envelope parsing.
 931  
 932      These are the paths that users actually hit during Google-side throttling
 933      (April 2026: gemini-2.5-pro MODEL_CAPACITY_EXHAUSTED, gemma-4-26b-it
 934      returning 404).  The error needs to carry status_code + response so the
 935      main loop's error_classifier and Retry-After logic work.
 936      """
 937  
 938      @staticmethod
 939      def _fake_response(status: int, body: dict | str = "", headers=None):
 940          """Minimal httpx.Response stand-in (duck-typed for _gemini_http_error)."""
 941          class _FakeResponse:
 942              def __init__(self):
 943                  self.status_code = status
 944                  if isinstance(body, dict):
 945                      self.text = json.dumps(body)
 946                  else:
 947                      self.text = body
 948                  self.headers = headers or {}
 949          return _FakeResponse()
 950  
 951      def test_model_capacity_exhausted_produces_friendly_message(self):
 952          from agent.gemini_cloudcode_adapter import _gemini_http_error
 953  
 954          body = {
 955              "error": {
 956                  "code": 429,
 957                  "message": "Resource has been exhausted (e.g. check quota).",
 958                  "status": "RESOURCE_EXHAUSTED",
 959                  "details": [
 960                      {
 961                          "@type": "type.googleapis.com/google.rpc.ErrorInfo",
 962                          "reason": "MODEL_CAPACITY_EXHAUSTED",
 963                          "domain": "googleapis.com",
 964                          "metadata": {"model": "gemini-2.5-pro"},
 965                      },
 966                      {
 967                          "@type": "type.googleapis.com/google.rpc.RetryInfo",
 968                          "retryDelay": "30s",
 969                      },
 970                  ],
 971              }
 972          }
 973          err = _gemini_http_error(self._fake_response(429, body))
 974          assert err.status_code == 429
 975          assert err.code == "code_assist_capacity_exhausted"
 976          assert err.retry_after == 30.0
 977          assert err.details["reason"] == "MODEL_CAPACITY_EXHAUSTED"
 978          # Message must be user-friendly, not a raw JSON dump.
 979          message = str(err)
 980          assert "gemini-2.5-pro" in message
 981          assert "capacity exhausted" in message.lower()
 982          assert "30s" in message
 983          # response attr is preserved for run_agent's Retry-After header path.
 984          assert err.response is not None
 985  
 986      def test_resource_exhausted_without_reason(self):
 987          from agent.gemini_cloudcode_adapter import _gemini_http_error
 988  
 989          body = {
 990              "error": {
 991                  "code": 429,
 992                  "message": "Quota exceeded for requests per minute.",
 993                  "status": "RESOURCE_EXHAUSTED",
 994              }
 995          }
 996          err = _gemini_http_error(self._fake_response(429, body))
 997          assert err.status_code == 429
 998          assert err.code == "code_assist_rate_limited"
 999          message = str(err)
1000          assert "quota" in message.lower()
1001  
1002      def test_404_model_not_found_produces_model_retired_message(self):
1003          from agent.gemini_cloudcode_adapter import _gemini_http_error
1004  
1005          body = {
1006              "error": {
1007                  "code": 404,
1008                  "message": "models/gemma-4-26b-it is not found for API version v1internal",
1009                  "status": "NOT_FOUND",
1010              }
1011          }
1012          err = _gemini_http_error(self._fake_response(404, body))
1013          assert err.status_code == 404
1014          message = str(err)
1015          assert "not available" in message.lower() or "retired" in message.lower()
1016          # Error message should reference the actual model text from Google.
1017          assert "gemma-4-26b-it" in message
1018  
1019      def test_unauthorized_preserves_status_code(self):
1020          from agent.gemini_cloudcode_adapter import _gemini_http_error
1021  
1022          err = _gemini_http_error(self._fake_response(
1023              401, {"error": {"code": 401, "message": "Invalid token", "status": "UNAUTHENTICATED"}},
1024          ))
1025          assert err.status_code == 401
1026          assert err.code == "code_assist_unauthorized"
1027  
1028      def test_retry_after_header_fallback(self):
1029          """If the body has no RetryInfo detail, fall back to Retry-After header."""
1030          from agent.gemini_cloudcode_adapter import _gemini_http_error
1031  
1032          resp = self._fake_response(
1033              429,
1034              {"error": {"code": 429, "message": "Rate limited", "status": "RESOURCE_EXHAUSTED"}},
1035              headers={"Retry-After": "45"},
1036          )
1037          err = _gemini_http_error(resp)
1038          assert err.retry_after == 45.0
1039  
1040      def test_malformed_body_still_produces_structured_error(self):
1041          """Non-JSON body must not swallow status_code — we still want the classifier path."""
1042          from agent.gemini_cloudcode_adapter import _gemini_http_error
1043  
1044          err = _gemini_http_error(self._fake_response(500, "<html>internal error</html>"))
1045          assert err.status_code == 500
1046          # Raw body snippet must still be there for debugging.
1047          assert "500" in str(err)
1048  
1049      def test_status_code_flows_through_error_classifier(self):
1050          """End-to-end: CodeAssistError from a 429 must classify as rate_limit.
1051  
1052          This is the whole point of adding status_code to CodeAssistError —
1053          _extract_status_code must see it and FailoverReason.rate_limit must
1054          fire, so the main loop triggers fallback_providers.
1055          """
1056          from agent.gemini_cloudcode_adapter import _gemini_http_error
1057          from agent.error_classifier import classify_api_error, FailoverReason
1058  
1059          body = {
1060              "error": {
1061                  "code": 429,
1062                  "message": "Resource has been exhausted",
1063                  "status": "RESOURCE_EXHAUSTED",
1064                  "details": [
1065                      {
1066                          "@type": "type.googleapis.com/google.rpc.ErrorInfo",
1067                          "reason": "MODEL_CAPACITY_EXHAUSTED",
1068                          "metadata": {"model": "gemini-2.5-pro"},
1069                      }
1070                  ],
1071              }
1072          }
1073          err = _gemini_http_error(self._fake_response(429, body))
1074  
1075          classified = classify_api_error(
1076              err, provider="google-gemini-cli", model="gemini-2.5-pro",
1077          )
1078          assert classified.status_code == 429
1079          assert classified.reason == FailoverReason.rate_limit
1080  
1081  
1082  # =============================================================================
1083  # Provider registration
1084  # =============================================================================
1085  
1086  class TestProviderRegistration:
1087      def test_registry_entry(self):
1088          from hermes_cli.auth import PROVIDER_REGISTRY
1089  
1090          assert "google-gemini-cli" in PROVIDER_REGISTRY
1091          assert PROVIDER_REGISTRY["google-gemini-cli"].auth_type == "oauth_external"
1092  
1093      def test_google_gemini_alias_still_goes_to_api_key_gemini(self):
1094          """Regression guard: don't shadow the existing google-gemini → gemini alias."""
1095          from hermes_cli.auth import resolve_provider
1096  
1097          assert resolve_provider("google-gemini") == "gemini"
1098  
1099      def test_runtime_provider_raises_when_not_logged_in(self):
1100          from hermes_cli.auth import AuthError
1101          from hermes_cli.runtime_provider import resolve_runtime_provider
1102  
1103          with pytest.raises(AuthError) as exc_info:
1104              resolve_runtime_provider(requested="google-gemini-cli")
1105          assert exc_info.value.code == "google_oauth_not_logged_in"
1106  
1107      def test_runtime_provider_returns_correct_shape_when_logged_in(self):
1108          from agent.google_oauth import GoogleCredentials, save_credentials
1109          from hermes_cli.runtime_provider import resolve_runtime_provider
1110  
1111          save_credentials(GoogleCredentials(
1112              access_token="live-tok",
1113              refresh_token="rt",
1114              expires_ms=int((time.time() + 3600) * 1000),
1115              project_id="my-proj",
1116              email="t@e.com",
1117          ))
1118  
1119          result = resolve_runtime_provider(requested="google-gemini-cli")
1120          assert result["provider"] == "google-gemini-cli"
1121          assert result["api_mode"] == "chat_completions"
1122          assert result["api_key"] == "live-tok"
1123          assert result["base_url"] == "cloudcode-pa://google"
1124          assert result["project_id"] == "my-proj"
1125          assert result["email"] == "t@e.com"
1126  
1127      def test_determine_api_mode(self):
1128          from hermes_cli.providers import determine_api_mode
1129  
1130          assert determine_api_mode("google-gemini-cli", "cloudcode-pa://google") == "chat_completions"
1131  
1132      def test_oauth_capable_set_preserves_existing(self):
1133          from hermes_cli.auth_commands import _OAUTH_CAPABLE_PROVIDERS
1134  
1135          for required in ("anthropic", "nous", "openai-codex", "qwen-oauth", "google-gemini-cli"):
1136              assert required in _OAUTH_CAPABLE_PROVIDERS
1137  
1138      def test_config_env_vars_registered(self):
1139          from hermes_cli.config import OPTIONAL_ENV_VARS
1140  
1141          for key in (
1142              "HERMES_GEMINI_CLIENT_ID",
1143              "HERMES_GEMINI_CLIENT_SECRET",
1144              "HERMES_GEMINI_PROJECT_ID",
1145          ):
1146              assert key in OPTIONAL_ENV_VARS
1147  
1148  
1149  class TestAuthStatus:
1150      def test_not_logged_in(self):
1151          from hermes_cli.auth import get_auth_status
1152  
1153          s = get_auth_status("google-gemini-cli")
1154          assert s["logged_in"] is False
1155  
1156      def test_logged_in_reports_email_and_project(self):
1157          from agent.google_oauth import GoogleCredentials, save_credentials
1158          from hermes_cli.auth import get_auth_status
1159  
1160          save_credentials(GoogleCredentials(
1161              access_token="tok", refresh_token="rt",
1162              expires_ms=int((time.time() + 3600) * 1000),
1163              email="tek@nous.ai",
1164              project_id="tek-proj",
1165          ))
1166  
1167          s = get_auth_status("google-gemini-cli")
1168          assert s["logged_in"] is True
1169          assert s["email"] == "tek@nous.ai"
1170          assert s["project_id"] == "tek-proj"
1171  
1172  
1173  class TestGquotaCommand:
1174      def test_gquota_registered(self):
1175          from hermes_cli.commands import COMMANDS
1176  
1177          assert "/gquota" in COMMANDS
1178  
1179  
1180  class TestRunGeminiOauthLoginPure:
1181      def test_returns_pool_compatible_dict(self, monkeypatch):
1182          from agent import google_oauth
1183  
1184          def fake_start(**kw):
1185              return google_oauth.GoogleCredentials(
1186                  access_token="at", refresh_token="rt",
1187                  expires_ms=int((time.time() + 3600) * 1000),
1188                  email="u@e.com", project_id="p",
1189              )
1190  
1191          monkeypatch.setattr(google_oauth, "start_oauth_flow", fake_start)
1192  
1193          result = google_oauth.run_gemini_oauth_login_pure()
1194          assert result["access_token"] == "at"
1195          assert result["refresh_token"] == "rt"
1196          assert result["email"] == "u@e.com"
1197          assert result["project_id"] == "p"
1198          assert isinstance(result["expires_at_ms"], int)