/ src / solace_agent_mesh / agent / tools / executors / unified_python_executor.py
unified_python_executor.py
  1  """
  2  Python tool loader for handling all Python tool patterns.
  3  
  4  This loader provides a unified interface for loading Python tools:
  5  - Simple functions (module + function_name)
  6  - DynamicTool classes (module + class_name)
  7  - DynamicToolProvider classes (auto-discovery or explicit)
  8  
  9  It consolidates the various Python tool loading patterns into one place.
 10  """
 11  
 12  import importlib
 13  import inspect
 14  import logging
 15  from typing import Any, Callable, Dict, List, Optional, Type, Union, TYPE_CHECKING
 16  
 17  from google.adk.tools import BaseTool, ToolContext
 18  from pydantic import BaseModel
 19  
 20  from .base import ToolExecutor, ToolExecutionResult
 21  from ..tool_result import ToolResult
 22  from .executor_tool import ExecutorBasedTool
 23  from ..dynamic_tool import (
 24      DynamicTool,
 25      DynamicToolProvider,
 26      _get_schema_from_signature,
 27      _SchemaDetectionResult,
 28  )
 29  from ..artifact_types import ArtifactTypeInfo
 30  
 31  if TYPE_CHECKING:
 32      from ...sac.component import SamAgentComponent
 33  
 34  log = logging.getLogger(__name__)
 35  
 36  
 37  def _is_subclass_by_name(cls: type, base_name: str) -> bool:
 38      """Check if cls is a subclass of a class with the given name."""
 39      for base in cls.__mro__:
 40          if base.__name__ == base_name:
 41              return True
 42      return False
 43  
 44  
 45  def _find_dynamic_tool_class(module: Any) -> Optional[Type[DynamicTool]]:
 46      """Find a DynamicTool subclass in a module (excluding providers)."""
 47      candidates = []
 48      for name in dir(module):
 49          obj = getattr(module, name)
 50          if (
 51              isinstance(obj, type)
 52              and _is_subclass_by_name(obj, "DynamicTool")
 53              and not _is_subclass_by_name(obj, "DynamicToolProvider")
 54              and obj.__module__ == module.__name__
 55          ):
 56              candidates.append(obj)
 57  
 58      if len(candidates) == 1:
 59          return candidates[0]
 60      if len(candidates) > 1:
 61          raise TypeError(
 62              f"Module '{module.__name__}' contains multiple DynamicTool subclasses: "
 63              f"{[c.__name__ for c in candidates]}. Specify 'class_name' explicitly."
 64          )
 65      return None
 66  
 67  
 68  def _find_dynamic_tool_provider_class(module: Any) -> Optional[Type[DynamicToolProvider]]:
 69      """Find a DynamicToolProvider subclass in a module."""
 70      candidates = []
 71      for name in dir(module):
 72          obj = getattr(module, name)
 73          if (
 74              isinstance(obj, type)
 75              and _is_subclass_by_name(obj, "DynamicToolProvider")
 76              and obj.__module__ == module.__name__
 77          ):
 78              candidates.append(obj)
 79  
 80      if len(candidates) == 1:
 81          return candidates[0]
 82      if len(candidates) > 1:
 83          raise TypeError(
 84              f"Module '{module.__name__}' contains multiple DynamicToolProvider subclasses: "
 85              f"{[c.__name__ for c in candidates]}. Specify 'class_name' explicitly."
 86          )
 87      return None
 88  
 89  
 90  class _FunctionExecutor(ToolExecutor):
 91      """
 92      Internal executor for simple function execution.
 93  
 94      Created from an already-loaded function, used internally by
 95      PythonToolLoader to wrap simple functions as ExecutorBasedTools.
 96      """
 97  
 98      def __init__(
 99          self,
100          func: Callable,
101          pass_tool_context: bool = True,
102          pass_tool_config: bool = True,
103      ):
104          self._func = func
105          self._pass_tool_context = pass_tool_context
106          self._pass_tool_config = pass_tool_config
107          self._is_async = inspect.iscoroutinefunction(func)
108  
109          # Check signature for accepted parameters
110          try:
111              sig = inspect.signature(func)
112              self._accepts_tool_context = "tool_context" in sig.parameters
113              self._accepts_tool_config = "tool_config" in sig.parameters
114          except (ValueError, TypeError):
115              self._accepts_tool_context = False
116              self._accepts_tool_config = False
117  
118      @property
119      def executor_type(self) -> str:
120          return "function"
121  
122      async def initialize(
123          self,
124          component: "SamAgentComponent",
125          executor_config: Dict[str, Any],
126      ) -> None:
127          """Nothing to initialize - function already loaded."""
128          pass
129  
130      async def execute(
131          self,
132          args: Dict[str, Any],
133          tool_context: ToolContext,
134          tool_config: Dict[str, Any],
135      ) -> ToolExecutionResult:
136          """Execute the function."""
137          import asyncio
138          import functools
139  
140          kwargs = dict(args)
141  
142          if self._pass_tool_context and self._accepts_tool_context:
143              kwargs["tool_context"] = tool_context
144          if self._pass_tool_config and self._accepts_tool_config:
145              kwargs["tool_config"] = tool_config
146  
147          try:
148              if self._is_async:
149                  result = await self._func(**kwargs)
150              else:
151                  loop = asyncio.get_running_loop()
152                  result = await loop.run_in_executor(
153                      None, functools.partial(self._func, **kwargs)
154                  )
155  
156              if isinstance(result, ToolExecutionResult):
157                  return result
158              # Pass ToolResult through directly so ExecutorBasedTool._run_async_impl
159              # can forward it to the ToolResultProcessor for proper handling
160              if isinstance(result, ToolResult):
161                  return result
162              elif isinstance(result, dict) and result.get("status") == "error":
163                  return ToolExecutionResult.fail(
164                      error=result.get("message", "Unknown error"),
165                      error_code=result.get("error_code"),
166                  )
167              return ToolExecutionResult.ok(data=result)
168  
169          except Exception as e:
170              log.exception("Function execution failed: %s", e)
171              return ToolExecutionResult.fail(
172                  error=f"Execution failed: {str(e)}",
173                  error_code="EXECUTION_ERROR",
174              )
175  
176      async def cleanup(
177          self,
178          component: "SamAgentComponent",
179          executor_config: Dict[str, Any],
180      ) -> None:
181          """Nothing to clean up."""
182          pass
183  
184  
class PythonToolLoader:
    """
    Loader that handles all Python tool loading patterns.

    This is a factory/loader — it imports modules, discovers classes, and
    returns BaseTool instances via get_loaded_tools(). It supports:

    1. Simple functions: module + function_name
       - Creates ExecutorBasedTool with internal function executor
       - Auto-detects parameter schema from function signature
       - Auto-detects Artifact and ToolContextFacade parameters

    2. DynamicTool classes: module + class_name
       - Instantiates the DynamicTool class directly
       - Returns the DynamicTool (no additional wrapping needed)

    3. DynamicToolProvider classes: module (auto-discover) or module + class_name
       - Instantiates the provider
       - Calls get_all_tools_for_framework() to get all tools
       - Returns list of DynamicTool instances

    Configuration:
        module: Python module path (required)
        function_name: Function to load (for simple function pattern)
        class_name: Class to load (for explicit class loading)
        tool_config: Configuration to pass to tools
        tool_name: Override the tool name (for functions)
        tool_description: Override the description (for functions)
        raw_string_args: List of args that should not have embeds resolved
        pass_tool_context: Whether to inject tool_context (default: True)
        pass_tool_config: Whether to inject tool_config (default: True)
        base_path: Optional directory inserted at the front of sys.path
            before the module is imported
    """

    def __init__(
        self,
        module: str,
        function_name: Optional[str] = None,
        class_name: Optional[str] = None,
        tool_config: Optional[Dict[str, Any]] = None,
        tool_name: Optional[str] = None,
        tool_description: Optional[str] = None,
        raw_string_args: Optional[List[str]] = None,
        pass_tool_context: bool = True,
        pass_tool_config: bool = True,
        base_path: Optional[str] = None,
    ):
        """Store the loading configuration; nothing is imported until initialize()."""
        self._module_path = module
        self._function_name = function_name
        self._class_name = class_name
        self._tool_config = tool_config
        self._tool_name = tool_name
        self._tool_description = tool_description
        self._raw_string_args = raw_string_args or []
        self._pass_tool_context = pass_tool_context
        self._pass_tool_config = pass_tool_config
        self._base_path = base_path

        # Loaded state
        self._module: Any = None
        self._loaded_tools: List[BaseTool] = []

    async def initialize(
        self,
        component: "SamAgentComponent",
        executor_config: Dict[str, Any],
    ) -> None:
        """
        Import the configured module and create its tools.

        When ``function_name`` is set the function pattern is used; otherwise
        the class-based pattern (explicit ``class_name`` or auto-discovery) is
        used. ``executor_config`` is accepted but not read by this method.

        Raises:
            Exception: Any error raised while importing the module is logged
                and re-raised unchanged; errors from the tool-loading helpers
                propagate as well.
        """
        log_id = f"[PythonToolLoader:{self._module_path}]"

        # Import module
        try:
            if self._base_path:
                import sys

                # Make the base path importable, without duplicating entries.
                if self._base_path not in sys.path:
                    sys.path.insert(0, self._base_path)
            self._module = importlib.import_module(self._module_path)
            log.debug("%s Imported module", log_id)
        except Exception as e:
            log.error("%s Failed to import module: %s", log_id, e)
            raise

        # Load tools based on configuration pattern
        if self._function_name:
            # Pattern 1: Simple function
            tools = self._load_function_tool()
        else:
            # Pattern 2/3: Class-based (DynamicTool or DynamicToolProvider)
            tools = self._load_class_based_tools(component)

        self._loaded_tools = tools
        log.info(
            "%s Loaded %d tool(s): %s",
            log_id,
            len(tools),
            [getattr(t, "name", getattr(t, "__name__", "unknown")) for t in tools],
        )

    def _load_function_tool(self) -> List[BaseTool]:
        """
        Load a simple function as an ExecutorBasedTool.

        Returns:
            A single-element list containing the created tool.

        Raises:
            AttributeError: If the module has no attribute ``function_name``
                (or the attribute is None).
            TypeError: If the attribute is not callable.
        """
        log_id = f"[PythonToolLoader:{self._module_path}.{self._function_name}]"

        func = getattr(self._module, self._function_name, None)
        if func is None:
            raise AttributeError(
                f"Module '{self._module_path}' has no attribute '{self._function_name}'"
            )
        if not callable(func):
            raise TypeError(
                f"'{self._function_name}' in module '{self._module_path}' is not callable"
            )

        # Detect schema from function signature; detection_result is filled in
        # by the helper with the artifact / context-facade parameters it finds.
        detection_result = _SchemaDetectionResult()
        schema = _get_schema_from_signature(
            func, detection_result=detection_result
        )

        # Build artifact params (copied so the detection result is not shared)
        artifact_params: Dict[str, ArtifactTypeInfo] = dict(detection_result.artifact_params)

        # Determine tool name and description: explicit overrides win, then the
        # function name / docstring, then a generic fallback description.
        tool_name = self._tool_name or self._function_name
        tool_description = self._tool_description or (func.__doc__ or f"Execute {tool_name}")

        # Create internal executor for this function
        func_executor = _FunctionExecutor(
            func,
            pass_tool_context=self._pass_tool_context,
            pass_tool_config=self._pass_tool_config,
        )

        # Create ExecutorBasedTool
        tool = ExecutorBasedTool(
            name=tool_name,
            description=tool_description,
            parameters_schema=schema,
            executor=func_executor,
            tool_config=self._tool_config,
            artifact_params=artifact_params,
            raw_string_args=self._raw_string_args,
            ctx_facade_param_name=detection_result.ctx_facade_param_name,
        )

        log.debug(
            "%s Created ExecutorBasedTool (artifacts=%s, ctx_facade=%s)",
            log_id,
            list(artifact_params.keys()),
            detection_result.ctx_facade_param_name,
        )

        return [tool]

    def _load_class_based_tools(
        self, component: "SamAgentComponent"
    ) -> List[DynamicTool]:
        """
        Load a DynamicTool or DynamicToolProvider class and instantiate it.

        With ``class_name`` set, that attribute of the module is used;
        otherwise the module is searched for a provider first and then for a
        single tool class.

        Returns:
            The tools produced by the provider, or a single-element list with
            the instantiated DynamicTool.

        Raises:
            AttributeError: If ``class_name`` is set but the module has no
                such attribute.
            TypeError: If ``class_name`` names something that is not a class,
                if nothing could be auto-discovered, or if the class is
                neither a DynamicTool nor a DynamicToolProvider.
            ValueError: If ``tool_config`` fails validation against the
                class's ``config_model``.
        """
        log_id = f"[PythonToolLoader:{self._module_path}]"

        # Determine the class to load
        tool_class = None
        if self._class_name:
            tool_class = getattr(self._module, self._class_name, None)
            if tool_class is None:
                raise AttributeError(
                    f"Module '{self._module_path}' has no class '{self._class_name}'"
                )
            # Reject functions/instances up front; the MRO-based checks below
            # would otherwise fail with an opaque AttributeError on __mro__.
            if not isinstance(tool_class, type):
                raise TypeError(
                    f"'{self._class_name}' in module '{self._module_path}' is not a class"
                )
        else:
            # Auto-discover: try provider first, then single tool
            tool_class = _find_dynamic_tool_provider_class(self._module)
            if not tool_class:
                tool_class = _find_dynamic_tool_class(self._module)

        if not tool_class:
            raise TypeError(
                f"Module '{self._module_path}' has no DynamicTool or DynamicToolProvider "
                f"to auto-discover. Specify 'function_name' or 'class_name'."
            )

        # Validate and get config
        validated_config = self._validate_tool_config(tool_class, component)

        # Instantiate based on type. The provider check comes first; the
        # DynamicTool branch only applies to classes that are not providers.
        if _is_subclass_by_name(tool_class, "DynamicToolProvider"):
            provider_instance = tool_class()
            tools = provider_instance.get_all_tools_for_framework(
                tool_config=validated_config
            )
            log.info(
                "%s Loaded %d tools from DynamicToolProvider '%s'",
                log_id,
                len(tools),
                tool_class.__name__,
            )
        elif _is_subclass_by_name(tool_class, "DynamicTool"):
            tool_instance = tool_class(tool_config=validated_config)
            tools = [tool_instance]
            log.info(
                "%s Loaded DynamicTool '%s'",
                log_id,
                tool_class.__name__,
            )
        else:
            raise TypeError(
                f"Class '{tool_class.__name__}' is not a DynamicTool or DynamicToolProvider"
            )

        return tools

    def _validate_tool_config(
        self, tool_class: type, component: "SamAgentComponent"
    ) -> Union[dict, BaseModel, None]:
        """
        Validate tool_config against the class's config_model, if defined.

        Returns:
            The raw ``tool_config`` unchanged when the class has no (truthy)
            ``config_model`` attribute; otherwise the model instance produced
            by ``config_model.model_validate``. A missing ``tool_config`` is
            validated as an empty dict.

        Raises:
            ValueError: If validation fails; chained from the pydantic
                ``ValidationError``.
        """
        from pydantic import ValidationError

        config_model: Optional[Type[BaseModel]] = getattr(
            tool_class, "config_model", None
        )

        if not config_model:
            return self._tool_config

        log.debug(
            "Validating tool_config against %s.config_model (%s)",
            tool_class.__name__,
            config_model.__name__,
        )

        try:
            return config_model.model_validate(self._tool_config or {})
        except ValidationError as e:
            error_msg = (
                f"Configuration error for tool '{tool_class.__name__}'. "
                f"The provided 'tool_config' is invalid:\n{e}"
            )
            log.error("%s %s", component.log_identifier, error_msg)
            raise ValueError(error_msg) from e

    def get_loaded_tools(self) -> List[BaseTool]:
        """
        Get the tools that were loaded during initialization.

        This is the primary interface for retrieving loaded tools.
        Call this after initialize() has completed.
        """
        return self._loaded_tools

    async def cleanup(
        self,
        component: "SamAgentComponent",
        executor_config: Dict[str, Any],
    ) -> None:
        """
        Clean up loaded tools and reset the loader's state.

        Each tool exposing a callable ``cleanup`` attribute has it invoked
        with ``(component, executor_config)``. Failures are logged as warnings
        and do not stop the remaining tools from being cleaned up.
        """
        for tool in self._loaded_tools:
            cleanup_fn = getattr(tool, "cleanup", None)
            if not callable(cleanup_fn):
                continue
            try:
                outcome = cleanup_fn(component, executor_config)
                # Only await when the hook is asynchronous; awaiting the None
                # returned by a synchronous hook would raise and be logged as
                # a spurious cleanup failure.
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception as e:
                log.warning(
                    "Error cleaning up tool %s: %s",
                    getattr(tool, "name", "unknown"),
                    e,
                )
        self._loaded_tools = []
        self._module = None