/ tests / agent / test_gemini_free_tier_gate.py
test_gemini_free_tier_gate.py
  1  """Tests for Gemini free-tier detection and blocking."""
  2  from __future__ import annotations
  3  
  4  from unittest.mock import MagicMock, patch
  5  
  6  import pytest
  7  
  8  from agent.gemini_native_adapter import (
  9      gemini_http_error,
 10      is_free_tier_quota_error,
 11      probe_gemini_tier,
 12  )
 13  
 14  
 15  def _mock_response(status: int, headers: dict | None = None, text: str = "") -> MagicMock:
 16      resp = MagicMock()
 17      resp.status_code = status
 18      resp.headers = headers or {}
 19      resp.text = text
 20      return resp
 21  
 22  
 23  def _run_probe(resp: MagicMock) -> str:
 24      with patch("agent.gemini_native_adapter.httpx.Client") as MC:
 25          inst = MagicMock()
 26          inst.post.return_value = resp
 27          MC.return_value.__enter__.return_value = inst
 28          return probe_gemini_tier("fake-key")
 29  
 30  
 31  class TestProbeGeminiTier:
 32      """Verify the tier probe classifies keys correctly."""
 33  
 34      def test_free_tier_via_rpd_header_flash(self):
 35          # gemini-2.5-flash free tier: 250 RPD
 36          resp = _mock_response(200, {"x-ratelimit-limit-requests-per-day": "250"}, "{}")
 37          assert _run_probe(resp) == "free"
 38  
 39      def test_free_tier_via_rpd_header_pro(self):
 40          # gemini-2.5-pro free tier: 100 RPD
 41          resp = _mock_response(200, {"x-ratelimit-limit-requests-per-day": "100"}, "{}")
 42          assert _run_probe(resp) == "free"
 43  
 44      def test_free_tier_via_rpd_header_flash_lite(self):
 45          # flash-lite free tier: 1000 RPD (our upper bound)
 46          resp = _mock_response(200, {"x-ratelimit-limit-requests-per-day": "1000"}, "{}")
 47          assert _run_probe(resp) == "free"
 48  
 49      def test_paid_tier_via_rpd_header(self):
 50          # Tier 1 starts at 1500+ RPD
 51          resp = _mock_response(200, {"x-ratelimit-limit-requests-per-day": "1500"}, "{}")
 52          assert _run_probe(resp) == "paid"
 53  
 54      def test_free_tier_via_429_body(self):
 55          body = (
 56              '{"error":{"code":429,"message":"Quota exceeded for metric: '
 57              'generativelanguage.googleapis.com/generate_content_free_tier_requests, '
 58              'limit: 20"}}'
 59          )
 60          resp = _mock_response(429, {}, body)
 61          assert _run_probe(resp) == "free"
 62  
 63      def test_paid_429_has_no_free_tier_marker(self):
 64          body = '{"error":{"code":429,"message":"rate limited"}}'
 65          resp = _mock_response(429, {}, body)
 66          assert _run_probe(resp) == "paid"
 67  
 68      def test_successful_200_without_rpd_header_is_paid(self):
 69          resp = _mock_response(200, {}, '{"candidates":[]}')
 70          assert _run_probe(resp) == "paid"
 71  
 72      def test_401_returns_unknown(self):
 73          resp = _mock_response(401, {}, '{"error":{"code":401}}')
 74          assert _run_probe(resp) == "unknown"
 75  
 76      def test_404_returns_unknown(self):
 77          resp = _mock_response(404, {}, '{"error":{"code":404}}')
 78          assert _run_probe(resp) == "unknown"
 79  
 80      def test_network_error_returns_unknown(self):
 81          with patch(
 82              "agent.gemini_native_adapter.httpx.Client",
 83              side_effect=Exception("dns failure"),
 84          ):
 85              assert probe_gemini_tier("fake-key") == "unknown"
 86  
 87      def test_empty_key_returns_unknown(self):
 88          assert probe_gemini_tier("") == "unknown"
 89          assert probe_gemini_tier("   ") == "unknown"
 90          assert probe_gemini_tier(None) == "unknown"  # type: ignore[arg-type]
 91  
 92      def test_malformed_rpd_header_falls_through(self):
 93          # Non-integer header value shouldn't crash; 200 with no usable header -> paid.
 94          resp = _mock_response(200, {"x-ratelimit-limit-requests-per-day": "abc"}, "{}")
 95          assert _run_probe(resp) == "paid"
 96  
 97      def test_openai_compat_suffix_stripped(self):
 98          """Base URLs ending in /openai get normalized to the native endpoint."""
 99          resp = _mock_response(200, {"x-ratelimit-limit-requests-per-day": "1500"}, "{}")
