/ src / solace_agent_mesh / agent / utils / tool_context_facade.py
tool_context_facade.py
  1  """
  2  Provides a clean, simplified interface for tool authors to access context and artifacts.
  3  
  4  The ToolContextFacade hides all the boilerplate of extracting session info,
  5  accessing the artifact service, managing context, and sending status updates
  6  from the raw ADK ToolContext.
  7  
  8  Example usage:
  9      from solace_agent_mesh.agent.utils import ToolContextFacade
 10      from solace_agent_mesh.agent.tools import ToolResult, DataObject
 11  
 12      async def my_tool(filename: str, ctx: ToolContextFacade) -> ToolResult:
 13          # Send status updates - no boilerplate!
 14          ctx.send_status("Loading artifact...")
 15  
 16          # Load artifact - no boilerplate!
 17          data = await ctx.load_artifact(filename, as_text=True)
 18  
 19          # Access context properties
 20          print(f"User: {ctx.user_id}, Session: {ctx.session_id}")
 21  
 22          # Send progress update
 23          ctx.send_status("Processing data...")
 24  
 25          # Process and return
 26          result = process(data)
 27          return ToolResult.ok(
 28              "Done",
 29              data_objects=[DataObject(name="output.json", content=result)]
 30          )
 31  """
 32  
 33  import logging
 34  from typing import Any, Dict, List, Optional, Union, TYPE_CHECKING
 35  
 36  from google.adk.tools import ToolContext
 37  
 38  from .artifact_helpers import load_artifact_content_or_metadata
 39  from .context_helpers import get_original_session_id
 40  
 41  if TYPE_CHECKING:
 42      from google.adk.artifacts import BaseArtifactService
 43      from pydantic import BaseModel
 44  
 45  log = logging.getLogger(__name__)
 46  
 47  
 48  class ToolContextFacade:
 49      """
 50      A simplified interface for tool authors to access context, artifacts, and status updates.
 51  
 52      This facade provides:
 53      - Easy access to session/user/app context
 54      - Simplified artifact loading (content and metadata)
 55      - Artifact listing
 56      - Tool configuration access
 57      - Status update methods for sending progress to the frontend
 58  
 59      Note: This facade is intentionally read-only for artifact operations.
 60      All artifact saving should be done through the ToolResult/DataObject pattern,
 61      which provides a single, clear path for artifact creation.
 62      """
 63  
 64      def __init__(
 65          self,
 66          tool_context: ToolContext,
 67          tool_config: Optional[Dict[str, Any]] = None,
 68      ):
 69          """
 70          Initialize the facade.
 71  
 72          Args:
 73              tool_context: The raw ADK ToolContext
 74              tool_config: Optional tool-specific configuration dict
 75          """
 76          self._ctx = tool_context
 77          self._tool_config = tool_config or {}
 78  
 79          # Cache context values for efficiency
 80          self._session_id: Optional[str] = None
 81          self._user_id: Optional[str] = None
 82          self._app_name: Optional[str] = None
 83          self._artifact_service: Optional["BaseArtifactService"] = None
 84          self._host_component: Optional[Any] = None
 85          self._host_component_resolved: bool = False
 86  
 87      def _ensure_context(self) -> None:
 88          """Extract and cache context values from the invocation context."""
 89          if self._session_id is not None:
 90              return  # Already cached
 91  
 92          try:
 93              inv_context = self._ctx._invocation_context
 94              self._artifact_service = inv_context.artifact_service
 95              self._app_name = inv_context.app_name
 96              self._user_id = inv_context.user_id
 97              self._session_id = get_original_session_id(inv_context)
 98          except AttributeError as e:
 99              log.warning(
100                  "[ToolContextFacade] Could not extract context: %s", e
101              )
102              # Set defaults to avoid repeated attempts
103              self._session_id = ""
104              self._user_id = ""
105              self._app_name = ""
106  
107      @property
108      def session_id(self) -> str:
109          """Get the current session ID."""
110          self._ensure_context()
111          return self._session_id or ""
112  
113      @property
114      def user_id(self) -> str:
115          """Get the current user ID."""
116          self._ensure_context()
117          return self._user_id or ""
118  
119      @property
120      def app_name(self) -> str:
121          """Get the application name."""
122          self._ensure_context()
123          return self._app_name or ""
124  
125      @property
126      def raw_tool_context(self) -> ToolContext:
127          """
128          Get the underlying ADK ToolContext.
129  
130          Use this escape hatch when you need access to ADK-specific features
131          not exposed by the facade.
132          """
133          return self._ctx
134  
135      @property
136      def state(self) -> Dict[str, Any]:
137          """
138          Get the tool context state dictionary.
139  
140          This provides access to shared state across tool invocations.
141          """
142          return self._ctx.state
143  
144      def get_config(self, key: str, default: Any = None) -> Any:
145          """
146          Get a value from the tool configuration.
147  
148          Args:
149              key: The configuration key to look up
150              default: Default value if key is not found
151  
152          Returns:
153              The configuration value or the default
154          """
155          return self._tool_config.get(key, default)
156  
157      async def load_artifact(
158          self,
159          filename: str,
160          version: Union[int, str] = "latest",
161          as_text: bool = False,
162      ) -> Union[bytes, str]:
163          """
164          Load artifact content from the artifact store.
165  
166          Args:
167              filename: The artifact filename to load
168              version: Version to load ("latest" or specific version number)
169              as_text: If True, decode bytes to string (UTF-8)
170  
171          Returns:
172              The artifact content as bytes, or as string if as_text=True
173  
174          Raises:
175              ValueError: If the artifact cannot be loaded
176              FileNotFoundError: If the artifact does not exist
177          """
178          self._ensure_context()
179  
180          if not self._artifact_service:
181              raise ValueError("Artifact service not available in context")
182  
183          log_id = f"[ToolContextFacade:load_artifact:{filename}]"
184  
185          result = await load_artifact_content_or_metadata(
186              artifact_service=self._artifact_service,
187              app_name=self._app_name,
188              user_id=self._user_id,
189              session_id=self._session_id,
190              filename=filename,
191              version=version,
192              return_raw_bytes=True,
193          )
194  
195          status = result.get("status")
196          if status == "not_found":
197              raise FileNotFoundError(
198                  f"Artifact '{filename}' not found: {result.get('message')}"
199              )
200          elif status != "success":
201              raise ValueError(
202                  f"Failed to load artifact '{filename}': {result.get('message')}"
203              )
204  
205          content = result.get("raw_bytes")
206          if content is None:
207              # Fallback for text content returned directly
208              content = result.get("content", b"")
209              if isinstance(content, str):
210                  content = content.encode("utf-8")
211  
212          log.debug(
213              "%s Loaded %d bytes (version: %s)",
214              log_id,
215              len(content) if content else 0,
216              result.get("version"),
217          )
218  
219          if as_text:
220              return content.decode("utf-8") if isinstance(content, bytes) else content
221  
222          return content
223  
224      async def load_artifact_metadata(
225          self,
226          filename: str,
227          version: Union[int, str] = "latest",
228      ) -> Dict[str, Any]:
229          """
230          Load artifact metadata without loading the content.
231  
232          Args:
233              filename: The artifact filename
234              version: Version to load metadata for ("latest" or specific version)
235  
236          Returns:
237              Dictionary containing artifact metadata (mime_type, size_bytes,
238              description, schema, etc.)
239  
240          Raises:
241              ValueError: If metadata cannot be loaded
242              FileNotFoundError: If the artifact does not exist
243          """
244          self._ensure_context()
245  
246          if not self._artifact_service:
247              raise ValueError("Artifact service not available in context")
248  
249          result = await load_artifact_content_or_metadata(
250              artifact_service=self._artifact_service,
251              app_name=self._app_name,
252              user_id=self._user_id,
253              session_id=self._session_id,
254              filename=filename,
255              version=version,
256              load_metadata_only=True,
257          )
258  
259          status = result.get("status")
260          if status == "not_found":
261              raise FileNotFoundError(
262                  f"Artifact '{filename}' not found: {result.get('message')}"
263              )
264          elif status != "success":
265              raise ValueError(
266                  f"Failed to load metadata for '{filename}': {result.get('message')}"
267              )
268  
269          metadata = result.get("metadata", {})
270          metadata["version"] = result.get("version")
271          return metadata
272  
273      async def list_artifacts(self) -> List[str]:
274          """
275          List all artifact filenames in the current session.
276  
277          Returns:
278              List of artifact filenames (excluding metadata files)
279  
280          Raises:
281              ValueError: If artifacts cannot be listed
282          """
283          self._ensure_context()
284  
285          if not self._artifact_service:
286              raise ValueError("Artifact service not available in context")
287  
288          try:
289              list_keys_method = getattr(self._artifact_service, "list_artifact_keys")
290              keys = await list_keys_method(
291                  app_name=self._app_name,
292                  user_id=self._user_id,
293                  session_id=self._session_id,
294              )
295  
296              # Filter out metadata files
297              return [k for k in keys if not k.endswith(".metadata.json")]
298  
299          except Exception as e:
300              log.error("[ToolContextFacade] Failed to list artifacts: %s", e)
301              raise ValueError(f"Failed to list artifacts: {e}") from e
302  
303      async def artifact_exists(self, filename: str) -> bool:
304          """
305          Check if an artifact exists in the current session.
306  
307          Args:
308              filename: The artifact filename to check
309  
310          Returns:
311              True if the artifact exists, False otherwise
312          """
313          try:
314              artifacts = await self.list_artifacts()
315              return filename in artifacts
316          except ValueError:
317              return False
318  
319      # -------------------------------------------------------------------------
320      # Status Update Methods
321      # -------------------------------------------------------------------------
322  
323      def _ensure_host_component(self) -> Optional[Any]:
324          """
325          Extract and cache the host component from the invocation context.
326  
327          The host component is needed for publishing status updates.
328          Returns None if the component cannot be accessed (e.g., in tests).
329          """
330          if self._host_component_resolved:
331              return self._host_component
332  
333          self._host_component_resolved = True
334          try:
335              inv_context = getattr(self._ctx, "_invocation_context", None)
336              if inv_context:
337                  agent = getattr(inv_context, "agent", None)
338                  if agent:
339                      self._host_component = getattr(agent, "host_component", None)
340          except Exception as e:
341              log.debug(
342                  "[ToolContextFacade] Could not get host_component: %s", e
343              )
344  
345          return self._host_component
346  
347      @property
348      def a2a_context(self) -> Optional[Dict[str, Any]]:
349          """
350          Get the A2A context for this request.
351  
352          The A2A context contains routing information needed for status updates.
353          Returns None if not available (e.g., in tests).
354          """
355          return self._ctx.state.get("a2a_context")
356  
357      def send_status(self, message: str) -> bool:
358          """
359          Send a simple text status update to the frontend.
360  
361          This is the easiest way to show progress during long-running tool operations.
362          The message will appear in the UI as a progress indicator.
363  
364          Args:
365              message: Human-readable progress message (e.g., "Analyzing data...",
366                      "Processing file 3 of 10...", "Fetching results...")
367  
368          Returns:
369              True if the update was successfully scheduled, False otherwise.
370              Returns False if the context is not available (e.g., in unit tests).
371  
372          Example:
373              async def my_tool(data: str, ctx: ToolContextFacade) -> ToolResult:
374                  ctx.send_status("Starting analysis...")
375                  # ... do work ...
376                  ctx.send_status("Almost done...")
377                  return ToolResult.ok("Complete")
378          """
379          from ...common.data_parts import AgentProgressUpdateData
380  
381          host = self._ensure_host_component()
382          a2a_ctx = self.a2a_context
383  
384          if not host or not a2a_ctx:
385              log.debug(
386                  "[ToolContextFacade] Cannot send status: missing host_component or a2a_context"
387              )
388              return False
389  
390          signal = AgentProgressUpdateData(status_text=message)
391          return host.publish_data_signal_from_thread(
392              a2a_context=a2a_ctx,
393              signal_data=signal,
394          )
395  
396      def send_signal(
397          self,
398          signal_data: "BaseModel",
399          skip_buffer_flush: bool = False,
400      ) -> bool:
401          """
402          Send a custom signal/data update to the frontend.
403  
404          Use this for specialized signals that need structured data beyond
405          a simple text message. For simple progress messages, prefer send_status().
406  
407          Args:
408              signal_data: A Pydantic model instance from solace_agent_mesh.common.data_parts.
409                          Common types include:
410                          - AgentProgressUpdateData: Simple text status
411                          - ArtifactCreationProgressData: Artifact streaming progress
412                          - DeepResearchProgressData: Structured research progress
413              skip_buffer_flush: If True, skip flushing the output buffer before
414                                sending. Usually False for immediate updates.
415  
416          Returns:
417              True if the update was successfully scheduled, False otherwise.
418              Returns False if the context is not available (e.g., in unit tests).
419  
420          Example:
421              from solace_agent_mesh.common.data_parts import DeepResearchProgressData
422  
423              async def research_tool(query: str, ctx: ToolContextFacade) -> ToolResult:
424                  ctx.send_signal(DeepResearchProgressData(
425                      phase="searching",
426                      status_text="Searching for sources...",
427                      progress_percentage=25,
428                      current_iteration=1,
429                      total_iterations=3,
430                      sources_found=5,
431                      elapsed_seconds=10,
432                  ))
433                  # ... do work ...
434                  return ToolResult.ok("Complete")
435          """
436          host = self._ensure_host_component()
437          a2a_ctx = self.a2a_context
438  
439          if not host or not a2a_ctx:
440              log.debug(
441                  "[ToolContextFacade] Cannot send signal: missing host_component or a2a_context"
442              )
443              return False
444  
445          return host.publish_data_signal_from_thread(
446              a2a_context=a2a_ctx,
447              signal_data=signal_data,
448              skip_buffer_flush=skip_buffer_flush,
449          )
450  
451      def __repr__(self) -> str:
452          self._ensure_context()
453          return (
454              f"ToolContextFacade(app={self._app_name}, "
455              f"user={self._user_id}, session={self._session_id})"
456          )