"""
Processes ToolResult objects, handling artifact storage and response transformation.

This processor is invoked in the after_tool_callback chain and handles:
1. Detection of ToolResult vs raw dict returns
2. Processing DataObjects based on their disposition
3. Storing artifacts with metadata
4. Constructing the final dict response for the LLM
"""

import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, TYPE_CHECKING

from google.adk.tools import ToolContext

from ..tools.tool_result import ToolResult, DataObject, DataDisposition
from ..utils.artifact_helpers import (
    save_artifact_with_metadata,
    is_filename_safe,
    DEFAULT_SCHEMA_MAX_KEYS,
)
from ..utils.context_helpers import get_original_session_id
from ...common.utils.mime_helpers import is_text_based_mime_type

if TYPE_CHECKING:
    from ..sac.component import SamAgentComponent

log = logging.getLogger(__name__)


# Default configuration values
DEFAULT_AUTO_ARTIFACT_THRESHOLD_BYTES = 4096
DEFAULT_INLINE_TRUNCATION_BYTES = 8192
DEFAULT_PREVIEW_LENGTH_CHARS = 500


class ToolResultProcessorConfig:
    """Configuration for ToolResultProcessor behavior."""

    def __init__(
        self,
        auto_artifact_threshold_bytes: int = DEFAULT_AUTO_ARTIFACT_THRESHOLD_BYTES,
        inline_truncation_bytes: int = DEFAULT_INLINE_TRUNCATION_BYTES,
        preview_length_chars: int = DEFAULT_PREVIEW_LENGTH_CHARS,
        enabled: bool = True,
    ):
        # Size (bytes) above which AUTO-disposition text content is promoted
        # to an artifact instead of being returned inline.
        self.auto_artifact_threshold_bytes = auto_artifact_threshold_bytes
        # Maximum size (bytes) of inline string content before truncation.
        self.inline_truncation_bytes = inline_truncation_bytes
        # Maximum length (chars) of generated previews for artifact content.
        self.preview_length_chars = preview_length_chars
        # Master switch: when False, process() passes responses through untouched.
        self.enabled = enabled

    @classmethod
    def from_component(cls, host_component: "SamAgentComponent") -> "ToolResultProcessorConfig":
        """Load configuration from host component.

        Each value falls back to the module-level default when the
        corresponding key is absent from the component's configuration.
        """
        return cls(
            auto_artifact_threshold_bytes=host_component.get_config(
                "tool_result_auto_artifact_threshold_bytes",
                DEFAULT_AUTO_ARTIFACT_THRESHOLD_BYTES,
            ),
            inline_truncation_bytes=host_component.get_config(
                "tool_result_inline_truncation_bytes",
                DEFAULT_INLINE_TRUNCATION_BYTES,
            ),
            preview_length_chars=host_component.get_config(
                "tool_result_preview_length_chars",
                DEFAULT_PREVIEW_LENGTH_CHARS,
            ),
            enabled=host_component.get_config(
                "tool_result_processing_enabled",
                True,
            ),
        )


