/ src / solace_agent_mesh / common / utils / embeds / evaluators.py
evaluators.py
  1  """
  2  Contains individual evaluator functions for different embed types
  3  and the mapping dictionary.
  4  """
  5  
  6  import logging
  7  import json
  8  from datetime import datetime
  9  import uuid
 10  from typing import Any, Callable, Dict, Optional, Tuple
 11  from asteval import Interpreter
 12  import math, random
 13  
 14  from ....agent.utils.artifact_helpers import format_metadata_for_llm
 15  from .constants import EMBED_CHAIN_DELIMITER
 16  
 17  log = logging.getLogger(__name__)
 18  
 19  MATH_SAFE_SYMBOLS = {
 20      # Basic math operations
 21      "abs": abs,
 22      "round": round,
 23      "min": min,
 24      "max": max,
 25      # Math module functions
 26      "sin": math.sin,
 27      "cos": math.cos,
 28      "tan": math.tan,
 29      "sqrt": math.sqrt,
 30      "pow": math.pow,
 31      "exp": math.exp,
 32      "log": math.log,
 33      "log10": math.log10,
 34      "pi": math.pi,
 35      "e": math.e,
 36      "inf": math.inf,
 37      "ceil": math.ceil,
 38      "floor": math.floor,
 39      # Trigonometric helpers
 40      "radians": math.radians,
 41      # Combinatorics/Statistical
 42      "factorial": math.factorial,
 43      "sum": sum,
 44      # Hyperbolic functions
 45      "sinh": math.sinh,
 46      "cosh": math.cosh,
 47      "tanh": math.tanh,
 48      # Random functions
 49      "random": random.random,
 50      "randint": random.randint,
 51      "uniform": random.uniform,
 52  }
 53  
 54  
 55  def _evaluate_math_embed(
 56      expression: str,
 57      context: Any,
 58      log_identifier: str,
 59      format_spec: Optional[str] = None,
 60  ) -> Tuple[str, Optional[str], int]:
 61      """
 62      Evaluates a 'math' embed using asteval.
 63      Applies format_spec if the result is numeric and format_spec is provided.
 64      Returns (string_value, error_message, size_of_string_value).
 65      """
 66      try:
 67          user_symtable = MATH_SAFE_SYMBOLS.copy()
 68  
 69          local_interpreter = Interpreter(symtable=user_symtable)
 70          result = local_interpreter.eval(expression.strip())
 71  
 72          if local_interpreter.error:
 73              error_messages = [err.msg for err in local_interpreter.error]
 74              error_msg_str = "; ".join(error_messages)
 75              err_str = f"[Error: Math evaluation error: {error_msg_str}]"
 76              return err_str, error_msg_str, len(err_str.encode("utf-8"))
 77  
 78          str_value: str
 79          if isinstance(result, (int, float)) and format_spec:
 80              try:
 81                  str_value = format(result, format_spec)
 82              except ValueError as fmt_err:
 83                  log.warning(
 84                      "%s Invalid format_spec '%s' for math result %s: %s. Falling back to str().",
 85                      log_identifier,
 86                      format_spec,
 87                      result,
 88                      fmt_err,
 89                  )
 90                  str_value = str(result)
 91          else:
 92              str_value = str(result)
 93  
 94          return str_value, None, len(str_value.encode("utf-8"))
 95      except ImportError:
 96          err_msg = "Math evaluation skipped: 'asteval' not installed"
 97          log.warning("%s %s", log_identifier, err_msg)
 98          err_str = f"[Error: {err_msg}]"
 99          return err_str, err_msg, len(err_str.encode("utf-8"))
