gemini_cloudcode_adapter.py
1 """OpenAI-compatible facade that talks to Google's Cloud Code Assist backend. 2 3 This adapter lets Hermes use the ``google-gemini-cli`` provider as if it were 4 a standard OpenAI-shaped chat completion endpoint, while the underlying HTTP 5 traffic goes to ``cloudcode-pa.googleapis.com/v1internal:{generateContent, 6 streamGenerateContent}`` with a Bearer access token obtained via OAuth PKCE. 7 8 Architecture 9 ------------ 10 - ``GeminiCloudCodeClient`` exposes ``.chat.completions.create(**kwargs)`` 11 mirroring the subset of the OpenAI SDK that ``run_agent.py`` uses. 12 - Incoming OpenAI ``messages[]`` / ``tools[]`` / ``tool_choice`` are translated 13 to Gemini's native ``contents[]`` / ``tools[].functionDeclarations`` / 14 ``toolConfig`` / ``systemInstruction`` shape. 15 - The request body is wrapped ``{project, model, user_prompt_id, request}`` 16 per Code Assist API expectations. 17 - Responses (``candidates[].content.parts[]``) are converted back to 18 OpenAI ``choices[0].message`` shape with ``content`` + ``tool_calls``. 19 - Streaming uses SSE (``?alt=sse``) and yields OpenAI-shaped delta chunks. 20 21 Attribution 22 ----------- 23 Translation semantics follow jenslys/opencode-gemini-auth (MIT) and the public 24 Gemini API docs. Request envelope shape 25 (``{project, model, user_prompt_id, request}``) is documented nowhere; it is 26 reverse-engineered from the opencode-gemini-auth and clawdbot implementations. 27 """ 28 29 from __future__ import annotations 30 31 import json 32 import logging 33 import time 34 import uuid 35 from types import SimpleNamespace 36 from typing import Any, Dict, Iterator, List, Optional 37 38 import httpx 39 40 from agent import google_oauth 41 from agent.gemini_schema import sanitize_gemini_tool_parameters 42 from agent.google_code_assist import ( 43 CODE_ASSIST_ENDPOINT, 44 CodeAssistError, 45 ProjectContext, 46 resolve_project_context, 47 ) 48 49 logger = logging.getLogger(__name__) 50 51 52 # ============================================================================= 53 # Request translation: OpenAI → Gemini 54 # ============================================================================= 55 56 _ROLE_MAP_OPENAI_TO_GEMINI = { 57 "user": "user", 58 "assistant": "model", 59 "system": "user", # handled separately via systemInstruction 60 "tool": "user", # functionResponse is wrapped in a user-role turn 61 "function": "user", 62 } 63 64 65 def _coerce_content_to_text(content: Any) -> str: 66 """OpenAI content may be str or a list of parts; reduce to plain text.""" 67 if content is None: 68 return "" 69 if isinstance(content, str): 70 return content 71 if isinstance(content, list): 72 pieces: List[str] = [] 73 for p in content: 74 if isinstance(p, str): 75 pieces.append(p) 76 elif isinstance(p, dict): 77 if p.get("type") == "text" and isinstance(p.get("text"), str): 78 pieces.append(p["text"]) 79 # Multimodal (image_url, etc.) — stub for now; log and skip 80 elif p.get("type") in ("image_url", "input_audio"): 81 logger.debug("Dropping multimodal part (not yet supported): %s", p.get("type")) 82 return "\n".join(pieces) 83 return str(content) 84 85 86 def _translate_tool_call_to_gemini(tool_call: Dict[str, Any]) -> Dict[str, Any]: 87 """OpenAI tool_call -> Gemini functionCall part.""" 88 fn = tool_call.get("function") or {} 89 args_raw = fn.get("arguments", "") 90 try: 91 args = json.loads(args_raw) if isinstance(args_raw, str) and args_raw else {} 92 except json.JSONDecodeError: 93 args = {"_raw": args_raw} 94 if not isinstance(args, dict): 95 args = {"_value": args} 96 return { 97 "functionCall": { 98 "name": fn.get("name") or "", 99 "args": args, 100 }, 101 # Sentinel signature — matches opencode-gemini-auth's approach. 102 # Without this, Code Assist rejects function calls that originated 103 # outside its own chain. 104 "thoughtSignature": "skip_thought_signature_validator", 105 } 106 107 108 def _translate_tool_result_to_gemini(message: Dict[str, Any]) -> Dict[str, Any]: 109 """OpenAI tool-role message -> Gemini functionResponse part. 110 111 The function name isn't in the OpenAI tool message directly; it must be 112 passed via the assistant message that issued the call. For simplicity we 113 look up ``name`` on the message (OpenAI SDK copies it there) or on the 114 ``tool_call_id`` cross-reference. 115 """ 116 name = str(message.get("name") or message.get("tool_call_id") or "tool") 117 content = _coerce_content_to_text(message.get("content")) 118 # Gemini expects the response as a dict under `response`. We wrap plain 119 # text in {"output": "..."}. 120 try: 121 parsed = json.loads(content) if content.strip().startswith(("{", "[")) else None 122 except json.JSONDecodeError: 123 parsed = None 124 response = parsed if isinstance(parsed, dict) else {"output": content} 125 return { 126 "functionResponse": { 127 "name": name, 128 "response": response, 129 }, 130 } 131 132 133 def _build_gemini_contents( 134 messages: List[Dict[str, Any]], 135 ) -> tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]: 136 """Convert OpenAI messages[] to Gemini contents[] + systemInstruction.""" 137 system_text_parts: List[str] = [] 138 contents: List[Dict[str, Any]] = [] 139 140 for msg in messages: 141 if not isinstance(msg, dict): 142 continue 143 role = str(msg.get("role") or "user") 144 145 if role == "system": 146 system_text_parts.append(_coerce_content_to_text(msg.get("content"))) 147 continue 148 149 # Tool result message — emit a user-role turn with functionResponse 150 if role == "tool" or role == "function": 151 contents.append({ 152 "role": "user", 153 "parts": [_translate_tool_result_to_gemini(msg)], 154 }) 155 continue 156 157 gemini_role = _ROLE_MAP_OPENAI_TO_GEMINI.get(role, "user") 158 parts: List[Dict[str, Any]] = [] 159 160 text = _coerce_content_to_text(msg.get("content")) 161 if text: 162 parts.append({"text": text}) 163 164 # Assistant messages can carry tool_calls 165 tool_calls = msg.get("tool_calls") or [] 166 if isinstance(tool_calls, list): 167 for tc in tool_calls: 168 if isinstance(tc, dict): 169 parts.append(_translate_tool_call_to_gemini(tc)) 170 171 if not parts: 172 # Gemini rejects empty parts; skip the turn entirely 173 continue 174 175 contents.append({"role": gemini_role, "parts": parts}) 176 177 system_instruction: Optional[Dict[str, Any]] = None 178 joined_system = "\n".join(p for p in system_text_parts if p).strip() 179 if joined_system: 180 system_instruction = { 181 "role": "system", 182 "parts": [{"text": joined_system}], 183 } 184 185 return contents, system_instruction 186 187 188 def _translate_tools_to_gemini(tools: Any) -> List[Dict[str, Any]]: 189 """OpenAI tools[] -> Gemini tools[].functionDeclarations[].""" 190 if not isinstance(tools, list) or not tools: 191 return [] 192 declarations: List[Dict[str, Any]] = [] 193 for t in tools: 194 if not isinstance(t, dict): 195 continue 196 fn = t.get("function") or {} 197 if not isinstance(fn, dict): 198 continue 199 name = fn.get("name") 200 if not name: 201 continue 202 decl = {"name": str(name)} 203 if fn.get("description"): 204 decl["description"] = str(fn["description"]) 205 params = fn.get("parameters") 206 if isinstance(params, dict): 207 decl["parameters"] = sanitize_gemini_tool_parameters(params) 208 declarations.append(decl) 209 if not declarations: 210 return [] 211 return [{"functionDeclarations": declarations}] 212 213 214 def _translate_tool_choice_to_gemini(tool_choice: Any) -> Optional[Dict[str, Any]]: 215 """OpenAI tool_choice -> Gemini toolConfig.functionCallingConfig.""" 216 if tool_choice is None: 217 return None 218 if isinstance(tool_choice, str): 219 if tool_choice == "auto": 220 return {"functionCallingConfig": {"mode": "AUTO"}} 221 if tool_choice == "required": 222 return {"functionCallingConfig": {"mode": "ANY"}} 223 if tool_choice == "none": 224 return {"functionCallingConfig": {"mode": "NONE"}} 225 if isinstance(tool_choice, dict): 226 fn = tool_choice.get("function") or {} 227 name = fn.get("name") 228 if name: 229 return { 230 "functionCallingConfig": { 231 "mode": "ANY", 232 "allowedFunctionNames": [str(name)], 233 }, 234 } 235 return None 236 237 238 def _normalize_thinking_config(config: Any) -> Optional[Dict[str, Any]]: 239 """Accept thinkingBudget / thinkingLevel / includeThoughts (+ snake_case).""" 240 if not isinstance(config, dict) or not config: 241 return None 242 budget = config.get("thinkingBudget", config.get("thinking_budget")) 243 level = config.get("thinkingLevel", config.get("thinking_level")) 244 include = config.get("includeThoughts", config.get("include_thoughts")) 245 normalized: Dict[str, Any] = {} 246 if isinstance(budget, (int, float)): 247 normalized["thinkingBudget"] = int(budget) 248 if isinstance(level, str) and level.strip(): 249 normalized["thinkingLevel"] = level.strip().lower() 250 if isinstance(include, bool): 251 normalized["includeThoughts"] = include 252 return normalized or None 253 254 255 def build_gemini_request( 256 *, 257 messages: List[Dict[str, Any]], 258 tools: Any = None, 259 tool_choice: Any = None, 260 temperature: Optional[float] = None, 261 max_tokens: Optional[int] = None, 262 top_p: Optional[float] = None, 263 stop: Any = None, 264 thinking_config: Any = None, 265 ) -> Dict[str, Any]: 266 """Build the inner Gemini request body (goes inside ``request`` wrapper).""" 267 contents, system_instruction = _build_gemini_contents(messages) 268 269 body: Dict[str, Any] = {"contents": contents} 270 if system_instruction is not None: 271 body["systemInstruction"] = system_instruction 272 273 gemini_tools = _translate_tools_to_gemini(tools) 274 if gemini_tools: 275 body["tools"] = gemini_tools 276 tool_cfg = _translate_tool_choice_to_gemini(tool_choice) 277 if tool_cfg is not None: 278 body["toolConfig"] = tool_cfg 279 280 generation_config: Dict[str, Any] = {} 281 if isinstance(temperature, (int, float)): 282 generation_config["temperature"] = float(temperature) 283 if isinstance(max_tokens, int) and max_tokens > 0: 284 generation_config["maxOutputTokens"] = max_tokens 285 if isinstance(top_p, (int, float)): 286 generation_config["topP"] = float(top_p) 287 if isinstance(stop, str) and stop: 288 generation_config["stopSequences"] = [stop] 289 elif isinstance(stop, list) and stop: 290 generation_config["stopSequences"] = [str(s) for s in stop if s] 291 normalized_thinking = _normalize_thinking_config(thinking_config) 292 if normalized_thinking: 293 generation_config["thinkingConfig"] = normalized_thinking 294 if generation_config: 295 body["generationConfig"] = generation_config 296 297 return body 298 299 300 def wrap_code_assist_request( 301 *, 302 project_id: str, 303 model: str, 304 inner_request: Dict[str, Any], 305 user_prompt_id: Optional[str] = None, 306 ) -> Dict[str, Any]: 307 """Wrap the inner Gemini request in the Code Assist envelope.""" 308 return { 309 "project": project_id, 310 "model": model, 311 "user_prompt_id": user_prompt_id or str(uuid.uuid4()), 312 "request": inner_request, 313 } 314 315 316 # ============================================================================= 317 # Response translation: Gemini → OpenAI 318 # ============================================================================= 319 320 def _translate_gemini_response( 321 resp: Dict[str, Any], 322 model: str, 323 ) -> SimpleNamespace: 324 """Non-streaming Gemini response -> OpenAI-shaped SimpleNamespace. 325 326 Code Assist wraps the actual Gemini response inside ``response``, so we 327 unwrap it first if present. 328 """ 329 inner = resp.get("response") if isinstance(resp.get("response"), dict) else resp 330 331 candidates = inner.get("candidates") or [] 332 if not isinstance(candidates, list) or not candidates: 333 return _empty_response(model) 334 335 cand = candidates[0] 336 content_obj = cand.get("content") if isinstance(cand, dict) else {} 337 parts = content_obj.get("parts") if isinstance(content_obj, dict) else [] 338 339 text_pieces: List[str] = [] 340 reasoning_pieces: List[str] = [] 341 tool_calls: List[SimpleNamespace] = [] 342 343 for i, part in enumerate(parts or []): 344 if not isinstance(part, dict): 345 continue 346 # Thought parts are model's internal reasoning — surface as reasoning, 347 # don't mix into content. 348 if part.get("thought") is True: 349 if isinstance(part.get("text"), str): 350 reasoning_pieces.append(part["text"]) 351 continue 352 if isinstance(part.get("text"), str): 353 text_pieces.append(part["text"]) 354 continue 355 fc = part.get("functionCall") 356 if isinstance(fc, dict) and fc.get("name"): 357 try: 358 args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False) 359 except (TypeError, ValueError): 360 args_str = "{}" 361 tool_calls.append(SimpleNamespace( 362 id=f"call_{uuid.uuid4().hex[:12]}", 363 type="function", 364 index=i, 365 function=SimpleNamespace(name=str(fc["name"]), arguments=args_str), 366 )) 367 368 finish_reason = "tool_calls" if tool_calls else _map_gemini_finish_reason( 369 str(cand.get("finishReason") or "") 370 ) 371 372 usage_meta = inner.get("usageMetadata") or {} 373 usage = SimpleNamespace( 374 prompt_tokens=int(usage_meta.get("promptTokenCount") or 0), 375 completion_tokens=int(usage_meta.get("candidatesTokenCount") or 0), 376 total_tokens=int(usage_meta.get("totalTokenCount") or 0), 377 prompt_tokens_details=SimpleNamespace( 378 cached_tokens=int(usage_meta.get("cachedContentTokenCount") or 0), 379 ), 380 ) 381 382 message = SimpleNamespace( 383 role="assistant", 384 content="".join(text_pieces) if text_pieces else None, 385 tool_calls=tool_calls or None, 386 reasoning="".join(reasoning_pieces) or None, 387 reasoning_content="".join(reasoning_pieces) or None, 388 reasoning_details=None, 389 ) 390 choice = SimpleNamespace( 391 index=0, 392 message=message, 393 finish_reason=finish_reason, 394 ) 395 return SimpleNamespace( 396 id=f"chatcmpl-{uuid.uuid4().hex[:12]}", 397 object="chat.completion", 398 created=int(time.time()), 399 model=model, 400 choices=[choice], 401 usage=usage, 402 ) 403 404 405 def _empty_response(model: str) -> SimpleNamespace: 406 message = SimpleNamespace( 407 role="assistant", content="", tool_calls=None, 408 reasoning=None, reasoning_content=None, reasoning_details=None, 409 ) 410 choice = SimpleNamespace(index=0, message=message, finish_reason="stop") 411 usage = SimpleNamespace( 412 prompt_tokens=0, completion_tokens=0, total_tokens=0, 413 prompt_tokens_details=SimpleNamespace(cached_tokens=0), 414 ) 415 return SimpleNamespace( 416 id=f"chatcmpl-{uuid.uuid4().hex[:12]}", 417 object="chat.completion", 418 created=int(time.time()), 419 model=model, 420 choices=[choice], 421 usage=usage, 422 ) 423 424 425 def _map_gemini_finish_reason(reason: str) -> str: 426 mapping = { 427 "STOP": "stop", 428 "MAX_TOKENS": "length", 429 "SAFETY": "content_filter", 430 "RECITATION": "content_filter", 431 "OTHER": "stop", 432 } 433 return mapping.get(reason.upper(), "stop") 434 435 436 # ============================================================================= 437 # Streaming SSE iterator 438 # ============================================================================= 439 440 class _GeminiStreamChunk(SimpleNamespace): 441 """Mimics an OpenAI ChatCompletionChunk with .choices[0].delta.""" 442 pass 443 444 445 def _make_stream_chunk( 446 *, 447 model: str, 448 content: str = "", 449 tool_call_delta: Optional[Dict[str, Any]] = None, 450 finish_reason: Optional[str] = None, 451 reasoning: str = "", 452 ) -> _GeminiStreamChunk: 453 delta_kwargs: Dict[str, Any] = {"role": "assistant"} 454 if content: 455 delta_kwargs["content"] = content 456 if tool_call_delta is not None: 457 delta_kwargs["tool_calls"] = [SimpleNamespace( 458 index=tool_call_delta.get("index", 0), 459 id=tool_call_delta.get("id") or f"call_{uuid.uuid4().hex[:12]}", 460 type="function", 461 function=SimpleNamespace( 462 name=tool_call_delta.get("name") or "", 463 arguments=tool_call_delta.get("arguments") or "", 464 ), 465 )] 466 if reasoning: 467 delta_kwargs["reasoning"] = reasoning 468 delta_kwargs["reasoning_content"] = reasoning 469 delta = SimpleNamespace(**delta_kwargs) 470 choice = SimpleNamespace(index=0, delta=delta, finish_reason=finish_reason) 471 return _GeminiStreamChunk( 472 id=f"chatcmpl-{uuid.uuid4().hex[:12]}", 473 object="chat.completion.chunk", 474 created=int(time.time()), 475 model=model, 476 choices=[choice], 477 usage=None, 478 ) 479 480 481 def _iter_sse_events(response: httpx.Response) -> Iterator[Dict[str, Any]]: 482 """Parse Server-Sent Events from an httpx streaming response.""" 483 buffer = "" 484 for chunk in response.iter_text(): 485 if not chunk: 486 continue 487 buffer += chunk 488 while "\n" in buffer: 489 line, buffer = buffer.split("\n", 1) 490 line = line.rstrip("\r") 491 if not line: 492 continue 493 if line.startswith("data: "): 494 data = line[6:] 495 if data == "[DONE]": 496 return 497 try: 498 yield json.loads(data) 499 except json.JSONDecodeError: 500 logger.debug("Non-JSON SSE line: %s", data[:200]) 501 502 503 def _translate_stream_event( 504 event: Dict[str, Any], 505 model: str, 506 tool_call_counter: List[int], 507 ) -> List[_GeminiStreamChunk]: 508 """Unwrap Code Assist envelope and emit OpenAI-shaped chunk(s). 509 510 ``tool_call_counter`` is a single-element list used as a mutable counter 511 across events in the same stream. Each ``functionCall`` part gets a 512 fresh, unique OpenAI ``index`` — keying by function name would collide 513 whenever the model issues parallel calls to the same tool (e.g. reading 514 three files in one turn). 515 """ 516 inner = event.get("response") if isinstance(event.get("response"), dict) else event 517 candidates = inner.get("candidates") or [] 518 if not candidates: 519 return [] 520 cand = candidates[0] 521 if not isinstance(cand, dict): 522 return [] 523 524 chunks: List[_GeminiStreamChunk] = [] 525 526 content = cand.get("content") or {} 527 parts = content.get("parts") if isinstance(content, dict) else [] 528 for part in parts or []: 529 if not isinstance(part, dict): 530 continue 531 if part.get("thought") is True and isinstance(part.get("text"), str): 532 chunks.append(_make_stream_chunk( 533 model=model, reasoning=part["text"], 534 )) 535 continue 536 if isinstance(part.get("text"), str) and part["text"]: 537 chunks.append(_make_stream_chunk(model=model, content=part["text"])) 538 fc = part.get("functionCall") 539 if isinstance(fc, dict) and fc.get("name"): 540 name = str(fc["name"]) 541 idx = tool_call_counter[0] 542 tool_call_counter[0] += 1 543 try: 544 args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False) 545 except (TypeError, ValueError): 546 args_str = "{}" 547 chunks.append(_make_stream_chunk( 548 model=model, 549 tool_call_delta={ 550 "index": idx, 551 "name": name, 552 "arguments": args_str, 553 }, 554 )) 555 556 finish_reason_raw = str(cand.get("finishReason") or "") 557 if finish_reason_raw: 558 mapped = _map_gemini_finish_reason(finish_reason_raw) 559 if tool_call_counter[0] > 0: 560 mapped = "tool_calls" 561 chunks.append(_make_stream_chunk(model=model, finish_reason=mapped)) 562 return chunks 563 564 565 # ============================================================================= 566 # GeminiCloudCodeClient — OpenAI-compatible facade 567 # ============================================================================= 568 569 MARKER_BASE_URL = "cloudcode-pa://google" 570 571 572 class _GeminiChatCompletions: 573 def __init__(self, client: "GeminiCloudCodeClient"): 574 self._client = client 575 576 def create(self, **kwargs: Any) -> Any: 577 return self._client._create_chat_completion(**kwargs) 578 579 580 class _GeminiChatNamespace: 581 def __init__(self, client: "GeminiCloudCodeClient"): 582 self.completions = _GeminiChatCompletions(client) 583 584 585 class GeminiCloudCodeClient: 586 """Minimal OpenAI-SDK-compatible facade over Code Assist v1internal.""" 587 588 def __init__( 589 self, 590 *, 591 api_key: Optional[str] = None, 592 base_url: Optional[str] = None, 593 default_headers: Optional[Dict[str, str]] = None, 594 project_id: str = "", 595 **_: Any, 596 ): 597 # `api_key` here is a dummy — real auth is the OAuth access token 598 # fetched on every call via agent.google_oauth.get_valid_access_token(). 599 # We accept the kwarg for openai.OpenAI interface parity. 600 self.api_key = api_key or "google-oauth" 601 self.base_url = base_url or MARKER_BASE_URL 602 self._default_headers = dict(default_headers or {}) 603 self._configured_project_id = project_id 604 self._project_context: Optional[ProjectContext] = None 605 self._project_context_lock = False # simple single-thread guard 606 self.chat = _GeminiChatNamespace(self) 607 self.is_closed = False 608 self._http = httpx.Client(timeout=httpx.Timeout(connect=15.0, read=600.0, write=30.0, pool=30.0)) 609 610 def close(self) -> None: 611 self.is_closed = True 612 try: 613 self._http.close() 614 except Exception: 615 pass 616 617 # Implement the OpenAI SDK's context-manager-ish closure check 618 def __enter__(self): 619 return self 620 621 def __exit__(self, exc_type, exc_val, exc_tb): 622 self.close() 623 624 def _ensure_project_context(self, access_token: str, model: str) -> ProjectContext: 625 """Lazily resolve and cache the project context for this client.""" 626 if self._project_context is not None: 627 return self._project_context 628 629 env_project = google_oauth.resolve_project_id_from_env() 630 creds = google_oauth.load_credentials() 631 stored_project = creds.project_id if creds else "" 632 633 # Prefer what's already baked into the creds 634 if stored_project: 635 self._project_context = ProjectContext( 636 project_id=stored_project, 637 managed_project_id=creds.managed_project_id if creds else "", 638 tier_id="", 639 source="stored", 640 ) 641 return self._project_context 642 643 ctx = resolve_project_context( 644 access_token, 645 configured_project_id=self._configured_project_id, 646 env_project_id=env_project, 647 user_agent_model=model, 648 ) 649 # Persist discovered project back to the creds file so the next 650 # session doesn't re-run the discovery. 651 if ctx.project_id or ctx.managed_project_id: 652 google_oauth.update_project_ids( 653 project_id=ctx.project_id, 654 managed_project_id=ctx.managed_project_id, 655 ) 656 self._project_context = ctx 657 return ctx 658 659 def _create_chat_completion( 660 self, 661 *, 662 model: str = "gemini-2.5-flash", 663 messages: Optional[List[Dict[str, Any]]] = None, 664 stream: bool = False, 665 tools: Any = None, 666 tool_choice: Any = None, 667 temperature: Optional[float] = None, 668 max_tokens: Optional[int] = None, 669 top_p: Optional[float] = None, 670 stop: Any = None, 671 extra_body: Optional[Dict[str, Any]] = None, 672 timeout: Any = None, 673 **_: Any, 674 ) -> Any: 675 access_token = google_oauth.get_valid_access_token() 676 ctx = self._ensure_project_context(access_token, model) 677 678 thinking_config = None 679 if isinstance(extra_body, dict): 680 thinking_config = extra_body.get("thinking_config") or extra_body.get("thinkingConfig") 681 682 inner = build_gemini_request( 683 messages=messages or [], 684 tools=tools, 685 tool_choice=tool_choice, 686 temperature=temperature, 687 max_tokens=max_tokens, 688 top_p=top_p, 689 stop=stop, 690 thinking_config=thinking_config, 691 ) 692 wrapped = wrap_code_assist_request( 693 project_id=ctx.project_id, 694 model=model, 695 inner_request=inner, 696 ) 697 698 headers = { 699 "Content-Type": "application/json", 700 "Accept": "application/json", 701 "Authorization": f"Bearer {access_token}", 702 "User-Agent": "hermes-agent (gemini-cli-compat)", 703 "X-Goog-Api-Client": "gl-python/hermes", 704 "x-activity-request-id": str(uuid.uuid4()), 705 } 706 headers.update(self._default_headers) 707 708 if stream: 709 return self._stream_completion(model=model, wrapped=wrapped, headers=headers) 710 711 url = f"{CODE_ASSIST_ENDPOINT}/v1internal:generateContent" 712 response = self._http.post(url, json=wrapped, headers=headers) 713 if response.status_code != 200: 714 raise _gemini_http_error(response) 715 try: 716 payload = response.json() 717 except ValueError as exc: 718 raise CodeAssistError( 719 f"Invalid JSON from Code Assist: {exc}", 720 code="code_assist_invalid_json", 721 ) from exc 722 return _translate_gemini_response(payload, model=model) 723 724 def _stream_completion( 725 self, 726 *, 727 model: str, 728 wrapped: Dict[str, Any], 729 headers: Dict[str, str], 730 ) -> Iterator[_GeminiStreamChunk]: 731 """Generator that yields OpenAI-shaped streaming chunks.""" 732 url = f"{CODE_ASSIST_ENDPOINT}/v1internal:streamGenerateContent?alt=sse" 733 stream_headers = dict(headers) 734 stream_headers["Accept"] = "text/event-stream" 735 736 def _generator() -> Iterator[_GeminiStreamChunk]: 737 try: 738 with self._http.stream("POST", url, json=wrapped, headers=stream_headers) as response: 739 if response.status_code != 200: 740 # Materialize error body for better diagnostics 741 response.read() 742 raise _gemini_http_error(response) 743 tool_call_counter: List[int] = [0] 744 for event in _iter_sse_events(response): 745 for chunk in _translate_stream_event(event, model, tool_call_counter): 746 yield chunk 747 except httpx.HTTPError as exc: 748 raise CodeAssistError( 749 f"Streaming request failed: {exc}", 750 code="code_assist_stream_error", 751 ) from exc 752 753 return _generator() 754 755 756 def _gemini_http_error(response: httpx.Response) -> CodeAssistError: 757 """Translate an httpx response into a CodeAssistError with rich metadata. 758 759 Parses Google's error envelope (``{"error": {"code", "message", "status", 760 "details": [...]}}``) so the agent's error classifier can reason about 761 the failure — ``status_code`` enables the rate_limit / auth classification 762 paths, and ``response`` lets the main loop honor ``Retry-After`` just 763 like it does for OpenAI SDK exceptions. 764 765 Also lifts a few recognizable Google conditions into human-readable 766 messages so the user sees something better than a 500-char JSON dump: 767 768 MODEL_CAPACITY_EXHAUSTED → "Gemini model capacity exhausted for 769 <model>. This is a Google-side throttle..." 770 RESOURCE_EXHAUSTED w/o reason → quota-style message 771 404 → "Model <name> not found at cloudcode-pa..." 772 """ 773 status = response.status_code 774 775 # Parse the body once, surviving any weird encodings. 776 body_text = "" 777 body_json: Dict[str, Any] = {} 778 try: 779 body_text = response.text 780 except Exception: 781 body_text = "" 782 if body_text: 783 try: 784 parsed = json.loads(body_text) 785 if isinstance(parsed, dict): 786 body_json = parsed 787 except (ValueError, TypeError): 788 body_json = {} 789 790 # Dig into Google's error envelope. Shape is: 791 # {"error": {"code": 429, "message": "...", "status": "RESOURCE_EXHAUSTED", 792 # "details": [{"@type": ".../ErrorInfo", "reason": "MODEL_CAPACITY_EXHAUSTED", 793 # "metadata": {...}}, 794 # {"@type": ".../RetryInfo", "retryDelay": "30s"}]}} 795 err_obj = body_json.get("error") if isinstance(body_json, dict) else None 796 if not isinstance(err_obj, dict): 797 err_obj = {} 798 err_status = str(err_obj.get("status") or "").strip() 799 err_message = str(err_obj.get("message") or "").strip() 800 _raw_details = err_obj.get("details") 801 err_details_list = _raw_details if isinstance(_raw_details, list) else [] 802 803 # Extract google.rpc.ErrorInfo reason + metadata. There may be more 804 # than one ErrorInfo (rare), so we pick the first one with a reason. 805 error_reason = "" 806 error_metadata: Dict[str, Any] = {} 807 retry_delay_seconds: Optional[float] = None 808 for detail in err_details_list: 809 if not isinstance(detail, dict): 810 continue 811 type_url = str(detail.get("@type") or "") 812 if not error_reason and type_url.endswith("/google.rpc.ErrorInfo"): 813 reason = detail.get("reason") 814 if isinstance(reason, str) and reason: 815 error_reason = reason 816 md = detail.get("metadata") 817 if isinstance(md, dict): 818 error_metadata = md 819 elif retry_delay_seconds is None and type_url.endswith("/google.rpc.RetryInfo"): 820 # retryDelay is a google.protobuf.Duration string like "30s" or "1.5s". 821 delay_raw = detail.get("retryDelay") 822 if isinstance(delay_raw, str) and delay_raw.endswith("s"): 823 try: 824 retry_delay_seconds = float(delay_raw[:-1]) 825 except ValueError: 826 pass 827 elif isinstance(delay_raw, (int, float)): 828 retry_delay_seconds = float(delay_raw) 829 830 # Fall back to the Retry-After header if the body didn't include RetryInfo. 831 if retry_delay_seconds is None: 832 try: 833 header_val = response.headers.get("Retry-After") or response.headers.get("retry-after") 834 except Exception: 835 header_val = None 836 if header_val: 837 try: 838 retry_delay_seconds = float(header_val) 839 except (TypeError, ValueError): 840 retry_delay_seconds = None 841 842 # Classify the error code. ``code_assist_rate_limited`` stays the default 843 # for 429s; a more specific reason tag helps downstream callers (e.g. tests, 844 # logs) without changing the rate_limit classification path. 845 code = f"code_assist_http_{status}" 846 if status == 401: 847 code = "code_assist_unauthorized" 848 elif status == 429: 849 code = "code_assist_rate_limited" 850 if error_reason == "MODEL_CAPACITY_EXHAUSTED": 851 code = "code_assist_capacity_exhausted" 852 853 # Build a human-readable message. Keep the status + a raw-body tail for 854 # debugging, but lead with a friendlier summary when we recognize the 855 # Google signal. 856 model_hint = "" 857 if isinstance(error_metadata, dict): 858 model_hint = str(error_metadata.get("model") or error_metadata.get("modelId") or "").strip() 859 860 if status == 429 and error_reason == "MODEL_CAPACITY_EXHAUSTED": 861 target = model_hint or "this Gemini model" 862 message = ( 863 f"Gemini capacity exhausted for {target} (Google-side throttle, " 864 f"not a Hermes issue). Try a different Gemini model or set a " 865 f"fallback_providers entry to a non-Gemini provider." 866 ) 867 if retry_delay_seconds is not None: 868 message += f" Google suggests retrying in {retry_delay_seconds:g}s." 869 elif status == 429 and err_status == "RESOURCE_EXHAUSTED": 870 message = ( 871 f"Gemini quota exhausted ({err_message or 'RESOURCE_EXHAUSTED'}). " 872 f"Check /gquota for remaining daily requests." 873 ) 874 if retry_delay_seconds is not None: 875 message += f" Retry suggested in {retry_delay_seconds:g}s." 876 elif status == 404: 877 # Google returns 404 when a model has been retired or renamed. 878 target = model_hint or (err_message or "model") 879 message = ( 880 f"Code Assist 404: {target} is not available at " 881 f"cloudcode-pa.googleapis.com. It may have been renamed or " 882 f"retired. Check hermes_cli/models.py for the current list." 883 ) 884 elif err_message: 885 # Generic fallback with the parsed message. 886 message = f"Code Assist HTTP {status} ({err_status or 'error'}): {err_message}" 887 else: 888 # Last-ditch fallback — raw body snippet. 889 message = f"Code Assist returned HTTP {status}: {body_text[:500]}" 890 891 return CodeAssistError( 892 message, 893 code=code, 894 status_code=status, 895 response=response, 896 retry_after=retry_delay_seconds, 897 details={ 898 "status": err_status, 899 "reason": error_reason, 900 "metadata": error_metadata, 901 "message": err_message, 902 }, 903 )