class ToolResultProcessor:
    """
    Processes ToolResult objects, converting DataObjects to artifacts as needed.

    This processor handles:
    - Detection of ToolResult instances vs raw dict returns
    - Processing DataObjects based on their disposition (AUTO, ARTIFACT, INLINE, etc.)
    - Storing artifacts with metadata
    - Generating previews for large content
    - Constructing the final dict response for the LLM
    """

    def __init__(
        self,
        host_component: "SamAgentComponent",
        config: Optional[ToolResultProcessorConfig] = None,
    ):
        self.host_component = host_component
        # When no explicit config is supplied, derive one from the component.
        self.config = config or ToolResultProcessorConfig.from_component(host_component)
        self.log_identifier = "[ToolResultProcessor]"

    async def process(
        self,
        tool_response: Any,
        tool_context: ToolContext,
        tool_name: str,
    ) -> Optional[Dict[str, Any]]:
        """
        Process a tool response, handling ToolResult or passing through raw dicts.

        Args:
            tool_response: The raw response from the tool (ToolResult, dict, or other)
            tool_context: The ADK ToolContext for accessing services
            tool_name: Name of the tool for logging

        Returns:
            A dictionary suitable for return to the LLM, or None if the
            tool returned None. When processing is disabled, the response
            is passed through unchanged (whatever its type).
        """
        log_id = f"{self.log_identifier}:{tool_name}"

        # If processing is disabled, pass through
        if not self.config.enabled:
            log.debug("%s ToolResult processing is disabled, passing through", log_id)
            return tool_response

        # Handle None responses
        if tool_response is None:
            return None

        # Check if this is a ToolResult instance
        if isinstance(tool_response, ToolResult):
            log.info("%s Processing ToolResult object", log_id)
            return await self._process_tool_result(tool_response, tool_context, log_id)

        # Check if this is a dict with data_objects (duck-typing ToolResult)
        if isinstance(tool_response, dict) and "data_objects" in tool_response:
            if isinstance(tool_response.get("data_objects"), list):
                log.info("%s Detected dict with data_objects, converting to ToolResult", log_id)
                try:
                    tool_result = ToolResult(**tool_response)
                    return await self._process_tool_result(tool_result, tool_context, log_id)
                except Exception as e:
                    # Conversion failure is non-fatal: fall through to the
                    # raw-dict pass-through below.
                    log.warning(
                        "%s Failed to convert dict to ToolResult: %s. Passing through.",
                        log_id,
                        e,
                    )

        # Pass through raw dict responses unchanged (backward compatibility)
        if isinstance(tool_response, dict):
            log.debug("%s Passing through raw dict response", log_id)
            return tool_response

        # Wrap non-dict responses in a dict
        log.debug("%s Wrapping non-dict response (type: %s)", log_id, type(tool_response).__name__)
        return {"result": tool_response}

    async def _process_tool_result(
        self,
        result: ToolResult,
        tool_context: ToolContext,
        log_id: str,
    ) -> Dict[str, Any]:
        """Process a ToolResult, handling all DataObjects.

        Builds the dict returned to the LLM: status/message/error_code,
        any inline `data` payload, plus `artifacts_created` and either
        `content` (single inline output) or `inline_outputs` (multiple).
        """
        final_response: Dict[str, Any] = {
            "status": result.status,
        }

        if result.message:
            final_response["message"] = result.message

        if result.error_code:
            final_response["error_code"] = result.error_code

        # Include inline data if present. NOTE: keys in result.data can
        # overwrite "status"/"message"/"error_code" set above.
        if result.data:
            final_response.update(result.data)

        # Process each DataObject
        if result.data_objects:
            artifacts_created: List[Dict[str, Any]] = []
            inline_contents: List[Dict[str, Any]] = []

            for data_obj in result.data_objects:
                processed = await self._process_data_object(data_obj, tool_context, log_id)

                if processed.get("stored_as_artifact"):
                    artifacts_created.append({
                        "filename": processed.get("filename"),
                        "version": processed.get("version"),
                        "description": processed.get("description"),
                        "mime_type": processed.get("mime_type"),
                        "size_bytes": processed.get("size_bytes"),
                        "preview": processed.get("preview"),
                    })
                else:
                    inline_contents.append({
                        "name": processed.get("name"),
                        "content": processed.get("content"),
                        "truncated": processed.get("truncated", False),
                    })

            # Add artifact info to response
            if artifacts_created:
                final_response["artifacts_created"] = artifacts_created

            # Add inline content to response
            if inline_contents:
                if len(inline_contents) == 1:
                    # Single inline content - add directly
                    final_response["content"] = inline_contents[0]["content"]
                    if inline_contents[0].get("truncated"):
                        final_response["content_truncated"] = True
                else:
                    # Multiple inline contents
                    final_response["inline_outputs"] = inline_contents

        inline_count = len(final_response.get("inline_outputs", []))
        inline_count += 1 if "content" in final_response else 0
        log.debug(
            "%s Processed ToolResult: status=%s, artifacts=%d, inline=%d",
            log_id,
            result.status,
            len(final_response.get("artifacts_created", [])),
            inline_count,
        )

        return final_response

    async def _process_data_object(
        self,
        data_obj: DataObject,
        tool_context: ToolContext,
        log_id: str,
    ) -> Dict[str, Any]:
        """Process a single DataObject based on its disposition.

        Resolves AUTO to a concrete disposition, then either stores the
        content as an artifact or returns it inline.
        """
        # Resolve AUTO disposition
        disposition = self._resolve_disposition(data_obj)

        log.debug(
            "%s Processing DataObject '%s': disposition=%s (resolved from %s)",
            log_id,
            data_obj.name,
            disposition,
            data_obj.disposition,
        )

        if disposition in (DataDisposition.ARTIFACT, DataDisposition.ARTIFACT_WITH_PREVIEW):
            return await self._store_as_artifact(data_obj, tool_context, log_id, disposition)
        else:
            return self._return_inline(data_obj, log_id)

    def _resolve_disposition(self, data_obj: DataObject) -> DataDisposition:
        """Resolve AUTO disposition to a concrete disposition.

        Rules: binary content and non-text MIME types always become
        artifacts (with preview); text content larger than the configured
        threshold becomes an artifact; small text stays inline.
        """
        if data_obj.disposition != DataDisposition.AUTO:
            return DataDisposition(data_obj.disposition)

        content = data_obj.content

        # Binary content always goes to artifact (no need to measure it).
        if isinstance(content, bytes):
            return DataDisposition.ARTIFACT_WITH_PREVIEW

        # Non-text MIME types go to artifact
        if not is_text_based_mime_type(data_obj.mime_type):
            return DataDisposition.ARTIFACT_WITH_PREVIEW

        # Size is measured in encoded bytes, not characters.
        content_size = len(content.encode("utf-8")) if content else 0

        # Large text content goes to artifact with preview
        if content_size > self.config.auto_artifact_threshold_bytes:
            return DataDisposition.ARTIFACT_WITH_PREVIEW

        # Small text content stays inline
        return DataDisposition.INLINE

    async def _store_as_artifact(
        self,
        data_obj: DataObject,
        tool_context: ToolContext,
        log_id: str,
        disposition: DataDisposition,
    ) -> Dict[str, Any]:
        """Store a DataObject as an artifact.

        Returns a dict describing the stored artifact, or an error dict
        (stored_as_artifact=False) when the filename is unsafe or the
        artifact service fails.
        """
        # Validate filename
        filename = data_obj.name
        if not filename:
            filename = self._generate_filename(data_obj.mime_type)
            log.debug("%s Generated filename: %s", log_id, filename)

        if not is_filename_safe(filename):
            log.warning("%s Invalid filename '%s', cannot store as artifact", log_id, filename)
            return {
                "name": data_obj.name,
                "stored_as_artifact": False,
                # Include the offending filename so the caller can act on it.
                "error": f"Invalid filename: {filename}",
            }

        try:
            # NOTE: reaches into a private ADK attribute to obtain the
            # invocation context; no public accessor is used here.
            inv_context = tool_context._invocation_context
            artifact_service = inv_context.artifact_service

            if not artifact_service:
                raise ValueError("ArtifactService not available in context")

            # Prepare content bytes
            content = data_obj.content
            if isinstance(content, bytes):
                content_bytes = content
            else:
                content_bytes = content.encode("utf-8") if content else b""

            content_size = len(content_bytes)

            # Prepare metadata (copied so the DataObject is not mutated)
            metadata_dict = dict(data_obj.metadata) if data_obj.metadata else {}
            if data_obj.description:
                metadata_dict["description"] = data_obj.description
            metadata_dict["source"] = "tool_result"

            # Save artifact
            result = await save_artifact_with_metadata(
                artifact_service=artifact_service,
                app_name=inv_context.app_name,
                user_id=inv_context.user_id,
                session_id=get_original_session_id(inv_context),
                filename=filename,
                content_bytes=content_bytes,
                mime_type=data_obj.mime_type,
                metadata_dict=metadata_dict,
                timestamp=datetime.now(timezone.utc),
                schema_max_keys=DEFAULT_SCHEMA_MAX_KEYS,
                tags=data_obj.tags or None,
                tool_context=tool_context,
            )

            # Generate preview if needed
            preview = None
            if disposition == DataDisposition.ARTIFACT_WITH_PREVIEW:
                preview = data_obj.preview or self._generate_preview(data_obj)

            version = result.get("data_version")
            log.info(
                "%s Stored artifact '%s' version %s (%d bytes)",
                log_id,
                filename,
                version,
                content_size,
            )

            return {
                "name": data_obj.name,
                "filename": filename,
                "stored_as_artifact": True,
                "version": version,
                "description": data_obj.description,
                "mime_type": data_obj.mime_type,
                "size_bytes": content_size,
                "preview": preview,
            }

        except Exception as e:
            log.exception("%s Failed to store artifact '%s': %s", log_id, filename, e)
            return {
                "name": data_obj.name,
                "stored_as_artifact": False,
                "error": str(e),
            }

    def _return_inline(self, data_obj: DataObject, log_id: str) -> Dict[str, Any]:
        """Return DataObject content inline, potentially truncated."""
        content = data_obj.content
        truncated = False

        if isinstance(content, str):
            if len(content) > self.config.inline_truncation_bytes:
                content = content[: self.config.inline_truncation_bytes] + "...[truncated]"
                truncated = True
                log.debug(
                    "%s Truncated inline content for '%s' to %d bytes",
                    log_id,
                    data_obj.name,
                    self.config.inline_truncation_bytes,
                )
        elif isinstance(content, bytes):
            # Binary content should not be returned inline - this shouldn't happen
            # if disposition resolution is working correctly
            content = f"[Binary content: {len(content)} bytes - use ARTIFACT disposition]"
            log.warning(
                "%s Binary content returned inline for '%s'. Consider using ARTIFACT disposition.",
                log_id,
                data_obj.name,
            )

        return {
            "name": data_obj.name,
            "stored_as_artifact": False,
            "content": content,
            "truncated": truncated,
        }

    def _generate_preview(self, data_obj: DataObject) -> str:
        """Generate a preview string for content.

        Uses the DataObject's explicit preview when set; otherwise
        describes binary content or truncates text to the configured
        preview length.
        """
        # Use explicit preview if provided
        if data_obj.preview:
            return data_obj.preview

        content = data_obj.content

        # Binary content - just describe it
        if isinstance(content, bytes):
            return f"[Binary content: {len(content)} bytes, mime_type: {data_obj.mime_type}]"

        # Text content - truncate
        if not content:
            return "[Empty content]"

        if len(content) <= self.config.preview_length_chars:
            return content

        return content[: self.config.preview_length_chars] + "..."

    def _generate_filename(self, mime_type: str) -> str:
        """Generate a unique filename with an extension based on mime type.

        Unknown MIME types fall back to the generic ".bin" extension.
        """
        ext_map = {
            "text/plain": ".txt",
            "text/csv": ".csv",
            "text/html": ".html",
            "text/markdown": ".md",
            "application/json": ".json",
            "application/xml": ".xml",
            "text/xml": ".xml",
            "application/yaml": ".yaml",
            "text/yaml": ".yaml",
            "image/png": ".png",
            "image/jpeg": ".jpg",
            "image/gif": ".gif",
            "image/svg+xml": ".svg",
            "application/pdf": ".pdf",
            "audio/mpeg": ".mp3",
            "audio/wav": ".wav",
        }
        ext = ext_map.get(mime_type, ".bin")
        unique_id = uuid.uuid4().hex[:8]
        return f"output_{unique_id}{ext}"