test_gemini_free_tier_gate.py
"""Test suite covering Gemini free-tier detection and the guidance it triggers.

Three areas are exercised:

* ``probe_gemini_tier`` -- classification of an API key as ``"free"``,
  ``"paid"`` or ``"unknown"`` from a mocked HTTP response.
* ``is_free_tier_quota_error`` -- recognition of the free-tier quota marker
  in an error message.
* ``gemini_http_error`` -- whether the resulting error text carries the
  free-tier guidance URL.
"""
from __future__ import annotations

from unittest.mock import MagicMock, patch

import pytest

from agent.gemini_native_adapter import (
    gemini_http_error,
    is_free_tier_quota_error,
    probe_gemini_tier,
)

# Dotted path of the HTTP client class that every probe test replaces.
_HTTPX_CLIENT = "agent.gemini_native_adapter.httpx.Client"

# Response header carrying the requests-per-day limit used by the probe tests.
_RPD_HEADER = "x-ratelimit-limit-requests-per-day"


def _mock_response(status: int, headers: dict | None = None, text: str = "") -> MagicMock:
    """Build a stand-in HTTP response exposing status_code, headers and text."""
    return MagicMock(status_code=status, headers=headers or {}, text=text)


def _run_probe(resp: MagicMock) -> str:
    """Run ``probe_gemini_tier`` with the HTTP client patched to return *resp*."""
    client = MagicMock()
    client.post.return_value = resp
    with patch(_HTTPX_CLIENT) as client_cls:
        # The client is entered as a context manager; hand back our mock there.
        client_cls.return_value.__enter__.return_value = client
        return probe_gemini_tier("fake-key")


class TestProbeGeminiTier:
    """Check that the tier probe maps responses to the expected tier label."""

    def test_free_tier_via_rpd_header_flash(self):
        # gemini-2.5-flash free tier: 250 RPD
        response = _mock_response(200, {_RPD_HEADER: "250"}, "{}")
        tier = _run_probe(response)
        assert tier == "free"

    def test_free_tier_via_rpd_header_pro(self):
        # gemini-2.5-pro free tier: 100 RPD
        response = _mock_response(200, {_RPD_HEADER: "100"}, "{}")
        tier = _run_probe(response)
        assert tier == "free"

    def test_free_tier_via_rpd_header_flash_lite(self):
        # flash-lite free tier: 1000 RPD (our upper bound)
        response = _mock_response(200, {_RPD_HEADER: "1000"}, "{}")
        tier = _run_probe(response)
        assert tier == "free"

    def test_paid_tier_via_rpd_header(self):
        # Tier 1 starts at 1500+ RPD
        response = _mock_response(200, {_RPD_HEADER: "1500"}, "{}")
        tier = _run_probe(response)
        assert tier == "paid"

    def test_free_tier_via_429_body(self):
        # A 429 whose body names the free-tier quota metric.
        payload = (
            '{"error":{"code":429,"message":"Quota exceeded for metric: '
            'generativelanguage.googleapis.com/generate_content_free_tier_requests, '
            'limit: 20"}}'
        )
        tier = _run_probe(_mock_response(429, {}, payload))
        assert tier == "free"

    def test_paid_429_has_no_free_tier_marker(self):
        payload = '{"error":{"code":429,"message":"rate limited"}}'
        tier = _run_probe(_mock_response(429, {}, payload))
        assert tier == "paid"

    def test_successful_200_without_rpd_header_is_paid(self):
        response = _mock_response(200, {}, '{"candidates":[]}')
        tier = _run_probe(response)
        assert tier == "paid"

    def test_401_returns_unknown(self):
        response = _mock_response(401, {}, '{"error":{"code":401}}')
        tier = _run_probe(response)
        assert tier == "unknown"

    def test_404_returns_unknown(self):
        response = _mock_response(404, {}, '{"error":{"code":404}}')
        tier = _run_probe(response)
        assert tier == "unknown"

    def test_network_error_returns_unknown(self):
        # Constructing the client raises, simulating a transport failure.
        failure = Exception("dns failure")
        with patch(_HTTPX_CLIENT, side_effect=failure):
            tier = probe_gemini_tier("fake-key")
            assert tier == "unknown"

    def test_empty_key_returns_unknown(self):
        # Empty, blank and missing keys are all expected to yield "unknown".
        for blank_key in ("", " ", None):
            assert probe_gemini_tier(blank_key) == "unknown"  # type: ignore[arg-type]

    def test_malformed_rpd_header_falls_through(self):
        # Non-integer header value shouldn't crash; 200 with no usable header -> paid.
        response = _mock_response(200, {_RPD_HEADER: "abc"}, "{}")
        tier = _run_probe(response)
        assert tier == "paid"

    def test_openai_compat_suffix_stripped(self):
        """Base URLs ending in /openai get normalized to the native endpoint."""
        client = MagicMock()
        client.post.return_value = _mock_response(200, {_RPD_HEADER: "1500"}, "{}")
        with patch(_HTTPX_CLIENT) as client_cls:
            client_cls.return_value.__enter__.return_value = client
            probe_gemini_tier(
                "fake",
                "https://generativelanguage.googleapis.com/v1beta/openai",
            )
        # Verify the post URL does NOT contain /openai
        positional_args = client.post.call_args[0]
        called_url = positional_args[0]
        assert "/openai/" not in called_url
        assert called_url.endswith(":generateContent")


