# src/solace_agent_mesh/agent/tools/image_tools.py
   1  """
   2  Collection of Python tools for image generation, manipulation, and multimodal content analysis.
   3  Includes tools for image description and audio description using vision and audio APIs.
   4  """
   5  
   6  import logging
   7  import asyncio
   8  import base64
   9  import json
  10  import os
  11  import uuid
  12  from datetime import datetime, timezone
  13  from typing import Any, Dict, Optional
  14  
  15  import httpx
  16  from google.adk.tools import ToolContext
  17  
  18  from google.genai import types as adk_types
  19  from .tool_definition import BuiltinTool
  20  from .tool_result import ToolResult, DataObject, DataDisposition
  21  from .artifact_types import Artifact
  22  from .registry import tool_registry
  23  from ...agent.utils.context_helpers import get_original_session_id
  24  from ...agent.utils.artifact_helpers import save_artifact_with_metadata, DEFAULT_SCHEMA_MAX_KEYS
  25  
  26  log = logging.getLogger(__name__)
  27  
  28  
  29  def _resolve_tool_config(tool_config: dict | None, log_identifier: str, required_keys: list[str] | None = None) -> dict:
  30      config = tool_config if tool_config is not None else {}
  31      if not config:
  32          log.warning(f"{log_identifier} Tool-specific configuration (tool_config) is empty.")
  33      for key in (required_keys or []):
  34          if config.get(key) is None:
  35              raise ValueError(f"'{key}' configuration is missing in tool_config.")
  36      return config
  37  
  38  
  39  async def create_image_from_description(
  40      image_description: str,
  41      output_filename: Optional[str] = None,
  42      tool_context: ToolContext = None,
  43      tool_config: Optional[Dict[str, Any]] = None,
  44  ) -> ToolResult:
  45      """
  46      Generates an image based on a textual description using LiteLLM and saves it as a PNG artifact.
  47      Configuration for LiteLLM (model, api_key, etc.) is expected in `tool_config`.
  48  
  49      Args:
  50          image_description: The textual prompt to use for image generation.
  51          output_filename: Optional. The desired filename for the output PNG image.
  52                           If not provided, a unique name like 'generated_image_<uuid>.png' will be used.
  53          tool_context: The context provided by the ADK framework.
  54          tool_config: Optional dictionary containing specific configuration for this tool.
  55  
  56      Returns:
  57          ToolResult with output artifact details (artifact storage handled by ToolResultProcessor).
  58      """
  59      log_identifier = "[ImageTools:create_image_from_description]"
  60      if not tool_context:
  61          log.error(f"{log_identifier} ToolContext is missing.")
  62          return ToolResult.error("ToolContext is missing.")
  63  
  64      try:
  65          current_tool_config = _resolve_tool_config(
  66              tool_config, log_identifier, required_keys=["model", "api_key", "api_base"]
  67          )
  68          model_name = current_tool_config.get("model")
  69          api_key = current_tool_config.get("api_key")
  70          api_base = current_tool_config.get("api_base")
  71          extra_params = current_tool_config.get("extra_params", {})
  72  
  73          if "/" in model_name:
  74              original_model_name = model_name
  75              model_name = model_name.split("/", 1)[-1]
  76              log.debug(
  77                  f"{log_identifier} Original model name '{original_model_name}' processed to '{model_name}' for API call."
  78              )
  79  
  80          log.debug(
  81              f"{log_identifier} Using image generation model: {model_name} via direct API call to: {api_base}"
  82          )
  83  
  84          api_url = f"{api_base.rstrip('/')}/v1/images/generations"
  85          headers = {
  86              "Content-Type": "application/json",
  87              "Authorization": f"Bearer {api_key}",
  88          }
  89          payload = {"model": model_name, "prompt": image_description, **extra_params}
  90  
  91          log.debug(
  92              f"{log_identifier} Calling image generation API with prompt: '{image_description[:100]}...' and payload: {json.dumps(payload)}"
  93          )
  94  
  95          try:
  96              async with httpx.AsyncClient(timeout=60.0) as client:
  97                  http_response = await client.post(
  98                      api_url, headers=headers, json=payload
  99                  )
 100                  http_response.raise_for_status()
 101                  response_data = http_response.json()
 102          except httpx.HTTPStatusError as hse:
 103              log.error(
 104                  f"{log_identifier} HTTP error calling image generation API {hse.request.url}: {hse.response.status_code} - {hse.response.text}"
 105              )
 106              return ToolResult.error(
 107                  f"API error generating image: {hse.response.status_code} - {hse.response.text}"
 108              )
 109          except httpx.RequestError as re:
 110              log.error(
 111                  f"{log_identifier} Request error calling image generation API {re.request.url}: {re}"
 112              )
 113              return ToolResult.error(f"Request error generating image: {re}")
 114          except Exception as e:
 115              log.error(f"{log_identifier} Error calling image generation API: {e}")
 116              return ToolResult.error(f"Error generating image: {e}")
 117  
 118          log.debug(f"{log_identifier} Image generation API response received.")
 119  
 120          if (
 121              not response_data
 122              or not response_data.get("data")
 123              or not response_data["data"][0]
 124          ):
 125              log.error(
 126                  f"{log_identifier} API did not return valid image data. Response: {json.dumps(response_data)}"
 127              )
 128              raise ValueError("Image generation API did not return valid image data.")
 129  
 130          image_data_item = response_data["data"][0]
 131          image_bytes = None
 132  
 133          if image_data_item.get("url"):
 134              image_url = image_data_item["url"]
 135              log.info(f"{log_identifier} Fetching image from URL: {image_url}")
 136              async with httpx.AsyncClient() as client:
 137                  http_response = await client.get(image_url, timeout=30.0)
 138                  http_response.raise_for_status()
 139                  image_bytes = http_response.content
 140              log.info(f"{log_identifier} Image fetched successfully from URL.")
 141          elif image_data_item.get("b64_json"):
 142              log.info(f"{log_identifier} Decoding image from b64_json.")
 143              image_bytes = base64.b64decode(image_data_item["b64_json"])
 144              log.info(f"{log_identifier} Image decoded successfully from b64_json.")
 145          else:
 146              raise ValueError(
 147                  "No valid image data (URL or b64_json) found in LiteLLM response."
 148              )
 149  
 150          if not image_bytes:
 151              raise ValueError("Failed to retrieve image bytes.")
 152  
 153          # Determine output filename
 154          final_output_filename = ""
 155          if output_filename:
 156              if not output_filename.lower().endswith(".png"):
 157                  final_output_filename = f"{output_filename}.png"
 158              else:
 159                  final_output_filename = output_filename
 160          else:
 161              final_output_filename = f"generated_image_{uuid.uuid4()}.png"
 162          log.debug(
 163              f"{log_identifier} Determined output filename: {final_output_filename}"
 164          )
 165  
 166          # Build metadata for the artifact
 167          current_timestamp_iso = datetime.now(timezone.utc).isoformat()
 168          metadata_dict = {
 169              "source_prompt": image_description,
 170              "generation_tool": "direct_api",
 171              "generation_model": model_name,
 172              "request_timestamp": current_timestamp_iso,
 173              "original_requested_filename": (
 174                  output_filename if output_filename else "N/A"
 175              ),
 176          }
 177          if extra_params:
 178              metadata_dict["api_request_params"] = json.dumps(extra_params)
 179  
 180          log.info(
 181              f"{log_identifier} Returning image as DataObject for artifact storage: '{final_output_filename}'"
 182          )
 183  
 184          # Return ToolResult with DataObject - artifact storage handled by ToolResultProcessor
 185          return ToolResult.ok(
 186              "Image generated successfully.",
 187              data_objects=[
 188                  DataObject(
 189                      name=final_output_filename,
 190                      content=image_bytes,
 191                      mime_type="image/png",
 192                      disposition=DataDisposition.ARTIFACT,
 193                      description=f"Image generated from prompt: {image_description}",
 194                      metadata=metadata_dict,
 195                  )
 196              ],
 197          )
 198  
 199      except ValueError as ve:
 200          log.error(f"{log_identifier} Value error: {ve}")
 201          return ToolResult.error(str(ve))
 202      except httpx.HTTPStatusError as hse:
 203          log.error(
 204              f"{log_identifier} HTTP error fetching image from URL {hse.request.url}: {hse.response.status_code} - {hse.response.text}"
 205          )
 206          return ToolResult.error(
 207              f"HTTP error fetching image: {hse.response.status_code}"
 208          )
 209      except httpx.RequestError as re:
 210          log.error(
 211              f"{log_identifier} Request error fetching image from URL {re.request.url}: {re}"
 212          )
 213          return ToolResult.error(f"Request error fetching image: {re}")
 214      except Exception as e:
 215          log.exception(
 216              f"{log_identifier} Unexpected error in create_image_from_description: {e}"
 217          )
 218          return ToolResult.error(f"An unexpected error occurred: {e}")
 219  
 220  
 221  def _get_image_mime_type(filename: str) -> str:
 222      """Get MIME type from file extension."""
 223      ext = os.path.splitext(filename)[1].lower()
 224      mime_mapping = {
 225          ".jpg": "image/jpeg",
 226          ".jpeg": "image/jpeg",
 227          ".png": "image/png",
 228          ".webp": "image/webp",
 229          ".gif": "image/gif",
 230      }
 231      return mime_mapping.get(ext, "application/octet-stream")
 232  
 233  
 234  def _is_supported_image_format(filename: str) -> bool:
 235      """Check if the image format is supported."""
 236      ext = os.path.splitext(filename)[1].lower()
 237      supported_formats = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
 238      return ext in supported_formats
 239  
 240  
 241  def _create_data_url(image_bytes: bytes, mime_type: str) -> str:
 242      """Create base64 data URL from image bytes."""
 243      base64_data = base64.b64encode(image_bytes).decode("utf-8")
 244      return f"data:{mime_type};base64,{base64_data}"
 245  
 246  
 247  async def describe_image(
 248      input_image: Artifact,
 249      prompt: str = "What is in this image?",
 250      tool_context: ToolContext = None,
 251      tool_config: Optional[Dict[str, Any]] = None,
 252  ) -> ToolResult:
 253      """
 254      Describes an image using an OpenAI-compatible vision API.
 255  
 256      Args:
 257          input_image: The input image artifact (pre-loaded by the framework).
 258          prompt: Custom prompt for image analysis (default: "What is in this image?").
 259          tool_context: The context provided by the ADK framework.
 260          tool_config: Configuration dictionary containing model, api_base, api_key.
 261  
 262      Returns:
 263          ToolResult with description data.
 264      """
 265      log_identifier = f"[ImageTools:describe_image:{input_image.filename}]"
 266  
 267      try:
 268          current_tool_config = _resolve_tool_config(
 269              tool_config, log_identifier, required_keys=["model", "api_key", "api_base"]
 270          )
 271          model_name = current_tool_config.get("model")
 272          api_key = current_tool_config.get("api_key")
 273          api_base = current_tool_config.get("api_base")
 274  
 275          log.debug(f"{log_identifier} Using model: {model_name}, API base: {api_base}")
 276  
 277          if not _is_supported_image_format(input_image.filename):
 278              raise ValueError(
 279                  "Unsupported image format. Supported formats: .png, .jpg, .jpeg, .webp, .gif"
 280              )
 281  
 282          # Get image data from pre-loaded artifact
 283          image_bytes = input_image.as_bytes()
 284          log.debug(f"{log_identifier} Using pre-loaded image: {len(image_bytes)} bytes")
 285  
 286          mime_type = _get_image_mime_type(input_image.filename)
 287          data_url = _create_data_url(image_bytes, mime_type)
 288          log.debug(f"{log_identifier} Created data URL with MIME type: {mime_type}")
 289  
 290          api_url = f"{api_base.rstrip('/')}/v1/chat/completions"
 291          headers = {
 292              "Content-Type": "application/json",
 293              "Authorization": f"Bearer {api_key}",
 294          }
 295  
 296          request_data = {
 297              "model": model_name,
 298              "messages": [
 299                  {
 300                      "role": "user",
 301                      "content": [
 302                          {"type": "text", "text": prompt},
 303                          {"type": "image_url", "image_url": {"url": data_url}},
 304                      ],
 305                  }
 306              ],
 307          }
 308  
 309          log.debug(
 310              f"{log_identifier} Calling vision API with prompt: '{prompt[:100]}...'"
 311          )
 312  
 313          async with httpx.AsyncClient(timeout=60.0) as client:
 314              response = await client.post(api_url, headers=headers, json=request_data)
 315              response.raise_for_status()
 316              response_data = response.json()
 317          log.debug(f"{log_identifier} Vision API response received.")
 318  
 319          if not response_data.get("choices") or not response_data["choices"]:
 320              raise ValueError("API response does not contain valid choices.")
 321  
 322          choice = response_data["choices"][0]
 323          if not choice.get("message") or not choice["message"].get("content"):
 324              raise ValueError("API response does not contain valid message content.")
 325  
 326          description = choice["message"]["content"]
 327  
 328          tokens_used = response_data.get("usage", {})
 329  
 330          log.info(
 331              f"{log_identifier} Image described successfully. Description length: {len(description)} characters"
 332          )
 333  
 334          return ToolResult.ok(
 335              "Image described successfully",
 336              data={
 337                  "description": description,
 338                  "image_filename": input_image.filename,
 339                  "image_version": input_image.version,
 340                  "tokens_used": tokens_used,
 341              },
 342          )
 343  
 344      except json.JSONDecodeError as jde:
 345          log.error(f"{log_identifier} JSON decode error: {jde}")
 346          return ToolResult.error("Invalid JSON response from API")
 347      except ValueError as ve:
 348          log.error(f"{log_identifier} Value error: {ve}")
 349          return ToolResult.error(str(ve))
 350      except httpx.HTTPStatusError as hse:
 351          log.error(
 352              f"{log_identifier} HTTP error calling vision API: {hse.response.status_code} - {hse.response.text}"
 353          )
 354          return ToolResult.error(f"API error: {hse.response.status_code}")
 355      except httpx.RequestError as re:
 356          log.error(f"{log_identifier} Request error calling vision API: {re}")
 357          return ToolResult.error(f"Request error: {re}")
 358      except Exception as e:
 359          log.exception(f"{log_identifier} Unexpected error in describe_image: {e}")
 360          return ToolResult.error(f"An unexpected error occurred: {e}")
 361  
 362  
 363  def _get_audio_format(filename: str) -> str:
 364      """Get audio format from file extension."""
 365      ext = os.path.splitext(filename)[1].lower()
 366      format_mapping = {".wav": "wav", ".mp3": "mp3"}
 367      return format_mapping.get(ext, "wav")
 368  
 369  
 370  def _is_supported_audio_format(filename: str) -> bool:
 371      """Check if the audio format is supported."""
 372      ext = os.path.splitext(filename)[1].lower()
 373      supported_formats = {".wav", ".mp3"}
 374      return ext in supported_formats
 375  
 376  
 377  def _encode_audio_to_base64(audio_bytes: bytes) -> str:
 378      """Encode audio bytes to base64 string."""
 379      return base64.b64encode(audio_bytes).decode("utf-8")
 380  
 381  
 382  async def describe_audio(
 383      input_audio: Artifact,
 384      prompt: str = "What is in this recording?",
 385      tool_context: ToolContext = None,
 386      tool_config: Optional[Dict[str, Any]] = None,
 387  ) -> ToolResult:
 388      """
 389      Describes an audio recording using an OpenAI-compatible audio API.
 390  
 391      Args:
 392          input_audio: The input audio artifact (pre-loaded by the framework).
 393          prompt: Custom prompt for audio analysis (default: "What is in this recording?").
 394          tool_context: The context provided by the ADK framework.
 395          tool_config: Configuration dictionary containing model, api_base, api_key.
 396  
 397      Returns:
 398          ToolResult with description data.
 399      """
 400      log_identifier = f"[ImageTools:describe_audio:{input_audio.filename}]"
 401  
 402      try:
 403          current_tool_config = _resolve_tool_config(
 404              tool_config, log_identifier, required_keys=["model", "api_key", "api_base"]
 405          )
 406          model_name = current_tool_config.get("model")
 407          api_key = current_tool_config.get("api_key")
 408          api_base = current_tool_config.get("api_base")
 409  
 410          log.debug(f"{log_identifier} Using model: {model_name}, API base: {api_base}")
 411  
 412          if not _is_supported_audio_format(input_audio.filename):
 413              raise ValueError("Unsupported audio format. Supported formats: .wav, .mp3")
 414  
 415          # Get audio data from pre-loaded artifact
 416          audio_bytes = input_audio.as_bytes()
 417          log.debug(f"{log_identifier} Using pre-loaded audio: {len(audio_bytes)} bytes")
 418  
 419          audio_format = _get_audio_format(input_audio.filename)
 420          base64_audio = _encode_audio_to_base64(audio_bytes)
 421          log.debug(
 422              f"{log_identifier} Encoded audio to base64 with format: {audio_format}"
 423          )
 424  
 425          api_url = f"{api_base.rstrip('/')}/v1/chat/completions"
 426          headers = {
 427              "Content-Type": "application/json",
 428              "Authorization": f"Bearer {api_key}",
 429          }
 430  
 431          request_data = {
 432              "model": model_name,
 433              "modalities": ["audio", "text"],
 434              "audio": {"voice": "alloy", "format": audio_format},
 435              "messages": [
 436                  {
 437                      "role": "user",
 438                      "content": [
 439                          {"type": "text", "text": prompt},
 440                          {
 441                              "type": "input_audio",
 442                              "input_audio": {
 443                                  "data": base64_audio,
 444                                  "format": audio_format,
 445                              },
 446                          },
 447                      ],
 448                  }
 449              ],
 450          }
 451  
 452          log.debug(
 453              f"{log_identifier} Calling audio API with prompt: '{prompt[:100]}...'"
 454          )
 455  
 456          async with httpx.AsyncClient(timeout=60.0) as client:
 457              response = await client.post(api_url, headers=headers, json=request_data)
 458              response.raise_for_status()
 459              response_data = response.json()
 460  
 461          log.debug(f"{log_identifier} Audio API response received.")
 462  
 463          if not response_data.get("choices") or not response_data["choices"]:
 464              raise ValueError("API response does not contain valid choices.")
 465  
 466          choice = response_data["choices"][0]
 467          if not choice.get("message") or not choice["message"].get("content"):
 468              raise ValueError("API response does not contain valid message content.")
 469  
 470          description = choice["message"]["content"]
 471  
 472          tokens_used = response_data.get("usage", {})
 473  
 474          log.info(
 475              f"{log_identifier} Audio described successfully. Description length: {len(description)} characters"
 476          )
 477  
 478          return ToolResult.ok(
 479              "Audio described successfully",
 480              data={
 481                  "description": description,
 482                  "audio_filename": input_audio.filename,
 483                  "audio_version": input_audio.version,
 484                  "tokens_used": tokens_used,
 485              },
 486          )
 487  
 488      except json.JSONDecodeError as jde:
 489          log.error(f"{log_identifier} JSON decode error: {jde}")
 490          return ToolResult.error("Invalid JSON response from API")
 491      except ValueError as ve:
 492          log.error(f"{log_identifier} Value error: {ve}")
 493          return ToolResult.error(str(ve))
 494      except httpx.HTTPStatusError as hse:
 495          log.error(
 496              f"{log_identifier} HTTP error calling audio API: {hse.response.status_code} - {hse.response.text}"
 497          )
 498          return ToolResult.error(f"API error: {hse.response.status_code}")
 499      except httpx.RequestError as re:
 500          log.error(f"{log_identifier} Request error calling audio API: {re}")
 501          return ToolResult.error(f"Request error: {re}")
 502      except Exception as e:
 503          log.exception(f"{log_identifier} Unexpected error in describe_audio: {e}")
 504          return ToolResult.error(f"An unexpected error occurred: {e}")
 505  
 506  
