"""OpenAI-compatible facade that talks to Google's Cloud Code Assist backend.

This adapter lets Hermes use the ``google-gemini-cli`` provider as if it were
a standard OpenAI-shaped chat completion endpoint, while the underlying HTTP
traffic goes to ``cloudcode-pa.googleapis.com/v1internal:{generateContent,
streamGenerateContent}`` with a Bearer access token obtained via OAuth PKCE.

Architecture
------------
- ``GeminiCloudCodeClient`` exposes ``.chat.completions.create(**kwargs)``,
  mirroring the subset of the OpenAI SDK that ``run_agent.py`` uses.
- Incoming OpenAI ``messages[]`` / ``tools[]`` / ``tool_choice`` are translated
  to Gemini's native ``contents[]`` / ``tools[].functionDeclarations`` /
  ``toolConfig`` / ``systemInstruction`` shape.
- The request body is wrapped as ``{project, model, user_prompt_id, request}``,
  per Code Assist API expectations.
- Responses (``candidates[].content.parts[]``) are converted back to the
  OpenAI ``choices[0].message`` shape with ``content`` + ``tool_calls``.
- Streaming uses SSE (``?alt=sse``) and yields OpenAI-shaped delta chunks.

Attribution
-----------
Translation semantics follow jenslys/opencode-gemini-auth (MIT) and the public
Gemini API docs. The request envelope shape
(``{project, model, user_prompt_id, request}``) is not publicly documented; it
was reverse-engineered from the opencode-gemini-auth and clawdbot
implementations.
"""
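
# Usage sketch (illustrative; assumes OAuth credentials are already cached via
# agent.google_oauth, and the model name and prompt below are made up):
#
#     client = GeminiCloudCodeClient()
#     resp = client.chat.completions.create(
#         model="gemini-2.5-flash",
#         messages=[{"role": "user", "content": "Say hello."}],
#     )
#     print(resp.choices[0].message.content)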

from __future__ import annotations

import json
import logging
import time
import uuid
from types import SimpleNamespace
from typing import Any, Dict, Iterator, List, Optional

import httpx

from agent import google_oauth
from agent.gemini_schema import sanitize_gemini_tool_parameters
from agent.google_code_assist import (
    CODE_ASSIST_ENDPOINT,
    CodeAssistError,
    ProjectContext,
    resolve_project_context,
)

logger = logging.getLogger(__name__)


# =============================================================================
# Request translation: OpenAI → Gemini
# =============================================================================

_ROLE_MAP_OPENAI_TO_GEMINI = {
    "user": "user",
    "assistant": "model",
    "system": "user",   # handled separately via systemInstruction
    "tool": "user",     # functionResponse is wrapped in a user-role turn
    "function": "user",
}


def _coerce_content_to_text(content: Any) -> str:
    """OpenAI content may be str or a list of parts; reduce to plain text."""
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        pieces: List[str] = []
        for p in content:
            if isinstance(p, str):
                pieces.append(p)
            elif isinstance(p, dict):
                if p.get("type") == "text" and isinstance(p.get("text"), str):
                    pieces.append(p["text"])
                # Multimodal (image_url, etc.) — stub for now; log and skip
                elif p.get("type") in ("image_url", "input_audio"):
                    logger.debug("Dropping multimodal part (not yet supported): %s", p.get("type"))
        return "\n".join(pieces)
    return str(content)


def _translate_tool_call_to_gemini(tool_call: Dict[str, Any]) -> Dict[str, Any]:
    """OpenAI tool_call -> Gemini functionCall part."""
    fn = tool_call.get("function") or {}
    args_raw = fn.get("arguments", "")
    try:
        args = json.loads(args_raw) if isinstance(args_raw, str) and args_raw else {}
    except json.JSONDecodeError:
        args = {"_raw": args_raw}
    if not isinstance(args, dict):
        args = {"_value": args}
    return {
        "functionCall": {
            "name": fn.get("name") or "",
            "args": args,
        },
        # Sentinel signature — matches opencode-gemini-auth's approach.
        # Without this, Code Assist rejects function calls that originated
        # outside its own chain.
        "thoughtSignature": "skip_thought_signature_validator",
    }
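
# Illustrative translation (hypothetical tool name and arguments):
#   {"type": "function",
#    "function": {"name": "read_file", "arguments": "{\"path\": \"a.py\"}"}}
# becomes
#   {"functionCall": {"name": "read_file", "args": {"path": "a.py"}},
#    "thoughtSignature": "skip_thought_signature_validator"}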


def _translate_tool_result_to_gemini(message: Dict[str, Any]) -> Dict[str, Any]:
    """OpenAI tool-role message -> Gemini functionResponse part.

    The function name isn't carried in the OpenAI tool message itself; it
    belongs to the assistant message that issued the call. For simplicity we
    look up ``name`` on the message (the OpenAI SDK copies it there) and fall
    back to the ``tool_call_id`` cross-reference.
    """
    name = str(message.get("name") or message.get("tool_call_id") or "tool")
    content = _coerce_content_to_text(message.get("content"))
    # Gemini expects the response as a dict under `response`. We wrap plain
    # text in {"output": "..."}.
    try:
        parsed = json.loads(content) if content.strip().startswith(("{", "[")) else None
    except json.JSONDecodeError:
        parsed = None
    response = parsed if isinstance(parsed, dict) else {"output": content}
    return {
        "functionResponse": {
            "name": name,
            "response": response,
        },
    }
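
# Illustrative translation (hypothetical values): a tool message like
#   {"role": "tool", "name": "read_file", "tool_call_id": "call_1",
#    "content": "hello"}
# becomes
#   {"functionResponse": {"name": "read_file", "response": {"output": "hello"}}}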


def _build_gemini_contents(
    messages: List[Dict[str, Any]],
) -> tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
    """Convert OpenAI messages[] to Gemini contents[] + systemInstruction."""
    system_text_parts: List[str] = []
    contents: List[Dict[str, Any]] = []

    for msg in messages:
        if not isinstance(msg, dict):
            continue
        role = str(msg.get("role") or "user")

        if role == "system":
            system_text_parts.append(_coerce_content_to_text(msg.get("content")))
            continue

        # Tool result message — emit a user-role turn with functionResponse
        if role == "tool" or role == "function":
            contents.append({
                "role": "user",
                "parts": [_translate_tool_result_to_gemini(msg)],
            })
            continue

        gemini_role = _ROLE_MAP_OPENAI_TO_GEMINI.get(role, "user")
        parts: List[Dict[str, Any]] = []

        text = _coerce_content_to_text(msg.get("content"))
        if text:
            parts.append({"text": text})

        # Assistant messages can carry tool_calls
        tool_calls = msg.get("tool_calls") or []
        if isinstance(tool_calls, list):
            for tc in tool_calls:
                if isinstance(tc, dict):
                    parts.append(_translate_tool_call_to_gemini(tc))

        if not parts:
            # Gemini rejects empty parts; skip the turn entirely
            continue

        contents.append({"role": gemini_role, "parts": parts})

    system_instruction: Optional[Dict[str, Any]] = None
    joined_system = "\n".join(p for p in system_text_parts if p).strip()
    if joined_system:
        system_instruction = {
            "role": "system",
            "parts": [{"text": joined_system}],
        }

    return contents, system_instruction
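
# Illustrative mapping (made-up conversation): system text is lifted out of
# the turn list, everything else becomes role/parts turns:
#   [{"role": "system", "content": "Be terse."},
#    {"role": "user", "content": "hi"}]
# yields
#   contents=[{"role": "user", "parts": [{"text": "hi"}]}]
#   systemInstruction={"role": "system", "parts": [{"text": "Be terse."}]}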


def _translate_tools_to_gemini(tools: Any) -> List[Dict[str, Any]]:
    """OpenAI tools[] -> Gemini tools[].functionDeclarations[]."""
    if not isinstance(tools, list) or not tools:
        return []
    declarations: List[Dict[str, Any]] = []
    for t in tools:
        if not isinstance(t, dict):
            continue
        fn = t.get("function") or {}
        if not isinstance(fn, dict):
            continue
        name = fn.get("name")
        if not name:
            continue
        decl = {"name": str(name)}
        if fn.get("description"):
            decl["description"] = str(fn["description"])
        params = fn.get("parameters")
        if isinstance(params, dict):
            decl["parameters"] = sanitize_gemini_tool_parameters(params)
        declarations.append(decl)
    if not declarations:
        return []
    return [{"functionDeclarations": declarations}]


def _translate_tool_choice_to_gemini(tool_choice: Any) -> Optional[Dict[str, Any]]:
    """OpenAI tool_choice -> Gemini toolConfig.functionCallingConfig."""
    if tool_choice is None:
        return None
    if isinstance(tool_choice, str):
        if tool_choice == "auto":
            return {"functionCallingConfig": {"mode": "AUTO"}}
        if tool_choice == "required":
            return {"functionCallingConfig": {"mode": "ANY"}}
        if tool_choice == "none":
            return {"functionCallingConfig": {"mode": "NONE"}}
    if isinstance(tool_choice, dict):
        fn = tool_choice.get("function") or {}
        name = fn.get("name")
        if name:
            return {
                "functionCallingConfig": {
                    "mode": "ANY",
                    "allowedFunctionNames": [str(name)],
                },
            }
    return None


def _normalize_thinking_config(config: Any) -> Optional[Dict[str, Any]]:
    """Accept thinkingBudget / thinkingLevel / includeThoughts (+ snake_case)."""
    if not isinstance(config, dict) or not config:
        return None
    budget = config.get("thinkingBudget", config.get("thinking_budget"))
    level = config.get("thinkingLevel", config.get("thinking_level"))
    include = config.get("includeThoughts", config.get("include_thoughts"))
    normalized: Dict[str, Any] = {}
    if isinstance(budget, (int, float)):
        normalized["thinkingBudget"] = int(budget)
    if isinstance(level, str) and level.strip():
        normalized["thinkingLevel"] = level.strip().lower()
    if isinstance(include, bool):
        normalized["includeThoughts"] = include
    return normalized or None
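
# Illustrative normalization (hypothetical values): snake_case keys are folded
# into their camelCase equivalents:
#   {"thinking_budget": 2048, "include_thoughts": True}
#   -> {"thinkingBudget": 2048, "includeThoughts": True}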


def build_gemini_request(
    *,
    messages: List[Dict[str, Any]],
    tools: Any = None,
    tool_choice: Any = None,
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    top_p: Optional[float] = None,
    stop: Any = None,
    thinking_config: Any = None,
) -> Dict[str, Any]:
    """Build the inner Gemini request body (goes inside ``request`` wrapper)."""
    contents, system_instruction = _build_gemini_contents(messages)

    body: Dict[str, Any] = {"contents": contents}
    if system_instruction is not None:
        body["systemInstruction"] = system_instruction

    gemini_tools = _translate_tools_to_gemini(tools)
    if gemini_tools:
        body["tools"] = gemini_tools
    tool_cfg = _translate_tool_choice_to_gemini(tool_choice)
    if tool_cfg is not None:
        body["toolConfig"] = tool_cfg

    generation_config: Dict[str, Any] = {}
    if isinstance(temperature, (int, float)):
        generation_config["temperature"] = float(temperature)
    if isinstance(max_tokens, int) and max_tokens > 0:
        generation_config["maxOutputTokens"] = max_tokens
    if isinstance(top_p, (int, float)):
        generation_config["topP"] = float(top_p)
    if isinstance(stop, str) and stop:
        generation_config["stopSequences"] = [stop]
    elif isinstance(stop, list) and stop:
        generation_config["stopSequences"] = [str(s) for s in stop if s]
    normalized_thinking = _normalize_thinking_config(thinking_config)
    if normalized_thinking:
        generation_config["thinkingConfig"] = normalized_thinking
    if generation_config:
        body["generationConfig"] = generation_config

    return body


def wrap_code_assist_request(
    *,
    project_id: str,
    model: str,
    inner_request: Dict[str, Any],
    user_prompt_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Wrap the inner Gemini request in the Code Assist envelope."""
    return {
        "project": project_id,
        "model": model,
        "user_prompt_id": user_prompt_id or str(uuid.uuid4()),
        "request": inner_request,
    }
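
# Envelope sketch (reverse-engineered shape, see module docstring; values here
# are illustrative):
#   {"project": "my-gcp-project",
#    "model": "gemini-2.5-flash",
#    "user_prompt_id": "<uuid4>",
#    "request": {"contents": [...], "generationConfig": {...}}}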


# =============================================================================
# Response translation: Gemini → OpenAI
# =============================================================================

def _translate_gemini_response(
    resp: Dict[str, Any],
    model: str,
) -> SimpleNamespace:
    """Non-streaming Gemini response -> OpenAI-shaped SimpleNamespace.

    Code Assist wraps the actual Gemini response inside ``response``, so we
    unwrap it first if present.
    """
    inner = resp.get("response") if isinstance(resp.get("response"), dict) else resp

    candidates = inner.get("candidates") or []
    if not isinstance(candidates, list) or not candidates:
        return _empty_response(model)

    cand = candidates[0]
    content_obj = cand.get("content") if isinstance(cand, dict) else {}
    parts = content_obj.get("parts") if isinstance(content_obj, dict) else []

    text_pieces: List[str] = []
    reasoning_pieces: List[str] = []
    tool_calls: List[SimpleNamespace] = []

    for i, part in enumerate(parts or []):
        if not isinstance(part, dict):
            continue
        # Thought parts are the model's internal reasoning — surface as
        # reasoning, don't mix into content.
        if part.get("thought") is True:
            if isinstance(part.get("text"), str):
                reasoning_pieces.append(part["text"])
            continue
        if isinstance(part.get("text"), str):
            text_pieces.append(part["text"])
            continue
        fc = part.get("functionCall")
        if isinstance(fc, dict) and fc.get("name"):
            try:
                args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False)
            except (TypeError, ValueError):
                args_str = "{}"
            tool_calls.append(SimpleNamespace(
                id=f"call_{uuid.uuid4().hex[:12]}",
                type="function",
                index=i,
                function=SimpleNamespace(name=str(fc["name"]), arguments=args_str),
            ))

    finish_reason = "tool_calls" if tool_calls else _map_gemini_finish_reason(
        str(cand.get("finishReason") or "")
    )

    usage_meta = inner.get("usageMetadata") or {}
    usage = SimpleNamespace(
        prompt_tokens=int(usage_meta.get("promptTokenCount") or 0),
        completion_tokens=int(usage_meta.get("candidatesTokenCount") or 0),
        total_tokens=int(usage_meta.get("totalTokenCount") or 0),
        prompt_tokens_details=SimpleNamespace(
            cached_tokens=int(usage_meta.get("cachedContentTokenCount") or 0),
        ),
    )

    message = SimpleNamespace(
        role="assistant",
        content="".join(text_pieces) if text_pieces else None,
        tool_calls=tool_calls or None,
        reasoning="".join(reasoning_pieces) or None,
        reasoning_content="".join(reasoning_pieces) or None,
        reasoning_details=None,
    )
    choice = SimpleNamespace(
        index=0,
        message=message,
        finish_reason=finish_reason,
    )
    return SimpleNamespace(
        id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
        object="chat.completion",
        created=int(time.time()),
        model=model,
        choices=[choice],
        usage=usage,
    )


def _empty_response(model: str) -> SimpleNamespace:
    message = SimpleNamespace(
        role="assistant", content="", tool_calls=None,
        reasoning=None, reasoning_content=None, reasoning_details=None,
    )
    choice = SimpleNamespace(index=0, message=message, finish_reason="stop")
    usage = SimpleNamespace(
        prompt_tokens=0, completion_tokens=0, total_tokens=0,
        prompt_tokens_details=SimpleNamespace(cached_tokens=0),
    )
    return SimpleNamespace(
        id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
        object="chat.completion",
        created=int(time.time()),
        model=model,
        choices=[choice],
        usage=usage,
    )


def _map_gemini_finish_reason(reason: str) -> str:
    mapping = {
        "STOP": "stop",
        "MAX_TOKENS": "length",
        "SAFETY": "content_filter",
        "RECITATION": "content_filter",
        "OTHER": "stop",
    }
    return mapping.get(reason.upper(), "stop")


# =============================================================================
# Streaming SSE iterator
# =============================================================================

class _GeminiStreamChunk(SimpleNamespace):
    """Mimics an OpenAI ChatCompletionChunk with .choices[0].delta."""
    pass


def _make_stream_chunk(
    *,
    model: str,
    content: str = "",
    tool_call_delta: Optional[Dict[str, Any]] = None,
    finish_reason: Optional[str] = None,
    reasoning: str = "",
) -> _GeminiStreamChunk:
    delta_kwargs: Dict[str, Any] = {"role": "assistant"}
    if content:
        delta_kwargs["content"] = content
    if tool_call_delta is not None:
        delta_kwargs["tool_calls"] = [SimpleNamespace(
            index=tool_call_delta.get("index", 0),
            id=tool_call_delta.get("id") or f"call_{uuid.uuid4().hex[:12]}",
            type="function",
            function=SimpleNamespace(
                name=tool_call_delta.get("name") or "",
                arguments=tool_call_delta.get("arguments") or "",
            ),
        )]
    if reasoning:
        delta_kwargs["reasoning"] = reasoning
        delta_kwargs["reasoning_content"] = reasoning
    delta = SimpleNamespace(**delta_kwargs)
    choice = SimpleNamespace(index=0, delta=delta, finish_reason=finish_reason)
    return _GeminiStreamChunk(
        id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
        object="chat.completion.chunk",
        created=int(time.time()),
        model=model,
        choices=[choice],
        usage=None,
    )


def _iter_sse_events(response: httpx.Response) -> Iterator[Dict[str, Any]]:
    """Parse Server-Sent Events from an httpx streaming response."""
    buffer = ""
    for chunk in response.iter_text():
        if not chunk:
            continue
        buffer += chunk
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            line = line.rstrip("\r")
            if not line:
                continue
            if line.startswith("data: "):
                data = line[6:]
                if data == "[DONE]":
                    return
                try:
                    yield json.loads(data)
                except json.JSONDecodeError:
                    logger.debug("Non-JSON SSE line: %s", data[:200])
    # Flush a trailing data line that arrived without a final newline
    # (defensive; well-formed SSE streams terminate every event).
    tail = buffer.rstrip("\r")
    if tail.startswith("data: ") and tail[6:] != "[DONE]":
        try:
            yield json.loads(tail[6:])
        except json.JSONDecodeError:
            logger.debug("Non-JSON SSE tail: %s", tail[6:][:200])
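
# Wire format sketch (illustrative event bodies): with ``?alt=sse`` the stream
# delivers lines like
#   data: {"response": {"candidates": [{"content": {"parts": [...]}}]}}
# separated by blank lines; a terminating `data: [DONE]`, when present, ends
# iteration. Only `data:` lines are consumed here.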


def _translate_stream_event(
    event: Dict[str, Any],
    model: str,
    tool_call_counter: List[int],
) -> List[_GeminiStreamChunk]:
    """Unwrap Code Assist envelope and emit OpenAI-shaped chunk(s).

    ``tool_call_counter`` is a single-element list used as a mutable counter
    across events in the same stream. Each ``functionCall`` part gets a
    fresh, unique OpenAI ``index`` — keying by function name would collide
    whenever the model issues parallel calls to the same tool (e.g. reading
    three files in one turn).
    """
    inner = event.get("response") if isinstance(event.get("response"), dict) else event
    candidates = inner.get("candidates") or []
    if not candidates:
        return []
    cand = candidates[0]
    if not isinstance(cand, dict):
        return []

    chunks: List[_GeminiStreamChunk] = []

    content = cand.get("content") or {}
    parts = content.get("parts") if isinstance(content, dict) else []
    for part in parts or []:
        if not isinstance(part, dict):
            continue
        if part.get("thought") is True and isinstance(part.get("text"), str):
            chunks.append(_make_stream_chunk(
                model=model, reasoning=part["text"],
            ))
            continue
        if isinstance(part.get("text"), str) and part["text"]:
            chunks.append(_make_stream_chunk(model=model, content=part["text"]))
        fc = part.get("functionCall")
        if isinstance(fc, dict) and fc.get("name"):
            name = str(fc["name"])
            idx = tool_call_counter[0]
            tool_call_counter[0] += 1
            try:
                args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False)
            except (TypeError, ValueError):
                args_str = "{}"
            chunks.append(_make_stream_chunk(
                model=model,
                tool_call_delta={
                    "index": idx,
                    "name": name,
                    "arguments": args_str,
                },
            ))

    finish_reason_raw = str(cand.get("finishReason") or "")
    if finish_reason_raw:
        mapped = _map_gemini_finish_reason(finish_reason_raw)
        if tool_call_counter[0] > 0:
            mapped = "tool_calls"
        chunks.append(_make_stream_chunk(model=model, finish_reason=mapped))
    return chunks
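
# Illustrative chunk sequence (hypothetical calls): two parallel functionCall
# parts for the same tool get distinct OpenAI indices,
#   functionCall read_file(a.py) -> delta.tool_calls[0].index == 0
#   functionCall read_file(b.py) -> delta.tool_calls[0].index == 1
# so downstream accumulation never merges separate calls.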


# =============================================================================
# GeminiCloudCodeClient — OpenAI-compatible facade
# =============================================================================

MARKER_BASE_URL = "cloudcode-pa://google"


class _GeminiChatCompletions:
    def __init__(self, client: "GeminiCloudCodeClient"):
        self._client = client

    def create(self, **kwargs: Any) -> Any:
        return self._client._create_chat_completion(**kwargs)


class _GeminiChatNamespace:
    def __init__(self, client: "GeminiCloudCodeClient"):
        self.completions = _GeminiChatCompletions(client)


class GeminiCloudCodeClient:
    """Minimal OpenAI-SDK-compatible facade over Code Assist v1internal."""

    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        default_headers: Optional[Dict[str, str]] = None,
        project_id: str = "",
        **_: Any,
    ):
        # `api_key` here is a dummy — real auth is the OAuth access token
        # fetched on every call via agent.google_oauth.get_valid_access_token().
        # We accept the kwarg for openai.OpenAI interface parity.
        self.api_key = api_key or "google-oauth"
        self.base_url = base_url or MARKER_BASE_URL
        self._default_headers = dict(default_headers or {})
        self._configured_project_id = project_id
        self._project_context: Optional[ProjectContext] = None
        self._project_context_lock = False  # placeholder; no locking is performed yet
        self.chat = _GeminiChatNamespace(self)
        self.is_closed = False
        self._http = httpx.Client(timeout=httpx.Timeout(connect=15.0, read=600.0, write=30.0, pool=30.0))

    def close(self) -> None:
        self.is_closed = True
        try:
            self._http.close()
        except Exception:
            pass

    # Support `with` usage, mirroring the OpenAI SDK client's context manager.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def _ensure_project_context(self, access_token: str, model: str) -> ProjectContext:
        """Lazily resolve and cache the project context for this client."""
        if self._project_context is not None:
            return self._project_context

        env_project = google_oauth.resolve_project_id_from_env()
        creds = google_oauth.load_credentials()
        stored_project = creds.project_id if creds else ""

        # Prefer what's already baked into the creds
        if stored_project:
            self._project_context = ProjectContext(
                project_id=stored_project,
                managed_project_id=creds.managed_project_id if creds else "",
                tier_id="",
                source="stored",
            )
            return self._project_context

        ctx = resolve_project_context(
            access_token,
            configured_project_id=self._configured_project_id,
            env_project_id=env_project,
            user_agent_model=model,
        )
        # Persist discovered project back to the creds file so the next
        # session doesn't re-run the discovery.
        if ctx.project_id or ctx.managed_project_id:
            google_oauth.update_project_ids(
                project_id=ctx.project_id,
                managed_project_id=ctx.managed_project_id,
            )
        self._project_context = ctx
        return ctx

    def _create_chat_completion(
        self,
        *,
        model: str = "gemini-2.5-flash",
        messages: Optional[List[Dict[str, Any]]] = None,
        stream: bool = False,
        tools: Any = None,
        tool_choice: Any = None,
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
        top_p: Optional[float] = None,
        stop: Any = None,
        extra_body: Optional[Dict[str, Any]] = None,
        timeout: Any = None,  # accepted for interface parity; the client-wide httpx timeout applies
        **_: Any,
    ) -> Any:
        access_token = google_oauth.get_valid_access_token()
        ctx = self._ensure_project_context(access_token, model)

        thinking_config = None
        if isinstance(extra_body, dict):
            thinking_config = extra_body.get("thinking_config") or extra_body.get("thinkingConfig")

        inner = build_gemini_request(
            messages=messages or [],
            tools=tools,
            tool_choice=tool_choice,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            stop=stop,
            thinking_config=thinking_config,
        )
        wrapped = wrap_code_assist_request(
            project_id=ctx.project_id,
            model=model,
            inner_request=inner,
        )

        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": f"Bearer {access_token}",
            "User-Agent": "hermes-agent (gemini-cli-compat)",
            "X-Goog-Api-Client": "gl-python/hermes",
            "x-activity-request-id": str(uuid.uuid4()),
        }
        headers.update(self._default_headers)

        if stream:
            return self._stream_completion(model=model, wrapped=wrapped, headers=headers)

        url = f"{CODE_ASSIST_ENDPOINT}/v1internal:generateContent"
        response = self._http.post(url, json=wrapped, headers=headers)
        if response.status_code != 200:
            raise _gemini_http_error(response)
        try:
            payload = response.json()
        except ValueError as exc:
            raise CodeAssistError(
                f"Invalid JSON from Code Assist: {exc}",
                code="code_assist_invalid_json",
            ) from exc
        return _translate_gemini_response(payload, model=model)

    def _stream_completion(
        self,
        *,
        model: str,
        wrapped: Dict[str, Any],
        headers: Dict[str, str],
    ) -> Iterator[_GeminiStreamChunk]:
        """Return a generator that yields OpenAI-shaped streaming chunks."""
        url = f"{CODE_ASSIST_ENDPOINT}/v1internal:streamGenerateContent?alt=sse"
        stream_headers = dict(headers)
        stream_headers["Accept"] = "text/event-stream"

        # The request is deferred into a nested generator so the HTTP call only
        # fires once the caller starts iterating; errors surface on first next().
        def _generator() -> Iterator[_GeminiStreamChunk]:
            try:
                with self._http.stream("POST", url, json=wrapped, headers=stream_headers) as response:
                    if response.status_code != 200:
                        # Materialize the error body for better diagnostics
                        response.read()
                        raise _gemini_http_error(response)
                    tool_call_counter: List[int] = [0]
                    for event in _iter_sse_events(response):
                        for chunk in _translate_stream_event(event, model, tool_call_counter):
                            yield chunk
            except httpx.HTTPError as exc:
                raise CodeAssistError(
                    f"Streaming request failed: {exc}",
                    code="code_assist_stream_error",
                ) from exc

        return _generator()


def _gemini_http_error(response: httpx.Response) -> CodeAssistError:
    """Translate an httpx response into a CodeAssistError with rich metadata.

    Parses Google's error envelope (``{"error": {"code", "message", "status",
    "details": [...]}}``) so the agent's error classifier can reason about
    the failure — ``status_code`` enables the rate_limit / auth classification
    paths, and ``response`` lets the main loop honor ``Retry-After`` just
    like it does for OpenAI SDK exceptions.

    Also lifts a few recognizable Google conditions into human-readable
    messages so the user sees something better than a 500-char JSON dump:

        MODEL_CAPACITY_EXHAUSTED → "Gemini model capacity exhausted for
            <model>. This is a Google-side throttle..."
        RESOURCE_EXHAUSTED w/o reason → quota-style message
        404 → "Model <name> not found at cloudcode-pa..."
    """
    status = response.status_code

    # Parse the body once, surviving any weird encodings.
    body_text = ""
    body_json: Dict[str, Any] = {}
    try:
        body_text = response.text
    except Exception:
        body_text = ""
    if body_text:
        try:
            parsed = json.loads(body_text)
            if isinstance(parsed, dict):
                body_json = parsed
        except (ValueError, TypeError):
            body_json = {}

    # Dig into Google's error envelope.  Shape is:
    #   {"error": {"code": 429, "message": "...", "status": "RESOURCE_EXHAUSTED",
    #              "details": [{"@type": ".../ErrorInfo", "reason": "MODEL_CAPACITY_EXHAUSTED",
    #                           "metadata": {...}},
    #                          {"@type": ".../RetryInfo", "retryDelay": "30s"}]}}
    err_obj = body_json.get("error") if isinstance(body_json, dict) else None
    if not isinstance(err_obj, dict):
        err_obj = {}
    err_status = str(err_obj.get("status") or "").strip()
    err_message = str(err_obj.get("message") or "").strip()
    _raw_details = err_obj.get("details")
    err_details_list = _raw_details if isinstance(_raw_details, list) else []

    # Extract google.rpc.ErrorInfo reason + metadata.  There may be more
    # than one ErrorInfo (rare), so we pick the first one with a reason.
    error_reason = ""
    error_metadata: Dict[str, Any] = {}
    retry_delay_seconds: Optional[float] = None
    for detail in err_details_list:
        if not isinstance(detail, dict):
            continue
        type_url = str(detail.get("@type") or "")
        if not error_reason and type_url.endswith("/google.rpc.ErrorInfo"):
            reason = detail.get("reason")
            if isinstance(reason, str) and reason:
                error_reason = reason
            md = detail.get("metadata")
            if isinstance(md, dict):
                error_metadata = md
        elif retry_delay_seconds is None and type_url.endswith("/google.rpc.RetryInfo"):
            # retryDelay is a google.protobuf.Duration string like "30s" or "1.5s".
            delay_raw = detail.get("retryDelay")
            if isinstance(delay_raw, str) and delay_raw.endswith("s"):
                try:
                    retry_delay_seconds = float(delay_raw[:-1])
                except ValueError:
                    pass
            elif isinstance(delay_raw, (int, float)):
                retry_delay_seconds = float(delay_raw)

    # Fall back to the Retry-After header if the body didn't include RetryInfo.
    if retry_delay_seconds is None:
        try:
            header_val = response.headers.get("Retry-After") or response.headers.get("retry-after")
        except Exception:
            header_val = None
        if header_val:
            try:
                retry_delay_seconds = float(header_val)
            except (TypeError, ValueError):
                retry_delay_seconds = None

    # Classify the error code.  ``code_assist_rate_limited`` stays the default
    # for 429s; a more specific reason tag helps downstream callers (e.g. tests,
    # logs) without changing the rate_limit classification path.
    code = f"code_assist_http_{status}"
    if status == 401:
        code = "code_assist_unauthorized"
    elif status == 429:
        code = "code_assist_rate_limited"
        if error_reason == "MODEL_CAPACITY_EXHAUSTED":
            code = "code_assist_capacity_exhausted"

    # Build a human-readable message.  Keep the status + a raw-body tail for
    # debugging, but lead with a friendlier summary when we recognize the
    # Google signal.
    model_hint = ""
    if isinstance(error_metadata, dict):
        model_hint = str(error_metadata.get("model") or error_metadata.get("modelId") or "").strip()

    if status == 429 and error_reason == "MODEL_CAPACITY_EXHAUSTED":
        target = model_hint or "this Gemini model"
        message = (
            f"Gemini capacity exhausted for {target} (Google-side throttle, "
            f"not a Hermes issue). Try a different Gemini model or set a "
            f"fallback_providers entry to a non-Gemini provider."
        )
        if retry_delay_seconds is not None:
            message += f" Google suggests retrying in {retry_delay_seconds:g}s."
    elif status == 429 and err_status == "RESOURCE_EXHAUSTED":
        message = (
            f"Gemini quota exhausted ({err_message or 'RESOURCE_EXHAUSTED'}). "
            f"Check /gquota for remaining daily requests."
        )
        if retry_delay_seconds is not None:
            message += f" Retry suggested in {retry_delay_seconds:g}s."
    elif status == 404:
        # Google returns 404 when a model has been retired or renamed.
        target = model_hint or (err_message or "model")
        message = (
            f"Code Assist 404: {target} is not available at "
            f"cloudcode-pa.googleapis.com. It may have been renamed or "
            f"retired. Check hermes_cli/models.py for the current list."
        )
    elif err_message:
        # Generic fallback with the parsed message.
        message = f"Code Assist HTTP {status} ({err_status or 'error'}): {err_message}"
    else:
        # Last-ditch fallback — raw body snippet.
        message = f"Code Assist returned HTTP {status}: {body_text[:500]}"

    return CodeAssistError(
        message,
        code=code,
        status_code=status,
        response=response,
        retry_after=retry_delay_seconds,
        details={
            "status": err_status,
            "reason": error_reason,
            "metadata": error_metadata,
            "message": err_message,
        },
    )