100      except Exception as e:
101          err_msg = f"Math evaluation error: {e}"
102          err_str = f"[Error: {err_msg}]"
103          return err_str, err_msg, len(err_str.encode("utf-8"))
104  
105  
def _evaluate_datetime_embed(
    expression: str,
    context: Any,
    log_identifier: str,
    format_spec: Optional[str] = None,
) -> Tuple[str, Optional[str], int]:
    """
    Render the current local time for a 'datetime' embed.

    The stripped expression selects the output (keywords are matched
    case-insensitively):
      - empty, "now" or "iso": ISO 8601 string from datetime.isoformat()
      - "timestamp": str() of the POSIX timestamp
      - "date": "%Y-%m-%d"
      - "time": "%H:%M:%S"
      - anything else: used verbatim as a strftime() pattern

    `context`, `log_identifier` and `format_spec` (from the '|' syntax) are
    not used.

    Returns (string_value, error_message, size_of_string_value). On failure
    string_value is an "[Error: ...]" placeholder and the size is the UTF-8
    byte length of that placeholder.
    """
    pattern = expression.strip()
    moment = datetime.now()
    try:
        # Keyword -> zero-argument renderer; unknown keywords fall through to
        # strftime() with the caller-supplied pattern.
        presets = {
            "": moment.isoformat,
            "now": moment.isoformat,
            "iso": moment.isoformat,
            "timestamp": lambda: str(moment.timestamp()),
            "date": lambda: moment.strftime("%Y-%m-%d"),
            "time": lambda: moment.strftime("%H:%M:%S"),
        }
        render = presets.get(pattern.lower())
        rendered = render() if render is not None else moment.strftime(pattern)
        return rendered, None, len(rendered.encode("utf-8"))
    except Exception as e:
        reason = f"Datetime formatting error: {e}"
        placeholder = f"[Error: {reason}]"
        return placeholder, reason, len(placeholder.encode("utf-8"))
135  
136  
def _evaluate_uuid_embed(
    expression: str,
    context: Any,
    log_identifier: str,
    format_spec: Optional[str] = None,
) -> Tuple[str, Optional[str], int]:
    """
    Produce a newly generated random UUID (uuid.uuid4) for a 'uuid' embed.

    None of the arguments, including format_spec, influence the result.
    Returns (string_value, error_message, size_of_string_value); the error
    message is always None.
    """
    generated = uuid.uuid4()
    text = str(generated)
    byte_count = len(text.encode("utf-8"))
    return text, None, byte_count