class TestIsFreeTierQuotaError:
    """Check recognition of the free-tier quota marker in error messages."""

    def test_detects_free_tier_marker(self):
        message = "Quota exceeded for metric: generate_content_free_tier_requests"
        assert is_free_tier_quota_error(message)

    def test_case_insensitive(self):
        message = "QUOTA: FREE_TIER_REQUESTS"
        assert is_free_tier_quota_error(message)

    def test_no_free_tier_marker(self):
        message = "rate limited"
        assert not is_free_tier_quota_error(message)

    def test_empty_string(self):
        message = ""
        assert not is_free_tier_quota_error(message)

    def test_none(self):
        assert not is_free_tier_quota_error(None)  # type: ignore[arg-type]


class TestGeminiHttpErrorFreeTierGuidance:
    """gemini_http_error should append free-tier guidance for free-tier 429s."""

    class _FakeResp:
        """Minimal response object exposing status_code, headers and text."""

        def __init__(self, status: int, text: str):
            self.status_code = status
            self.headers: dict = {}
            self.text = text

    def _error_text(self, status: int, body: str) -> str:
        """Return ``str()`` of the error built from a fake response."""
        fake_response = self._FakeResp(status, body)
        return str(gemini_http_error(fake_response))

    def test_free_tier_429_appends_guidance(self):
        payload = (
            '{"error":{"code":429,"message":"Quota exceeded for metric: '
            "generativelanguage.googleapis.com/generate_content_free_tier_requests, "
            'limit: 20","status":"RESOURCE_EXHAUSTED"}}'
        )
        msg = self._error_text(429, payload)
        assert "free tier" in msg.lower()
        assert "aistudio.google.com/apikey" in msg

    def test_paid_429_has_no_billing_url(self):
        payload = '{"error":{"code":429,"message":"Rate limited","status":"RESOURCE_EXHAUSTED"}}'
        msg = self._error_text(429, payload)
        assert "aistudio.google.com/apikey" not in msg

    def test_non_429_has_no_billing_url(self):
        payload = '{"error":{"code":400,"message":"bad request","status":"INVALID_ARGUMENT"}}'
        msg = self._error_text(400, payload)
        assert "aistudio.google.com/apikey" not in msg

    def test_401_has_no_billing_url(self):
        payload = '{"error":{"code":401,"message":"API key invalid","status":"UNAUTHENTICATED"}}'
        msg = self._error_text(401, payload)
        assert "aistudio.google.com/apikey" not in msg