async def edit_image_with_gemini(
    input_image: Artifact,
    edit_prompt: str,
    output_filename: Optional[str] = None,
    use_pro_model: bool = False,
    tool_context: ToolContext = None,
    tool_config: Optional[Dict[str, Any]] = None,
) -> ToolResult:
    """
    Edits an existing image based on a text prompt using Google's Gemini image generation models.

    Two models are available (configured via tool_config):
    - Standard model: Default, optimized for speed, efficiency, and lower cost.
    - Pro model: Professional quality for complex tasks requiring advanced reasoning,
      high-fidelity text rendering, and up to 4K resolution. More expensive, so use only
      when truly necessary for infographics, charts, diagrams, technical illustrations,
      or tasks requiring precise text placement.

    The edited image is always re-encoded as JPEG (quality 95); RGBA results are
    flattened onto a white background first.

    Args:
        input_image: The input image artifact (pre-loaded by the framework).
        edit_prompt: Text description of the desired edits to apply to the image.
        output_filename: Optional. The desired filename for the output edited image.
                        If not provided, a unique name like 'edited_image_<uuid>.jpg' will be used.
        use_pro_model: If True, uses the pro model for professional quality output with
                      advanced reasoning and high-fidelity text rendering. More expensive.
                      If False (default), uses the standard model which is faster and cheaper.
        tool_context: The context provided by the ADK framework.
        tool_config: Configuration dictionary containing gemini_api_key, model, and optionally pro_model.

    Returns:
        ToolResult with output artifact details (artifact storage handled by ToolResultProcessor).
    """
    log_identifier = f"[ImageTools:edit_image_with_gemini:{input_image.filename}]"

    try:
        # Optional dependencies (google-genai, Pillow) are imported lazily so the
        # module can load even when they are not installed; failure becomes a
        # clean ToolResult.error instead of an import-time crash.
        try:
            from google import genai
            from google.genai import types
            from PIL import Image as PILImage
            from io import BytesIO
        except ImportError as ie:
            log.error(f"{log_identifier} Required dependencies not available: {ie}")
            return ToolResult.error(f"Required dependencies not available: {ie}")

        current_tool_config = _resolve_tool_config(
            tool_config, log_identifier, required_keys=["gemini_api_key"]
        )
        gemini_api_key = current_tool_config.get("gemini_api_key")
        # Standard model - optimized for speed, efficiency, and lower cost
        default_model = current_tool_config.get(
            "model", "gemini-3.1-flash-image-preview"
        )
        # Pro model - for professional asset production with advanced reasoning,
        # high-fidelity text rendering, and up to 4K resolution. More expensive.
        pro_model = current_tool_config.get(
            "pro_model", "gemini-3-pro-image-preview"
        )

        # Model selection is determined by the calling LLM via use_pro_model parameter
        model_name = pro_model if use_pro_model else default_model

        log.info(
            f"{log_identifier} Model selection: using {'pro' if use_pro_model else 'standard'} model "
            f"({model_name})"
        )

        if not _is_supported_image_format(input_image.filename):
            raise ValueError(
                "Unsupported image format. Supported formats: .png, .jpg, .jpeg, .webp, .gif"
            )

        # Get image data from pre-loaded artifact
        image_bytes = input_image.as_bytes()
        log.debug(f"{log_identifier} Using pre-loaded image: {len(image_bytes)} bytes")

        # Decode the raw bytes into a PIL image; decoding failures are converted
        # to ValueError so they surface through the ValueError handler below.
        try:
            from PIL import UnidentifiedImageError

            pil_image = PILImage.open(BytesIO(image_bytes))
            log.debug(
                f"{log_identifier} Converted to PIL Image: {pil_image.size}, mode: {pil_image.mode}"
            )
        except UnidentifiedImageError as e:
            log.error(f"{log_identifier} Unidentified image error: {e}")
            raise ValueError(f"Cannot identify image file: {e}")
        except IOError as e:
            log.error(f"{log_identifier} IO error: {e}")
            raise ValueError(f"Cannot identify image file: {e}")
        except Exception as e:
            raise ValueError(f"Failed to process image data: {e}")

        try:
            client = genai.Client(api_key=gemini_api_key)
            log.debug(f"{log_identifier} Initialized Gemini client")
        except Exception as e:
            raise ValueError(f"Failed to initialize Gemini client: {e}")

        # NOTE(review): the prompt is wrapped in a one-element tuple and passed
        # inside `contents` alongside the PIL image; the genai SDK appears to
        # accept nested sequences here — confirm against google-genai docs.
        text_input = (edit_prompt,)

        log.debug(
            f"{log_identifier} Calling Gemini API with edit prompt: '{edit_prompt[:100]}...'"
        )

        # generate_content is synchronous; run it in a worker thread so the
        # event loop is not blocked during the API call.
        try:
            response = await asyncio.to_thread(
                client.models.generate_content,
                model=model_name,
                contents=[text_input, pil_image],
                config=types.GenerateContentConfig(
                    response_modalities=["TEXT", "IMAGE"]
                ),
            )
            log.debug(f"{log_identifier} Gemini API response received.")
        except Exception as e:
            raise ValueError(f"Gemini API call failed: {e}")

        edited_image_bytes = None
        response_text = None

        if not response.candidates or not response.candidates[0].content.parts:
            raise ValueError("Gemini API did not return valid content.")

        # The response can interleave text and image parts; keep the last of
        # each kind seen (text for metadata, inline_data for the edited image).
        for part in response.candidates[0].content.parts:
            if part.text is not None:
                response_text = part.text
                log.debug(
                    f"{log_identifier} Received text response: {response_text[:100]}..."
                )
            elif part.inline_data is not None:
                edited_pil_image = PILImage.open(BytesIO(part.inline_data.data))
                output_buffer = BytesIO()
                if edited_pil_image.mode == "RGBA":
                    # JPEG has no alpha channel: composite onto a white
                    # background using the alpha band as the paste mask.
                    rgb_image = PILImage.new(
                        "RGB", edited_pil_image.size, (255, 255, 255)
                    )
                    rgb_image.paste(edited_pil_image, mask=edited_pil_image.split()[-1])
                    rgb_image.save(output_buffer, format="JPEG", quality=95)
                else:
                    edited_pil_image.save(output_buffer, format="JPEG", quality=95)
                edited_image_bytes = output_buffer.getvalue()
                log.debug(
                    f"{log_identifier} Processed edited image: {len(edited_image_bytes)} bytes"
                )

        if not edited_image_bytes:
            raise ValueError("No edited image data received from Gemini API.")

        # Determine output filename; basename() strips any directory components
        # from the caller-supplied name.
        final_output_filename = ""
        if output_filename:
            sane_filename = os.path.basename(output_filename)
            if not sane_filename.lower().endswith((".jpg", ".jpeg")):
                final_output_filename = f"{sane_filename}.jpg"
            else:
                final_output_filename = sane_filename
        else:
            base_name = os.path.splitext(input_image.filename)[0]
            final_output_filename = f"edited_{base_name}_{uuid.uuid4().hex[:8]}.jpg"

        log.debug(
            f"{log_identifier} Determined output filename: {final_output_filename}"
        )

        # Build metadata for the artifact
        current_timestamp_iso = datetime.now(timezone.utc).isoformat()
        metadata_dict = {
            "original_image": input_image.filename,
            "original_version": input_image.version,
            "edit_prompt": edit_prompt,
            "editing_tool": "gemini",
            "editing_model": model_name,
            "request_timestamp": current_timestamp_iso,
            "original_requested_filename": (
                output_filename if output_filename else "N/A"
            ),
        }
        if response_text:
            metadata_dict["gemini_response_text"] = response_text

        log.info(
            f"{log_identifier} Returning edited image as DataObject for artifact storage: '{final_output_filename}'"
        )

        # Return ToolResult with DataObject - artifact storage handled by ToolResultProcessor
        return ToolResult.ok(
            "Image edited successfully.",
            data={
                "original_filename": input_image.filename,
                "original_version": input_image.version,
            },
            data_objects=[
                DataObject(
                    name=final_output_filename,
                    content=edited_image_bytes,
                    mime_type="image/jpeg",
                    disposition=DataDisposition.ARTIFACT,
                    description=f"Image edited with prompt: {edit_prompt}",
                    metadata=metadata_dict,
                )
            ],
        )

    except ValueError as ve:
        log.error(f"{log_identifier} Value error: {ve}")
        return ToolResult.error(str(ve))
    except Exception as e:
        log.exception(
            f"{log_identifier} Unexpected error in edit_image_with_gemini: {e}"
        )
        return ToolResult.error(f"An unexpected error occurred: {e}")
 717  
 718  
 719  async def generate_image_with_gemini(
 720      image_description: str,
 721      output_filename: Optional[str] = None,
 722      use_pro_model: bool = False,
 723      tool_context: ToolContext = None,
 724      tool_config: Optional[Dict[str, Any]] = None,
 725  ) -> Dict[str, Any]:
 726      """
 727      Generates an image from a text description using Google's Gemini image generation models.
 728      
 729      Two models are available (configured via tool_config):
 730      - Standard model: Default, optimized for speed, efficiency, and lower cost.
 731      - Pro model: Professional quality for complex tasks requiring advanced reasoning,
 732        high-fidelity text rendering, and up to 4K resolution. More expensive, so use only
 733        when truly necessary for infographics, charts, diagrams, technical illustrations,
 734        or tasks requiring precise text placement.
 735  
 736      Args:
 737          image_description: The textual prompt to use for image generation.
 738          output_filename: Optional. The desired filename for the output image.
 739                          If not provided, a unique name like 'generated_image_<uuid>.png' will be used.
 740          use_pro_model: If True, uses the pro model for professional quality output with
 741                        advanced reasoning and high-fidelity text rendering. More expensive.
 742                        If False (default), uses the standard model which is faster and cheaper.
 743          tool_context: The context provided by the ADK framework.
 744          tool_config: Configuration dictionary containing gemini_api_key, model, and optionally pro_model.
 745  
 746      Returns:
 747          A dictionary containing:
 748          - "status": "success" or "error".
 749          - "message": A descriptive message about the outcome.
 750          - "output_filename": The name of the saved image artifact (if successful).
 751          - "output_version": The version of the saved image artifact (if successful).
 752          - "result_preview": A brief preview message (if successful).
 753          - "model_used": The model that was used for generation (if successful).
 754          - "used_pro_model": Whether the pro model was used (if successful).
 755      """
 756      log_identifier = "[ImageTools:generate_image_with_gemini]"
 757      if not tool_context:
 758          log.error(f"{log_identifier} ToolContext is missing.")
 759          return {"status": "error", "message": "ToolContext is missing."}
 760  
 761      try:
 762          try:
 763              from google import genai
 764              from google.genai import types
 765              from PIL import Image as PILImage
 766              from io import BytesIO
 767          except ImportError as ie:
 768              log.error(f"{log_identifier} Required dependencies not available: {ie}")
 769              return {
 770                  "status": "error",
 771                  "message": f"Required dependencies not available: {ie}",
 772              }
 773  
 774          inv_context = tool_context._invocation_context
 775          if not inv_context:
 776              raise ValueError("InvocationContext is not available.")
 777  
 778          app_name = getattr(inv_context, "app_name", None)
 779          user_id = getattr(inv_context, "user_id", None)
 780          session_id = get_original_session_id(inv_context)
 781          artifact_service = getattr(inv_context, "artifact_service", None)
 782  
 783          if not all([app_name, user_id, session_id, artifact_service]):
 784              missing_parts = [
 785                  part
 786                  for part, val in [
 787                      ("app_name", app_name),
 788                      ("user_id", user_id),
 789                      ("session_id", session_id),
 790                      ("artifact_service", artifact_service),
 791                  ]
 792                  if not val
 793              ]
 794              raise ValueError(
 795                  f"Missing required context parts: {', '.join(missing_parts)}"
 796              )
 797  
 798          log.info(
 799              f"{log_identifier} Processing image generation request for session {session_id}."
 800          )
 801  
 802          current_tool_config = _resolve_tool_config(
 803              tool_config, log_identifier, required_keys=["gemini_api_key"]
 804          )
 805          gemini_api_key = current_tool_config.get("gemini_api_key")
 806          # Standard model - optimized for speed, efficiency, and lower cost
 807          default_model = current_tool_config.get(
 808              "model", "gemini-3.1-flash-image-preview"
 809          )
 810          # Pro model - for professional asset production with advanced reasoning,
 811          # high-fidelity text rendering, and up to 4K resolution. More expensive.
 812          pro_model = current_tool_config.get(
 813              "pro_model", "gemini-3-pro-image-preview"
 814          )
 815  
 816          # Model selection is determined by the calling LLM via use_pro_model parameter
 817          model_name = pro_model if use_pro_model else default_model
 818          
 819          log.info(
 820              f"{log_identifier} Model selection: using {'pro' if use_pro_model else 'standard'} model "
 821              f"({model_name})"
 822          )
 823  
 824          try:
 825              client = genai.Client(api_key=gemini_api_key)
 826              log.debug(f"{log_identifier} Initialized Gemini client")
 827          except Exception as e:
 828              raise ValueError(f"Failed to initialize Gemini client: {e}")
 829  
 830          log.debug(
 831              f"{log_identifier} Calling Gemini API with prompt: '{image_description[:100]}...'"
 832          )
 833  
 834          try:
 835              response = await asyncio.to_thread(
 836                  client.models.generate_content,
 837                  model=model_name,
 838                  contents=[image_description],
 839                  config=types.GenerateContentConfig(
 840                      response_modalities=["TEXT", "IMAGE"]
 841                  ),
 842              )
 843              log.debug(f"{log_identifier} Gemini API response received.")
 844          except Exception as e:
 845              raise ValueError(f"Gemini API call failed: {e}")
 846  
 847          generated_image_bytes = None
 848          response_text = None
 849  
 850          if not response.candidates or not response.candidates[0].content.parts:
 851              raise ValueError("Gemini API did not return valid content.")
 852  
 853          for part in response.candidates[0].content.parts:
 854              if part.text is not None:
 855                  response_text = part.text
 856                  log.debug(
 857                      f"{log_identifier} Received text response: {response_text[:100]}..."
 858                  )
 859              elif part.inline_data is not None:
 860                  generated_pil_image = PILImage.open(BytesIO(part.inline_data.data))
 861                  output_buffer = BytesIO()
 862                  # Save as PNG for generated images
 863                  generated_pil_image.save(output_buffer, format="PNG")
 864                  generated_image_bytes = output_buffer.getvalue()
 865                  log.debug(
 866                      f"{log_identifier} Processed generated image: {len(generated_image_bytes)} bytes"
 867                  )
 868  
 869          if not generated_image_bytes:
 870              raise ValueError("No image data received from Gemini API.")
 871  
 872          final_output_filename = ""
 873          if output_filename:
 874              sane_filename = os.path.basename(output_filename)
 875              if not sane_filename.lower().endswith(".png"):
 876                  final_output_filename = f"{sane_filename}.png"
 877              else:
 878                  final_output_filename = sane_filename
 879          else:
 880              final_output_filename = f"generated_image_{uuid.uuid4()}.png"
 881  
 882          log.debug(
 883              f"{log_identifier} Determined output filename: {final_output_filename}"
 884          )
 885  
 886          output_mime_type = "image/png"
 887          current_timestamp_iso = datetime.now(timezone.utc).isoformat()
 888  
 889          metadata_dict = {
 890              "description": f"Image generated from prompt: {image_description}",
 891              "source_prompt": image_description,
 892              "generation_tool": "gemini",
 893              "generation_model": model_name,
 894              "used_pro_model": use_pro_model,
 895              "request_timestamp": current_timestamp_iso,
 896              "original_requested_filename": (
 897                  output_filename if output_filename else "N/A"
 898              ),
 899          }
 900          if response_text:
 901              metadata_dict["gemini_response_text"] = response_text
 902  
 903          log.info(
 904              f"{log_identifier} Saving generated image artifact '{final_output_filename}' with mime_type '{output_mime_type}'."
 905          )
 906          save_result = await save_artifact_with_metadata(
 907              artifact_service=artifact_service,
 908              app_name=app_name,
 909              user_id=user_id,
 910              session_id=session_id,
 911              filename=final_output_filename,
 912              content_bytes=generated_image_bytes,
 913              mime_type=output_mime_type,
 914              metadata_dict=metadata_dict,
 915              timestamp=datetime.now(timezone.utc),
 916              schema_max_keys=DEFAULT_SCHEMA_MAX_KEYS,
 917              tool_context=tool_context,
 918          )
 919  
 920          if save_result.get("status") == "error":
 921              raise IOError(
 922                  f"Failed to save generated image artifact: {save_result.get('message', 'Unknown error')}"
 923              )
 924  
 925          log.info(
 926              f"{log_identifier} Generated image artifact '{final_output_filename}' v{save_result['data_version']} saved successfully."
 927          )
 928  
 929          return {
 930              "status": "success",
 931              "message": "Image generated and saved successfully.",
 932              "output_filename": final_output_filename,
 933              "output_version": save_result["data_version"],
 934              "result_preview": f"Image '{final_output_filename}' (v{save_result['data_version']}) created from prompt: \"{image_description[:50]}...\"",
 935              "model_used": model_name,
 936              "used_pro_model": use_pro_model,
 937          }
 938  
 939      except ValueError as ve:
 940          log.error(f"{log_identifier} Value error: {ve}")
 941          return {"status": "error", "message": str(ve)}
 942      except IOError as ioe:
 943          log.error(f"{log_identifier} IO error: {ioe}")
 944          return {"status": "error", "message": str(ioe)}
 945      except Exception as e:
 946          log.exception(
 947              f"{log_identifier} Unexpected error in generate_image_with_gemini: {e}"
 948          )
 949          return {"status": "error", "message": f"An unexpected error occurred: {e}"}
 950  
 951  