149  
150  
async def _evaluate_artifact_meta_embed(
    expression: str,
    context: Dict[str, Any],
    log_identifier: str,
    format_spec: Optional[str] = None,
) -> Tuple[str, Optional[str], int]:
    """
    Evaluates an 'artifact_meta' embed (early stage). Ignores format_spec.

    The expression has the form 'filename' or 'filename:version'. When no
    version is given, the highest version returned by
    artifact_service.list_versions() is used.

    Context is expected to be a dictionary containing 'artifact_service'
    and 'session_context' (which itself is a dict with app_name, user_id, session_id).

    The returned text is produced by format_metadata_for_llm() from the
    filename, version, mime type and byte size of the artifact, merged with
    the content of the companion '<filename>.metadata.json' artifact when it
    can be loaded and parsed as a JSON object (companion keys override the
    base fields). A missing or unusable companion file is not an error.

    Returns (string_value, error_message, size_of_string_value). On failure
    string_value is an "[Error: ...]" placeholder, error_message is set and
    the size is reported as 0.
    """

    def _fail(message: str) -> Tuple[str, Optional[str], int]:
        # Every error path of this evaluator reports a size of 0.
        return f"[Error: {message}]", message, 0

    if not isinstance(context, dict):
        return _fail("Invalid context type for artifact_meta, expected dict.")

    artifact_service = context.get("artifact_service")
    session_ctx_dict = context.get("session_context")

    if not artifact_service:
        return _fail("ArtifactService not available in context for artifact_meta")
    if not session_ctx_dict or not isinstance(session_ctx_dict, dict):
        return _fail(
            "Session context dictionary not available in context for artifact_meta"
        )

    app_name = session_ctx_dict.get("app_name")
    user_id = session_ctx_dict.get("user_id")
    session_id = session_ctx_dict.get("session_id")

    if not all([app_name, user_id, session_id]):
        return _fail(
            "Missing app_name, user_id, or session_id in session_context for artifact_meta"
        )

    parts = expression.strip().split(":", 1)
    filename = parts[0]
    version_str = parts[1] if len(parts) > 1 else None

    if not filename:
        return _fail("Filename missing for artifact_meta")

    # Parse an explicit version up front so that only a malformed version
    # string is reported as "Invalid version". A ValueError raised later
    # (e.g. by the artifact service) is handled as a generic failure below
    # instead of being misattributed to the version.
    version_to_load: Optional[int] = None
    if version_str:
        try:
            version_to_load = int(version_str)
        except ValueError as ve:
            return _fail(
                f"Invalid version specified for artifact_meta: '{version_str}'. Error: {ve}"
            )

    try:
        if version_to_load is None:
            versions = await artifact_service.list_versions(
                app_name=app_name,
                user_id=user_id,
                session_id=session_id,
                filename=filename,
            )
            if not versions:
                return _fail(
                    f"Artifact '{filename}' not found (no versions available)"
                )
            version_to_load = max(versions)

        if version_to_load is None:
            return _fail(f"Could not determine version for artifact_meta '{filename}'")

        data_artifact_part = await artifact_service.load_artifact(
            app_name=app_name,
            user_id=user_id,
            session_id=session_id,
            filename=filename,
            version=version_to_load,
        )

        if not data_artifact_part or not data_artifact_part.inline_data:
            return _fail(
                f"Data artifact '{filename}' v{version_to_load} not found or empty"
            )

        data_mime_type = (
            data_artifact_part.inline_data.mime_type or "application/octet-stream"
        )
        data_size = len(data_artifact_part.inline_data.data)

        # Best effort: the companion metadata file is optional, so any problem
        # loading or parsing it is logged and the base metadata is used alone.
        custom_metadata_from_file: Dict[str, Any] = {}
        metadata_filename = f"{filename}.metadata.json"
        try:
            companion_metadata_part = await artifact_service.load_artifact(
                app_name=app_name,
                user_id=user_id,
                session_id=session_id,
                filename=metadata_filename,
                version=version_to_load,
            )
            if companion_metadata_part and companion_metadata_part.inline_data:
                try:
                    parsed_metadata = json.loads(
                        companion_metadata_part.inline_data.data.decode("utf-8")
                    )
                except json.JSONDecodeError as json_err:
                    log.warning(
                        "%s Failed to parse companion metadata JSON for '%s' v%s: %s",
                        log_identifier,
                        metadata_filename,
                        version_to_load,
                        json_err,
                    )
                else:
                    if isinstance(parsed_metadata, dict):
                        custom_metadata_from_file = parsed_metadata
                    else:
                        # Only a JSON object can be merged with '**' below;
                        # anything else would raise and fail the whole embed.
                        log.warning(
                            "%s Companion metadata for '%s' v%s is not a JSON object (got %s); ignoring it.",
                            log_identifier,
                            metadata_filename,
                            version_to_load,
                            type(parsed_metadata).__name__,
                        )
        except Exception as e_meta_load:
            log.debug(
                "%s Could not load companion metadata file '%s' v%s (this is often normal): %s",
                log_identifier,
                metadata_filename,
                version_to_load,
                e_meta_load,
            )

        full_metadata_dict = {
            "filename": filename,
            "version": version_to_load,
            "mime_type": data_mime_type,
            "size": data_size,
            **custom_metadata_from_file,
        }

        formatted_text = format_metadata_for_llm(full_metadata_dict)
        return formatted_text, None, len(formatted_text.encode("utf-8"))

    except Exception as e:
        err_msg = f"Error evaluating artifact_meta for '{filename}' v{version_str or 'latest'}: {e}"
        log.exception("%s %s", log_identifier, err_msg)
        return _fail(err_msg)
