/ src / solace_agent_mesh / agent / adk / tool_result_processor.py
tool_result_processor.py
  1  """
  2  Processes ToolResult objects, handling artifact storage and response transformation.
  3  
  4  This processor is invoked in the after_tool_callback chain and handles:
  5  1. Detection of ToolResult vs raw dict returns
  6  2. Processing DataObjects based on their disposition
  7  3. Storing artifacts with metadata
  8  4. Constructing the final dict response for the LLM
  9  """
 10  
 11  import logging
 12  import uuid
 13  from datetime import datetime, timezone
 14  from typing import Any, Dict, List, Optional, TYPE_CHECKING
 15  
 16  from google.adk.tools import ToolContext
 17  
 18  from ..tools.tool_result import ToolResult, DataObject, DataDisposition
 19  from ..utils.artifact_helpers import (
 20      save_artifact_with_metadata,
 21      is_filename_safe,
 22      DEFAULT_SCHEMA_MAX_KEYS,
 23  )
 24  from ..utils.context_helpers import get_original_session_id
 25  from ...common.utils.mime_helpers import is_text_based_mime_type
 26  
 27  if TYPE_CHECKING:
 28      from ..sac.component import SamAgentComponent
 29  
 30  log = logging.getLogger(__name__)
 31  
 32  
 33  # Default configuration values
 34  DEFAULT_AUTO_ARTIFACT_THRESHOLD_BYTES = 4096
 35  DEFAULT_INLINE_TRUNCATION_BYTES = 8192
 36  DEFAULT_PREVIEW_LENGTH_CHARS = 500
 37  
 38  
 39  class ToolResultProcessorConfig:
 40      """Configuration for ToolResultProcessor behavior."""
 41  
 42      def __init__(
 43          self,
 44          auto_artifact_threshold_bytes: int = DEFAULT_AUTO_ARTIFACT_THRESHOLD_BYTES,
 45          inline_truncation_bytes: int = DEFAULT_INLINE_TRUNCATION_BYTES,
 46          preview_length_chars: int = DEFAULT_PREVIEW_LENGTH_CHARS,
 47          enabled: bool = True,
 48      ):
 49          self.auto_artifact_threshold_bytes = auto_artifact_threshold_bytes
 50          self.inline_truncation_bytes = inline_truncation_bytes
 51          self.preview_length_chars = preview_length_chars
 52          self.enabled = enabled
 53  
 54      @classmethod
 55      def from_component(cls, host_component: "SamAgentComponent") -> "ToolResultProcessorConfig":
 56          """Load configuration from host component."""
 57          return cls(
 58              auto_artifact_threshold_bytes=host_component.get_config(
 59                  "tool_result_auto_artifact_threshold_bytes",
 60                  DEFAULT_AUTO_ARTIFACT_THRESHOLD_BYTES,
 61              ),
 62              inline_truncation_bytes=host_component.get_config(
 63                  "tool_result_inline_truncation_bytes",
 64                  DEFAULT_INLINE_TRUNCATION_BYTES,
 65              ),
 66              preview_length_chars=host_component.get_config(
 67                  "tool_result_preview_length_chars",
 68                  DEFAULT_PREVIEW_LENGTH_CHARS,
 69              ),
 70              enabled=host_component.get_config(
 71                  "tool_result_processing_enabled",
 72                  True,
 73              ),
 74          )
 75  
 76  
 77  class ToolResultProcessor:
 78      """
 79      Processes ToolResult objects, converting DataObjects to artifacts as needed.
 80  
 81      This processor handles:
 82      - Detection of ToolResult instances vs raw dict returns
 83      - Processing DataObjects based on their disposition (AUTO, ARTIFACT, INLINE, etc.)
 84      - Storing artifacts with metadata
 85      - Generating previews for large content
 86      - Constructing the final dict response for the LLM
 87      """
 88  
 89      def __init__(
 90          self,
 91          host_component: "SamAgentComponent",
 92          config: Optional[ToolResultProcessorConfig] = None,
 93      ):
 94          self.host_component = host_component
 95          self.config = config or ToolResultProcessorConfig.from_component(host_component)
 96          self.log_identifier = "[ToolResultProcessor]"
 97  
 98      async def process(
 99          self,
100          tool_response: Any,
101          tool_context: ToolContext,
102          tool_name: str,
103      ) -> Dict[str, Any]:
104          """
105          Process a tool response, handling ToolResult or passing through raw dicts.
106  
107          Args:
108              tool_response: The raw response from the tool (ToolResult, dict, or other)
109              tool_context: The ADK ToolContext for accessing services
110              tool_name: Name of the tool for logging
111  
112          Returns:
113              A dictionary suitable for return to the LLM
114          """
115          log_id = f"{self.log_identifier}:{tool_name}"
116  
117          # If processing is disabled, pass through
118          if not self.config.enabled:
119              log.debug("%s ToolResult processing is disabled, passing through", log_id)
120              return tool_response
121  
122          # Handle None responses
123          if tool_response is None:
124              return None
125  
126          # Check if this is a ToolResult instance
127          if isinstance(tool_response, ToolResult):
128              log.info("%s Processing ToolResult object", log_id)
129              return await self._process_tool_result(tool_response, tool_context, log_id)
130  
131          # Check if this is a dict with data_objects (duck-typing ToolResult)
132          if isinstance(tool_response, dict) and "data_objects" in tool_response:
133              if isinstance(tool_response.get("data_objects"), list):
134                  log.info("%s Detected dict with data_objects, converting to ToolResult", log_id)
135                  try:
136                      tool_result = ToolResult(**tool_response)
137                      return await self._process_tool_result(tool_result, tool_context, log_id)
138                  except Exception as e:
139                      log.warning(
140                          "%s Failed to convert dict to ToolResult: %s. Passing through.",
141                          log_id,
142                          e,
143                      )
144  
145          # Pass through raw dict responses unchanged (backward compatibility)
146          if isinstance(tool_response, dict):
147              log.debug("%s Passing through raw dict response", log_id)
148              return tool_response
149  
150          # Wrap non-dict responses in a dict
151          log.debug("%s Wrapping non-dict response (type: %s)", log_id, type(tool_response).__name__)
152          return {"result": tool_response}
153  
154      async def _process_tool_result(
155          self,
156          result: ToolResult,
157          tool_context: ToolContext,
158          log_id: str,
159      ) -> Dict[str, Any]:
160          """Process a ToolResult, handling all DataObjects."""
161          final_response: Dict[str, Any] = {
162              "status": result.status,
163          }
164  
165          if result.message:
166              final_response["message"] = result.message
167  
168          if result.error_code:
169              final_response["error_code"] = result.error_code
170  
171          # Include inline data if present
172          if result.data:
173              final_response.update(result.data)
174  
175          # Process each DataObject
176          if result.data_objects:
177              artifacts_created: List[Dict[str, Any]] = []
178              inline_contents: List[Dict[str, Any]] = []
179  
180              for data_obj in result.data_objects:
181                  processed = await self._process_data_object(data_obj, tool_context, log_id)
182  
183                  if processed.get("stored_as_artifact"):
184                      artifacts_created.append({
185                          "filename": processed.get("filename"),
186                          "version": processed.get("version"),
187                          "description": processed.get("description"),
188                          "mime_type": processed.get("mime_type"),
189                          "size_bytes": processed.get("size_bytes"),
190                          "preview": processed.get("preview"),
191                      })
192                  else:
193                      inline_contents.append({
194                          "name": processed.get("name"),
195                          "content": processed.get("content"),
196                          "truncated": processed.get("truncated", False),
197                      })
198  
199              # Add artifact info to response
200              if artifacts_created:
201                  final_response["artifacts_created"] = artifacts_created
202  
203              # Add inline content to response
204              if inline_contents:
205                  if len(inline_contents) == 1:
206                      # Single inline content - add directly
207                      final_response["content"] = inline_contents[0]["content"]
208                      if inline_contents[0].get("truncated"):
209                          final_response["content_truncated"] = True
210                  else:
211                      # Multiple inline contents
212                      final_response["inline_outputs"] = inline_contents
213  
214          inline_count = len(final_response.get("inline_outputs", []))
215          inline_count += 1 if "content" in final_response else 0
216          log.debug(
217              "%s Processed ToolResult: status=%s, artifacts=%d, inline=%d",
218              log_id,
219              result.status,
220              len(final_response.get("artifacts_created", [])),
221              inline_count,
222          )
223  
224          return final_response
225  
226      async def _process_data_object(
227          self,
228          data_obj: DataObject,
229          tool_context: ToolContext,
230          log_id: str,
231      ) -> Dict[str, Any]:
232          """Process a single DataObject based on its disposition."""
233          # Resolve AUTO disposition
234          disposition = self._resolve_disposition(data_obj)
235  
236          log.debug(
237              "%s Processing DataObject '%s': disposition=%s (resolved from %s)",
238              log_id,
239              data_obj.name,
240              disposition,
241              data_obj.disposition,
242          )
243  
244          if disposition in (DataDisposition.ARTIFACT, DataDisposition.ARTIFACT_WITH_PREVIEW):
245              return await self._store_as_artifact(data_obj, tool_context, log_id, disposition)
246          else:
247              return self._return_inline(data_obj, log_id)
248  
249      def _resolve_disposition(self, data_obj: DataObject) -> DataDisposition:
250          """Resolve AUTO disposition to a concrete disposition."""
251          if data_obj.disposition != DataDisposition.AUTO:
252              return DataDisposition(data_obj.disposition)
253  
254          # Get content size
255          content = data_obj.content
256          if isinstance(content, bytes):
257              content_size = len(content)
258          else:
259              content_size = len(content.encode("utf-8")) if content else 0
260  
261          # Binary content always goes to artifact
262          if isinstance(content, bytes):
263              return DataDisposition.ARTIFACT_WITH_PREVIEW
264  
265          # Non-text MIME types go to artifact
266          if not is_text_based_mime_type(data_obj.mime_type):
267              return DataDisposition.ARTIFACT_WITH_PREVIEW
268  
269          # Large text content goes to artifact with preview
270          if content_size > self.config.auto_artifact_threshold_bytes:
271              return DataDisposition.ARTIFACT_WITH_PREVIEW
272  
273          # Small text content stays inline
274          return DataDisposition.INLINE
275  
276      async def _store_as_artifact(
277          self,
278          data_obj: DataObject,
279          tool_context: ToolContext,
280          log_id: str,
281          disposition: DataDisposition,
282      ) -> Dict[str, Any]:
283          """Store a DataObject as an artifact."""
284          # Validate filename
285          filename = data_obj.name
286          if not filename:
287              filename = self._generate_filename(data_obj.mime_type)
288              log.debug("%s Generated filename: %s", log_id, filename)
289  
290          if not is_filename_safe(filename):
291              log.warning("%s Invalid filename '%s', cannot store as artifact", log_id, filename)
292              return {
293                  "name": data_obj.name,
294                  "stored_as_artifact": False,
295                  "error": f"Invalid filename: {filename}",
296              }
297  
298          try:
299              inv_context = tool_context._invocation_context
300              artifact_service = inv_context.artifact_service
301  
302              if not artifact_service:
303                  raise ValueError("ArtifactService not available in context")
304  
305              # Prepare content bytes
306              content = data_obj.content
307              if isinstance(content, bytes):
308                  content_bytes = content
309              else:
310                  content_bytes = content.encode("utf-8") if content else b""
311  
312              content_size = len(content_bytes)
313  
314              # Prepare metadata
315              metadata_dict = dict(data_obj.metadata) if data_obj.metadata else {}
316              if data_obj.description:
317                  metadata_dict["description"] = data_obj.description
318              metadata_dict["source"] = "tool_result"
319  
320              # Save artifact
321              result = await save_artifact_with_metadata(
322                  artifact_service=artifact_service,
323                  app_name=inv_context.app_name,
324                  user_id=inv_context.user_id,
325                  session_id=get_original_session_id(inv_context),
326                  filename=filename,
327                  content_bytes=content_bytes,
328                  mime_type=data_obj.mime_type,
329                  metadata_dict=metadata_dict,
330                  timestamp=datetime.now(timezone.utc),
331                  schema_max_keys=DEFAULT_SCHEMA_MAX_KEYS,
332                  tags=data_obj.tags or None,
333                  tool_context=tool_context,
334              )
335  
336              # Generate preview if needed
337              preview = None
338              if disposition == DataDisposition.ARTIFACT_WITH_PREVIEW:
339                  preview = data_obj.preview or self._generate_preview(data_obj)
340  
341              version = result.get("data_version")
342              log.info(
343                  "%s Stored artifact '%s' version %s (%d bytes)",
344                  log_id,
345                  filename,
346                  version,
347                  content_size,
348              )
349  
350              return {
351                  "name": data_obj.name,
352                  "filename": filename,
353                  "stored_as_artifact": True,
354                  "version": version,
355                  "description": data_obj.description,
356                  "mime_type": data_obj.mime_type,
357                  "size_bytes": content_size,
358                  "preview": preview,
359              }
360  
361          except Exception as e:
362              log.exception("%s Failed to store artifact '%s': %s", log_id, filename, e)
363              return {
364                  "name": data_obj.name,
365                  "stored_as_artifact": False,
366                  "error": str(e),
367              }
368  
369      def _return_inline(self, data_obj: DataObject, log_id: str) -> Dict[str, Any]:
370          """Return DataObject content inline, potentially truncated."""
371          content = data_obj.content
372          truncated = False
373  
374          if isinstance(content, str):
375              if len(content) > self.config.inline_truncation_bytes:
376                  content = content[: self.config.inline_truncation_bytes] + "...[truncated]"
377                  truncated = True
378                  log.debug(
379                      "%s Truncated inline content for '%s' to %d bytes",
380                      log_id,
381                      data_obj.name,
382                      self.config.inline_truncation_bytes,
383                  )
384          elif isinstance(content, bytes):
385              # Binary content should not be returned inline - this shouldn't happen
386              # if disposition resolution is working correctly
387              content = f"[Binary content: {len(content)} bytes - use ARTIFACT disposition]"
388              log.warning(
389                  "%s Binary content returned inline for '%s'. Consider using ARTIFACT disposition.",
390                  log_id,
391                  data_obj.name,
392              )
393  
394          return {
395              "name": data_obj.name,
396              "stored_as_artifact": False,
397              "content": content,
398              "truncated": truncated,
399          }
400  
401      def _generate_preview(self, data_obj: DataObject) -> str:
402          """Generate a preview string for content."""
403          # Use explicit preview if provided
404          if data_obj.preview:
405              return data_obj.preview
406  
407          content = data_obj.content
408  
409          # Binary content - just describe it
410          if isinstance(content, bytes):
411              return f"[Binary content: {len(content)} bytes, mime_type: {data_obj.mime_type}]"
412  
413          # Text content - truncate
414          if not content:
415              return "[Empty content]"
416  
417          if len(content) <= self.config.preview_length_chars:
418              return content
419  
420          return content[: self.config.preview_length_chars] + "..."
421  
422      def _generate_filename(self, mime_type: str) -> str:
423          """Generate a filename based on mime type."""
424          ext_map = {
425              "text/plain": ".txt",
426              "text/csv": ".csv",
427              "text/html": ".html",
428              "text/markdown": ".md",
429              "application/json": ".json",
430              "application/xml": ".xml",
431              "text/xml": ".xml",
432              "application/yaml": ".yaml",
433              "text/yaml": ".yaml",
434              "image/png": ".png",
435              "image/jpeg": ".jpg",
436              "image/gif": ".gif",
437              "image/svg+xml": ".svg",
438              "application/pdf": ".pdf",
439              "audio/mpeg": ".mp3",
440              "audio/wav": ".wav",
441          }
442          ext = ext_map.get(mime_type, ".bin")
443          unique_id = uuid.uuid4().hex[:8]
444          return f"output_{unique_id}{ext}"