# Tool definition: text-to-image generation via a configured model (e.g. LiteLLM).
# Only "image_description" is required; the result is saved as a PNG artifact.
create_image_from_description_tool_def = BuiltinTool(
    name="create_image_from_description",
    implementation=create_image_from_description,
    description="Generates an image based on a textual description using a configured image generation model (e.g., via LiteLLM) and saves it as a PNG artifact.",
    category="image",
    required_scopes=["tool:image:create"],
    parameters=adk_types.Schema(
        type=adk_types.Type.OBJECT,
        properties={
            "image_description": adk_types.Schema(
                type=adk_types.Type.STRING,
                description="The textual prompt to use for image generation.",
            ),
            "output_filename": adk_types.Schema(
                type=adk_types.Type.STRING,
                description="Optional. The desired filename for the output PNG image.",
                nullable=True,
            ),
        },
        required=["image_description"],
    ),
    examples=[],
)
 975  
# Tool definition: image description via an OpenAI-compatible vision API.
# "input_image" accepts a "filename" or "filename:version" artifact reference.
describe_image_tool_def = BuiltinTool(
    name="describe_image",
    implementation=describe_image,
    description="Describes an image using an OpenAI-compatible vision API.",
    category="image",
    required_scopes=["tool:image:describe"],
    parameters=adk_types.Schema(
        type=adk_types.Type.OBJECT,
        properties={
            "input_image": adk_types.Schema(
                type=adk_types.Type.STRING,
                description="The filename (and optional :version) of the input image artifact.",
            ),
            "prompt": adk_types.Schema(
                type=adk_types.Type.STRING,
                description="Custom prompt for image analysis.",
                nullable=True,
            ),
        },
        required=["input_image"],
    ),
    examples=[],
)
 999  
