# tool_context_facade.py
"""
Provides a clean, simplified interface for tool authors to access context and artifacts.

The ToolContextFacade hides all the boilerplate of extracting session info,
accessing the artifact service, managing context, and sending status updates
from the raw ADK ToolContext.

Example usage:
    from solace_agent_mesh.agent.utils import ToolContextFacade
    from solace_agent_mesh.agent.tools import ToolResult, DataObject

    async def my_tool(filename: str, ctx: ToolContextFacade) -> ToolResult:
        # Send status updates - no boilerplate!
        ctx.send_status("Loading artifact...")

        # Load artifact - no boilerplate!
        data = await ctx.load_artifact(filename, as_text=True)

        # Access context properties
        print(f"User: {ctx.user_id}, Session: {ctx.session_id}")

        # Send progress update
        ctx.send_status("Processing data...")

        # Process and return
        result = process(data)
        return ToolResult.ok(
            "Done",
            data_objects=[DataObject(name="output.json", content=result)]
        )
"""

import logging
from typing import Any, Dict, List, Optional, Union, TYPE_CHECKING

from google.adk.tools import ToolContext

from .artifact_helpers import load_artifact_content_or_metadata
from .context_helpers import get_original_session_id

if TYPE_CHECKING:
    from google.adk.artifacts import BaseArtifactService
    from pydantic import BaseModel

log = logging.getLogger(__name__)