279  
280  
async def _evaluate_artifact_content_embed(
    expression: str, context: Any, log_identifier: str, config: Optional[Dict] = None
) -> Tuple[Optional[bytes], Optional[str], Optional[str]]:
    """
    Evaluates an 'artifact_content' embed (late stage).
    Loads the raw artifact content (bytes) and its mime type.

    The 'expression' here should ONLY be the artifact specifier (filename[:version]);
    an expression containing EMBED_CHAIN_DELIMITER is rejected. When no version
    is given, the highest version returned by artifact_service.list_versions()
    is loaded.

    Context must be a dict containing 'artifact_service' and a
    'session_context' dict with app_name, user_id and session_id.

    If config contains a non-negative 'gateway_max_artifact_resolve_size_bytes',
    content larger than that many bytes is rejected; a missing config, a
    missing key or a negative value disables the check.

    Returns (content_bytes, mime_type, None) on success,
    or (None, None, error_message) on failure.
    """
    if EMBED_CHAIN_DELIMITER in expression:
        err_msg = f"Internal Error: _evaluate_artifact_content_embed received expression containing chain delimiter ('{EMBED_CHAIN_DELIMITER}'). This indicates an upstream parsing issue. Expression: '{expression}'"
        log.error("%s %s", log_identifier, err_msg)
        return None, None, err_msg

    if not isinstance(context, dict):
        return None, None, "Invalid context for artifact_content embed"

    artifact_service = context.get("artifact_service")
    session_context = context.get("session_context")
    # A truthy non-dict session context would raise AttributeError on .get()
    # below, outside of any error handling, so it is rejected here as well.
    if (
        not artifact_service
        or not session_context
        or not isinstance(session_context, dict)
    ):
        return None, None, "ArtifactService or session context not available"

    app_name = session_context.get("app_name")
    user_id = session_context.get("user_id")
    session_id = session_context.get("session_id")
    if not all([app_name, user_id, session_id]):
        return None, None, "Missing required session identifiers in context"

    artifact_spec = expression.strip()
    parts = artifact_spec.split(":", 1)
    filename = parts[0]
    version_str = parts[1] if len(parts) > 1 else None
    version_to_load: Optional[int] = None

    if not filename:
        return None, None, "Filename missing for artifact_content"

    try:
        if version_str:
            try:
                version_to_load = int(version_str)
            except ValueError:
                err_msg = f"Invalid version format in artifact specifier '{artifact_spec}'. Expected 'filename' or 'filename:integer_version'."
                log.warning("%s %s", log_identifier, err_msg)
                return None, None, err_msg
        else:
            versions = await artifact_service.list_versions(
                app_name=app_name,
                user_id=user_id,
                session_id=session_id,
                filename=filename,
            )
            if not versions:
                return (
                    None,
                    None,
                    f"Artifact '{filename}' not found (no versions available)",
                )
            version_to_load = max(versions)

        if version_to_load is None:
            return None, None, f"Could not determine version for artifact '{filename}'"

        artifact_part = await artifact_service.load_artifact(
            app_name=app_name,
            user_id=user_id,
            session_id=session_id,
            filename=filename,
            version=version_to_load,
        )

        if not artifact_part or not artifact_part.inline_data:
            return (
                None,
                None,
                f"Artifact '{filename}' v{version_to_load} not found or empty",
            )

        content_bytes = artifact_part.inline_data.data
        mime_type = artifact_part.inline_data.mime_type

        # A negative limit (the default) means no size check is applied.
        limit_bytes = (
            config.get("gateway_max_artifact_resolve_size_bytes", -1) if config else -1
        )
        if limit_bytes >= 0 and len(content_bytes) > limit_bytes:
            error_msg = f"Artifact '{filename}' v{version_to_load} exceeds maximum size limit ({len(content_bytes)} > {limit_bytes} bytes)"
            log.warning("%s %s", log_identifier, error_msg)
            return None, None, error_msg

        return content_bytes, mime_type, None

    except FileNotFoundError:
        return None, None, f"Artifact '{filename}' v{version_str or 'latest'} not found"
    except Exception as e:
        log.exception(
            "%s Error loading artifact content for '%s' v%s: %s",
            log_identifier,
            filename,
            version_str or "latest",
            e,
        )
        return (
            None,
            None,
            f"Error loading artifact content for '{filename}' v{version_str or 'latest'}: {e}",
        )
388  
389  
# Registry mapping an embed type name to the callable that evaluates it.
# The evaluators registered here return a 3-tuple
# (string_value, error_message, size_of_string_value), and the 'artifact_meta'
# evaluator is a coroutine function whose call result must be awaited. The
# value type is therefore declared as Callable[..., Any] rather than a
# fixed-length tuple return type that none of the entries actually match.
# _evaluate_artifact_content_embed is not part of this mapping.
EMBED_EVALUATORS: Dict[str, Callable[..., Any]] = {
    "math": _evaluate_math_embed,
    "datetime": _evaluate_datetime_embed,
    "uuid": _evaluate_uuid_embed,
    "artifact_meta": _evaluate_artifact_meta_embed,
}