# Tool definition: audio description via a multimodal API.
# NOTE(review): category is "image" even though this is an audio tool and its
# scope is "tool:audio:describe" — presumably grouped by this module; confirm
# whether category="audio" was intended before changing it.
describe_audio_tool_def = BuiltinTool(
    name="describe_audio",
    implementation=describe_audio,
    description="Describes an audio recording using a multimodal API.",
    category="image",
    required_scopes=["tool:audio:describe"],
    parameters=adk_types.Schema(
        type=adk_types.Type.OBJECT,
        properties={
            "input_audio": adk_types.Schema(
                type=adk_types.Type.STRING,
                description="The filename (and optional :version) of the input audio artifact.",
            ),
            "prompt": adk_types.Schema(
                type=adk_types.Type.STRING,
                description="Custom prompt for audio analysis.",
                nullable=True,
            ),
        },
        required=["input_audio"],
    ),
    examples=[],
)
1023  
# Tool definition: prompt-driven editing of an existing image artifact using
# Gemini. The "use_pro_model" flag lets the calling LLM trade cost for quality.
edit_image_with_gemini_tool_def = BuiltinTool(
    name="edit_image_with_gemini",
    implementation=edit_image_with_gemini,
    description=(
        "Edits an existing image based on a text prompt using Google's Gemini image generation models. "
        "Two models are available: a standard model (fast, efficient, and cheaper) and a pro model "
        "(professional quality but more expensive). Use the pro model only when truly necessary for "
        "complex tasks like infographics, charts, diagrams, or images requiring precise text placement."
    ),
    category="image",
    required_scopes=["tool:image:edit"],
    parameters=adk_types.Schema(
        type=adk_types.Type.OBJECT,
        properties={
            "input_image": adk_types.Schema(
                type=adk_types.Type.STRING,
                description="The filename (and optional :version) of the input image artifact.",
            ),
            "edit_prompt": adk_types.Schema(
                type=adk_types.Type.STRING,
                description="Text description of the desired edits to apply to the image.",
            ),
            "output_filename": adk_types.Schema(
                type=adk_types.Type.STRING,
                description="Optional. The desired filename for the output edited image.",
                nullable=True,
            ),
            "use_pro_model": adk_types.Schema(
                type=adk_types.Type.BOOLEAN,
                description=(
                    "Set to true to use the pro model for professional quality output with advanced reasoning, "
                    "high-fidelity text rendering, and up to 4K resolution. The pro model is MORE EXPENSIVE, "
                    "so only use it when truly necessary for: infographics, charts, diagrams, technical "
                    "illustrations, or complex visual content requiring precise text placement. "
                    "Set to false (default) to use the standard model which is faster, efficient, and cheaper."
                ),
                nullable=True,
            ),
        },
        required=["input_image", "edit_prompt"],
    ),
    examples=[],
)
1067  
# Tool definition: text-to-image generation via Gemini. Shares the same
# standard/pro model split (and "use_pro_model" flag) as the edit tool.
generate_image_with_gemini_tool_def = BuiltinTool(
    name="generate_image_with_gemini",
    implementation=generate_image_with_gemini,
    description=(
        "Generates an image from a text description using Google's Gemini image generation models. "
        "Two models are available: a standard model (fast, efficient, and cheaper) and a pro model "
        "(professional quality but more expensive). Use the pro model only when truly necessary for "
        "complex tasks like infographics, charts, diagrams, or images requiring precise text placement."
    ),
    category="image",
    required_scopes=["tool:image:create"],
    parameters=adk_types.Schema(
        type=adk_types.Type.OBJECT,
        properties={
            "image_description": adk_types.Schema(
                type=adk_types.Type.STRING,
                description="The textual prompt to use for image generation.",
            ),
            "output_filename": adk_types.Schema(
                type=adk_types.Type.STRING,
                description="Optional. The desired filename for the output PNG image.",
                nullable=True,
            ),
            "use_pro_model": adk_types.Schema(
                type=adk_types.Type.BOOLEAN,
                description=(
                    "Set to true to use the pro model for professional quality output with advanced reasoning, "
                    "high-fidelity text rendering, and up to 4K resolution. The pro model is MORE EXPENSIVE, "
                    "so only use it when truly necessary for: infographics, charts, diagrams, technical "
                    "illustrations, or complex visual content requiring precise text placement. "
                    "Set to false (default) to use the standard model which is faster, efficient, and cheaper."
                ),
                nullable=True,
            ),
        },
        required=["image_description"],
    ),
    examples=[],
)
1107  
# Register every tool definition in this module with the shared registry at
# import time, in the same order the original individual calls used.
for _tool_definition in (
    create_image_from_description_tool_def,
    describe_image_tool_def,
    describe_audio_tool_def,
    edit_image_with_gemini_tool_def,
    generate_image_with_gemini_tool_def,
):
    tool_registry.register(_tool_definition)