class ToolContextFacade:
    """
    A simplified interface for tool authors to access context, artifacts, and status updates.

    This facade provides:
    - Easy access to session/user/app context
    - Simplified artifact loading (content and metadata)
    - Artifact listing
    - Tool configuration access
    - Status update methods for sending progress to the frontend

    Note: This facade is intentionally read-only for artifact operations.
    All artifact saving should be done through the ToolResult/DataObject pattern,
    which provides a single, clear path for artifact creation.
    """

    def __init__(
        self,
        tool_context: ToolContext,
        tool_config: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize the facade.

        Nothing is read from ``tool_context`` here; context values and the
        host component are resolved lazily on first use.

        Args:
            tool_context: The raw ADK ToolContext
            tool_config: Optional tool-specific configuration dict
        """
        self._ctx = tool_context
        self._tool_config = tool_config or {}

        # Cache context values for efficiency (filled by _ensure_context)
        self._session_id: Optional[str] = None
        self._user_id: Optional[str] = None
        self._app_name: Optional[str] = None
        self._artifact_service: Optional["BaseArtifactService"] = None
        # Host component cache (filled by _ensure_host_component). The flag is
        # separate because None is a legitimate "resolved, but absent" value.
        self._host_component: Optional[Any] = None
        self._host_component_resolved: bool = False

    def _ensure_context(self) -> None:
        """
        Extract and cache context values from the invocation context.

        A non-None ``_session_id`` marks the cache as populated. If the
        invocation context is missing an expected attribute, a warning is
        logged and the identifiers are set to empty strings so the lookup
        is not retried on every access.
        """
        if self._session_id is not None:
            return  # Already cached

        try:
            # NOTE: reads a private attribute of the ADK ToolContext.
            inv_context = self._ctx._invocation_context
            self._artifact_service = inv_context.artifact_service
            self._app_name = inv_context.app_name
            self._user_id = inv_context.user_id
            self._session_id = get_original_session_id(inv_context)
        except AttributeError as e:
            log.warning(
                "[ToolContextFacade] Could not extract context: %s", e
            )
            # Set defaults to avoid repeated attempts
            self._session_id = ""
            self._user_id = ""
            self._app_name = ""

    @property
    def session_id(self) -> str:
        """Get the current session ID ("" if it could not be determined)."""
        self._ensure_context()
        return self._session_id or ""

    @property
    def user_id(self) -> str:
        """Get the current user ID ("" if it could not be determined)."""
        self._ensure_context()
        return self._user_id or ""

    @property
    def app_name(self) -> str:
        """Get the application name ("" if it could not be determined)."""
        self._ensure_context()
        return self._app_name or ""

    @property
    def raw_tool_context(self) -> ToolContext:
        """
        Get the underlying ADK ToolContext.

        Use this escape hatch when you need access to ADK-specific features
        not exposed by the facade.
        """
        return self._ctx

    @property
    def state(self) -> Dict[str, Any]:
        """
        Get the tool context state dictionary.

        This provides access to shared state across tool invocations.
        """
        return self._ctx.state

    def get_config(self, key: str, default: Any = None) -> Any:
        """
        Get a value from the tool configuration.

        Args:
            key: The configuration key to look up
            default: Default value if key is not found

        Returns:
            The configuration value or the default
        """
        return self._tool_config.get(key, default)

    # -------------------------------------------------------------------------
    # Artifact Access (read-only)
    # -------------------------------------------------------------------------

    def _require_artifact_service(self) -> "BaseArtifactService":
        """Return the cached artifact service, raising ValueError if there is none."""
        self._ensure_context()
        if not self._artifact_service:
            raise ValueError("Artifact service not available in context")
        return self._artifact_service

    async def _fetch_artifact(
        self,
        filename: str,
        version: Union[int, str],
        failure_prefix: str,
        **load_kwargs: Any,
    ) -> Dict[str, Any]:
        """
        Call the shared artifact loader and map its status to exceptions.

        Args:
            filename: The artifact filename
            version: Version to load ("latest" or specific version number)
            failure_prefix: Leading text of the ValueError message used for
                any status other than "success" / "not_found"
            **load_kwargs: Extra keyword arguments forwarded to
                load_artifact_content_or_metadata

        Returns:
            The loader's result dictionary (status is "success")

        Raises:
            ValueError: If no artifact service is available or the load failed
            FileNotFoundError: If the loader reports status "not_found"
        """
        service = self._require_artifact_service()

        result = await load_artifact_content_or_metadata(
            artifact_service=service,
            app_name=self._app_name,
            user_id=self._user_id,
            session_id=self._session_id,
            filename=filename,
            version=version,
            **load_kwargs,
        )

        status = result.get("status")
        if status == "not_found":
            raise FileNotFoundError(
                f"Artifact '{filename}' not found: {result.get('message')}"
            )
        if status != "success":
            raise ValueError(
                f"{failure_prefix} '{filename}': {result.get('message')}"
            )
        return result

    async def load_artifact(
        self,
        filename: str,
        version: Union[int, str] = "latest",
        as_text: bool = False,
    ) -> Union[bytes, str]:
        """
        Load artifact content from the artifact store.

        Args:
            filename: The artifact filename to load
            version: Version to load ("latest" or specific version number)
            as_text: If True, decode bytes to string (UTF-8)

        Returns:
            The artifact content as bytes, or as string if as_text=True

        Raises:
            ValueError: If the artifact cannot be loaded. With as_text=True,
                invalid UTF-8 raises UnicodeDecodeError, a ValueError subclass.
            FileNotFoundError: If the artifact does not exist
        """
        log_id = f"[ToolContextFacade:load_artifact:{filename}]"

        result = await self._fetch_artifact(
            filename,
            version,
            "Failed to load artifact",
            return_raw_bytes=True,
        )

        content = result.get("raw_bytes")
        if content is None:
            # Fallback for text content returned directly. `or b""` also
            # covers a "content" key that is present but None, so callers
            # never receive None from this method.
            content = result.get("content") or b""
            if isinstance(content, str):
                content = content.encode("utf-8")

        log.debug(
            "%s Loaded %d bytes (version: %s)",
            log_id,
            len(content) if content else 0,
            result.get("version"),
        )

        if as_text:
            return content.decode("utf-8") if isinstance(content, bytes) else content

        return content

    async def load_artifact_metadata(
        self,
        filename: str,
        version: Union[int, str] = "latest",
    ) -> Dict[str, Any]:
        """
        Load artifact metadata without loading the content.

        Args:
            filename: The artifact filename
            version: Version to load metadata for ("latest" or specific version)

        Returns:
            Dictionary containing artifact metadata (mime_type, size_bytes,
            description, schema, etc.) plus a "version" key taken from the
            loader result. The dictionary is a fresh copy, so callers may
            modify it freely.

        Raises:
            ValueError: If metadata cannot be loaded
            FileNotFoundError: If the artifact does not exist
        """
        result = await self._fetch_artifact(
            filename,
            version,
            "Failed to load metadata for",
            load_metadata_only=True,
        )

        # Copy before adding "version": avoids mutating the loader's own dict
        # and tolerates a "metadata" value of None.
        metadata = dict(result.get("metadata") or {})
        metadata["version"] = result.get("version")
        return metadata

    async def list_artifacts(self) -> List[str]:
        """
        List all artifact filenames in the current session.

        Returns:
            List of artifact filenames (excluding metadata files, i.e. keys
            ending in ".metadata.json")

        Raises:
            ValueError: If artifacts cannot be listed. Any exception raised
                by the artifact service is logged and re-raised as ValueError
                with the original exception chained.
        """
        service = self._require_artifact_service()

        try:
            keys = await service.list_artifact_keys(
                app_name=self._app_name,
                user_id=self._user_id,
                session_id=self._session_id,
            )

            # Filter out metadata files
            return [k for k in keys if not k.endswith(".metadata.json")]

        except Exception as e:
            log.error("[ToolContextFacade] Failed to list artifacts: %s", e)
            raise ValueError(f"Failed to list artifacts: {e}") from e

    async def artifact_exists(self, filename: str) -> bool:
        """
        Check if an artifact exists in the current session.

        Args:
            filename: The artifact filename to check

        Returns:
            True if the artifact exists, False otherwise. Also False when the
            artifact list cannot be obtained (list_artifacts raises ValueError).
        """
        try:
            artifacts = await self.list_artifacts()
            return filename in artifacts
        except ValueError:
            return False

    # -------------------------------------------------------------------------
    # Status Update Methods
    # -------------------------------------------------------------------------

    def _ensure_host_component(self) -> Optional[Any]:
        """
        Extract and cache the host component from the invocation context.

        The host component is needed for publishing status updates.
        Returns None if the component cannot be accessed (e.g., in tests).
        The lookup is attempted only once; the outcome (including None) is cached.
        """
        if self._host_component_resolved:
            return self._host_component

        # Mark as resolved up front so a failed lookup is not retried.
        self._host_component_resolved = True
        try:
            inv_context = getattr(self._ctx, "_invocation_context", None)
            if inv_context:
                agent = getattr(inv_context, "agent", None)
                if agent:
                    self._host_component = getattr(agent, "host_component", None)
        except Exception as e:
            log.debug(
                "[ToolContextFacade] Could not get host_component: %s", e
            )

        return self._host_component

    @property
    def a2a_context(self) -> Optional[Dict[str, Any]]:
        """
        Get the A2A context for this request.

        The A2A context contains routing information needed for status updates.
        Returns None if not available (e.g., in tests), including when the
        underlying tool context exposes no ``state`` at all.
        """
        # Guarded lookup so a stub/absent context yields None instead of
        # AttributeError, letting send_status/send_signal return False.
        state = getattr(self._ctx, "state", None)
        if state is None:
            return None
        return state.get("a2a_context")

    def send_status(self, message: str) -> bool:
        """
        Send a simple text status update to the frontend.

        This is the easiest way to show progress during long-running tool operations.
        The message will appear in the UI as a progress indicator.

        Args:
            message: Human-readable progress message (e.g., "Analyzing data...",
                "Processing file 3 of 10...", "Fetching results...")

        Returns:
            True if the update was successfully scheduled, False otherwise.
            Returns False if the context is not available (e.g., in unit tests).

        Example:
            async def my_tool(data: str, ctx: ToolContextFacade) -> ToolResult:
                ctx.send_status("Starting analysis...")
                # ... do work ...
                ctx.send_status("Almost done...")
                return ToolResult.ok("Complete")
        """
        from ...common.data_parts import AgentProgressUpdateData

        host = self._ensure_host_component()
        a2a_ctx = self.a2a_context

        if not host or not a2a_ctx:
            log.debug(
                "[ToolContextFacade] Cannot send status: missing host_component or a2a_context"
            )
            return False

        signal = AgentProgressUpdateData(status_text=message)
        return host.publish_data_signal_from_thread(
            a2a_context=a2a_ctx,
            signal_data=signal,
        )

    def send_signal(
        self,
        signal_data: "BaseModel",
        skip_buffer_flush: bool = False,
    ) -> bool:
        """
        Send a custom signal/data update to the frontend.

        Use this for specialized signals that need structured data beyond
        a simple text message. For simple progress messages, prefer send_status().

        Args:
            signal_data: A Pydantic model instance from solace_agent_mesh.common.data_parts.
                Common types include:
                - AgentProgressUpdateData: Simple text status
                - ArtifactCreationProgressData: Artifact streaming progress
                - DeepResearchProgressData: Structured research progress
            skip_buffer_flush: If True, skip flushing the output buffer before
                sending. Usually False for immediate updates.

        Returns:
            True if the update was successfully scheduled, False otherwise.
            Returns False if the context is not available (e.g., in unit tests).

        Example:
            from solace_agent_mesh.common.data_parts import DeepResearchProgressData

            async def research_tool(query: str, ctx: ToolContextFacade) -> ToolResult:
                ctx.send_signal(DeepResearchProgressData(
                    phase="searching",
                    status_text="Searching for sources...",
                    progress_percentage=25,
                    current_iteration=1,
                    total_iterations=3,
                    sources_found=5,
                    elapsed_seconds=10,
                ))
                # ... do work ...
                return ToolResult.ok("Complete")
        """
        host = self._ensure_host_component()
        a2a_ctx = self.a2a_context

        if not host or not a2a_ctx:
            log.debug(
                "[ToolContextFacade] Cannot send signal: missing host_component or a2a_context"
            )
            return False

        return host.publish_data_signal_from_thread(
            a2a_context=a2a_ctx,
            signal_data=signal_data,
            skip_buffer_flush=skip_buffer_flush,
        )

    def __repr__(self) -> str:
        """Return a debug representation showing app, user and session."""
        self._ensure_context()
        return (
            f"ToolContextFacade(app={self._app_name}, "
            f"user={self._user_id}, session={self._session_id})"
        )