100          with patch("agent.gemini_native_adapter.httpx.Client") as MC:
101              inst = MagicMock()
102              inst.post.return_value = resp
103              MC.return_value.__enter__.return_value = inst
104              probe_gemini_tier(
105                  "fake",
106                  "https://generativelanguage.googleapis.com/v1beta/openai",
107              )
108              # Verify the post URL does NOT contain /openai
109              called_url = inst.post.call_args[0][0]
110              assert "/openai/" not in called_url
111              assert called_url.endswith(":generateContent")
112  
113  
class TestIsFreeTierQuotaError:
    """Exercise ``is_free_tier_quota_error`` on matching, non-matching and empty input."""

    def test_detects_free_tier_marker(self):
        """A quota message naming the free-tier metric is recognised."""
        message = "Quota exceeded for metric: generate_content_free_tier_requests"
        assert is_free_tier_quota_error(message)

    def test_case_insensitive(self):
        """An upper-case marker is recognised as well."""
        shouted = "QUOTA: FREE_TIER_REQUESTS"
        assert is_free_tier_quota_error(shouted)

    def test_no_free_tier_marker(self):
        """A generic rate-limit message is not flagged."""
        flagged = is_free_tier_quota_error("rate limited")
        assert not flagged

    def test_empty_string(self):
        """An empty message is not flagged."""
        flagged = is_free_tier_quota_error("")
        assert not flagged

    def test_none(self):
        """``None`` in place of a message is not flagged."""
        flagged = is_free_tier_quota_error(None)  # type: ignore[arg-type]
        assert not flagged
131  
132  
class TestGeminiHttpErrorFreeTierGuidance:
    """gemini_http_error should append free-tier guidance for free-tier 429s."""

    # URL fragment the tests look for (or require to be absent) in the error text.
    _GUIDANCE_URL = "aistudio.google.com/apikey"

    class _FakeResp:
        """Minimal response stand-in: status code, empty headers, and a body."""

        def __init__(self, status: int, text: str):
            self.status_code = status
            self.headers: dict = {}
            self.text = text

    def _error_text(self, status: int, body: str) -> str:
        """Return ``str()`` of the error ``gemini_http_error`` builds for a fake response."""
        fake = self._FakeResp(status, body)
        return str(gemini_http_error(fake))

    def test_free_tier_429_appends_guidance(self):
        body = (
            '{"error":{"code":429,"message":"Quota exceeded for metric: '
            "generativelanguage.googleapis.com/generate_content_free_tier_requests, "
            'limit: 20","status":"RESOURCE_EXHAUSTED"}}'
        )
        rendered = self._error_text(429, body)
        assert "free tier" in rendered.lower()
        assert self._GUIDANCE_URL in rendered

    def test_paid_429_has_no_billing_url(self):
        body = '{"error":{"code":429,"message":"Rate limited","status":"RESOURCE_EXHAUSTED"}}'
        rendered = self._error_text(429, body)
        assert self._GUIDANCE_URL not in rendered

    def test_non_429_has_no_billing_url(self):
        body = '{"error":{"code":400,"message":"bad request","status":"INVALID_ARGUMENT"}}'
        rendered = self._error_text(400, body)
        assert self._GUIDANCE_URL not in rendered

    def test_401_has_no_billing_url(self):
        body = '{"error":{"code":401,"message":"API key invalid","status":"UNAUTHENTICATED"}}'
        rendered = self._error_text(401, body)
        assert self._GUIDANCE_URL not in rendered
165          err = gemini_http_error(self._FakeResp(401, body))
166          assert "aistudio.google.com/apikey" not in str(err)