evaluators.py
1 """ 2 Contains individual evaluator functions for different embed types 3 and the mapping dictionary. 4 """ 5 6 import logging 7 import json 8 from datetime import datetime 9 import uuid 10 from typing import Any, Callable, Dict, Optional, Tuple 11 from asteval import Interpreter 12 import math, random 13 14 from ....agent.utils.artifact_helpers import format_metadata_for_llm 15 from .constants import EMBED_CHAIN_DELIMITER 16 17 log = logging.getLogger(__name__) 18 19 MATH_SAFE_SYMBOLS = { 20 # Basic math operations 21 "abs": abs, 22 "round": round, 23 "min": min, 24 "max": max, 25 # Math module functions 26 "sin": math.sin, 27 "cos": math.cos, 28 "tan": math.tan, 29 "sqrt": math.sqrt, 30 "pow": math.pow, 31 "exp": math.exp, 32 "log": math.log, 33 "log10": math.log10, 34 "pi": math.pi, 35 "e": math.e, 36 "inf": math.inf, 37 "ceil": math.ceil, 38 "floor": math.floor, 39 # Trigonometric helpers 40 "radians": math.radians, 41 # Combinatorics/Statistical 42 "factorial": math.factorial, 43 "sum": sum, 44 # Hyperbolic functions 45 "sinh": math.sinh, 46 "cosh": math.cosh, 47 "tanh": math.tanh, 48 # Random functions 49 "random": random.random, 50 "randint": random.randint, 51 "uniform": random.uniform, 52 } 53 54 55 def _evaluate_math_embed( 56 expression: str, 57 context: Any, 58 log_identifier: str, 59 format_spec: Optional[str] = None, 60 ) -> Tuple[str, Optional[str], int]: 61 """ 62 Evaluates a 'math' embed using asteval. 63 Applies format_spec if the result is numeric and format_spec is provided. 64 Returns (string_value, error_message, size_of_string_value). 65 """ 66 try: 67 user_symtable = MATH_SAFE_SYMBOLS.copy() 68 69 local_interpreter = Interpreter(symtable=user_symtable) 70 result = local_interpreter.eval(expression.strip()) 71 72 if local_interpreter.error: 73 error_messages = [err.msg for err in local_interpreter.error] 74 error_msg_str = "; ".join(error_messages) 75 err_str = f"[Error: Math evaluation error: {error_msg_str}]" 76 return err_str, error_msg_str, len(err_str.encode("utf-8")) 77 78 str_value: str 79 if isinstance(result, (int, float)) and format_spec: 80 try: 81 str_value = format(result, format_spec) 82 except ValueError as fmt_err: 83 log.warning( 84 "%s Invalid format_spec '%s' for math result %s: %s. Falling back to str().", 85 log_identifier, 86 format_spec, 87 result, 88 fmt_err, 89 ) 90 str_value = str(result) 91 else: 92 str_value = str(result) 93 94 return str_value, None, len(str_value.encode("utf-8")) 95 except ImportError: 96 err_msg = "Math evaluation skipped: 'asteval' not installed" 97 log.warning("%s %s", log_identifier, err_msg) 98 err_str = f"[Error: {err_msg}]" 99 return err_str, err_msg, len(err_str.encode("utf-8")) 100 except Exception as e: 101 err_msg = f"Math evaluation error: {e}" 102 err_str = f"[Error: {err_msg}]" 103 return err_str, err_msg, len(err_str.encode("utf-8")) 104 105 106 def _evaluate_datetime_embed( 107 expression: str, 108 context: Any, 109 log_identifier: str, 110 format_spec: Optional[str] = None, 111 ) -> Tuple[str, Optional[str], int]: 112 """ 113 Evaluates a 'datetime' embed. Ignores format_spec from '|' syntax. 114 Returns (string_value, error_message, size_of_string_value). 115 """ 116 format_str = expression.strip() 117 now = datetime.now() 118 try: 119 value = None 120 if not format_str or format_str.lower() in ["now", "iso"]: 121 value = now.isoformat() 122 elif format_str.lower() == "timestamp": 123 value = str(now.timestamp()) 124 elif format_str.lower() == "date": 125 value = now.strftime("%Y-%m-%d") 126 elif format_str.lower() == "time": 127 value = now.strftime("%H:%M:%S") 128 else: 129 value = now.strftime(format_str) 130 return value, None, len(value.encode("utf-8")) 131 except Exception as e: 132 err_msg = f"Datetime formatting error: {e}" 133 err_str = f"[Error: {err_msg}]" 134 return err_str, err_msg, len(err_str.encode("utf-8")) 135 136 137 def _evaluate_uuid_embed( 138 expression: str, 139 context: Any, 140 log_identifier: str, 141 format_spec: Optional[str] = None, 142 ) -> Tuple[str, Optional[str], int]: 143 """ 144 Evaluates a 'uuid' embed. Ignores format_spec. 145 Returns (string_value, error_message, size_of_string_value). 146 """ 147 value = str(uuid.uuid4()) 148 return value, None, len(value.encode("utf-8")) 149 150 151 async def _evaluate_artifact_meta_embed( 152 expression: str, 153 context: Dict[str, Any], 154 log_identifier: str, 155 format_spec: Optional[str] = None, 156 ) -> Tuple[str, Optional[str], int]: 157 """ 158 Evaluates an 'artifact_meta' embed (early stage). Ignores format_spec. 159 Context is expected to be a dictionary containing 'artifact_service' 160 and 'session_context' (which itself is a dict with app_name, user_id, session_id). 161 Returns (string_value, error_message, size_of_string_value). 162 """ 163 if not isinstance(context, dict): 164 err_msg = "Invalid context type for artifact_meta, expected dict." 165 return f"[Error: {err_msg}]", err_msg, 0 166 167 artifact_service = context.get("artifact_service") 168 session_ctx_dict = context.get("session_context") 169 170 if not artifact_service: 171 err_msg = "ArtifactService not available in context for artifact_meta" 172 return f"[Error: {err_msg}]", err_msg, 0 173 if not session_ctx_dict or not isinstance(session_ctx_dict, dict): 174 err_msg = ( 175 "Session context dictionary not available in context for artifact_meta" 176 ) 177 return f"[Error: {err_msg}]", err_msg, 0 178 179 app_name = session_ctx_dict.get("app_name") 180 user_id = session_ctx_dict.get("user_id") 181 session_id = session_ctx_dict.get("session_id") 182 183 if not all([app_name, user_id, session_id]): 184 err_msg = "Missing app_name, user_id, or session_id in session_context for artifact_meta" 185 return f"[Error: {err_msg}]", err_msg, 0 186 187 parts = expression.strip().split(":", 1) 188 filename = parts[0] 189 version_str = parts[1] if len(parts) > 1 else None 190 version = None 191 192 if not filename: 193 err_msg = "Filename missing for artifact_meta" 194 return f"[Error: {err_msg}]", err_msg, 0 195 196 version_to_load: Optional[int] = None 197 try: 198 if version_str: 199 version_to_load = int(version_str) 200 else: 201 versions = await artifact_service.list_versions( 202 app_name=app_name, 203 user_id=user_id, 204 session_id=session_id, 205 filename=filename, 206 ) 207 if not versions: 208 err_msg = f"Artifact '{filename}' not found (no versions available)" 209 return f"[Error: {err_msg}]", err_msg, 0 210 version_to_load = max(versions) 211 212 if version_to_load is None: 213 err_msg = f"Could not determine version for artifact_meta '{filename}'" 214 return f"[Error: {err_msg}]", err_msg, 0 215 216 data_artifact_part = await artifact_service.load_artifact( 217 app_name=app_name, 218 user_id=user_id, 219 session_id=session_id, 220 filename=filename, 221 version=version_to_load, 222 ) 223 224 if not data_artifact_part or not data_artifact_part.inline_data: 225 err_msg = ( 226 f"Data artifact '{filename}' v{version_to_load} not found or empty" 227 ) 228 return f"[Error: {err_msg}]", err_msg, 0 229 230 data_mime_type = ( 231 data_artifact_part.inline_data.mime_type or "application/octet-stream" 232 ) 233 data_size = len(data_artifact_part.inline_data.data) 234 235 custom_metadata_from_file = {} 236 metadata_filename = f"{filename}.metadata.json" 237 try: 238 companion_metadata_part = await artifact_service.load_artifact( 239 app_name=app_name, 240 user_id=user_id, 241 session_id=session_id, 242 filename=metadata_filename, 243 version=version_to_load, 244 ) 245 if companion_metadata_part and companion_metadata_part.inline_data: 246 try: 247 custom_metadata_from_file = json.loads( 248 companion_metadata_part.inline_data.data.decode("utf-8") 249 ) 250 except json.JSONDecodeError as json_err: 251 log.warning( 252 f"{log_identifier} Failed to parse companion metadata JSON for '{metadata_filename}' v{version_to_load}: {json_err}" 253 ) 254 except Exception as e_meta_load: 255 log.debug( 256 f"{log_identifier} Could not load companion metadata file '{metadata_filename}' v{version_to_load} (this is often normal): {e_meta_load}" 257 ) 258 259 full_metadata_dict = { 260 "filename": filename, 261 "version": version_to_load, 262 "mime_type": data_mime_type, 263 "size": data_size, 264 **custom_metadata_from_file, 265 } 266 267 formatted_text = format_metadata_for_llm(full_metadata_dict) 268 return formatted_text, None, len(formatted_text.encode("utf-8")) 269 270 except ValueError as ve: 271 err_msg = ( 272 f"Invalid version specified for artifact_meta: '{version_str}'. Error: {ve}" 273 ) 274 return f"[Error: {err_msg}]", err_msg, 0 275 except Exception as e: 276 err_msg = f"Error evaluating artifact_meta for '{filename}' v{version_str or 'latest'}: {e}" 277 log.error(f"{log_identifier} {err_msg}", exc_info=True) 278 return f"[Error: {err_msg}]", err_msg, 0 279 280 281 async def _evaluate_artifact_content_embed( 282 expression: str, context: Any, log_identifier: str, config: Optional[Dict] = None 283 ) -> Tuple[Optional[bytes], Optional[str], Optional[str]]: 284 """ 285 Evaluates an 'artifact_content' embed (late stage). 286 Loads the raw artifact content (bytes) and its mime type. 287 Returns (content_bytes, mime_type, None) on success, 288 or (None, None, error_message) on failure. 289 The 'expression' here should ONLY be the artifact specifier (filename[:version]). 290 """ 291 if EMBED_CHAIN_DELIMITER in expression: 292 err_msg = f"Internal Error: _evaluate_artifact_content_embed received expression containing chain delimiter ('{EMBED_CHAIN_DELIMITER}'). This indicates an upstream parsing issue. Expression: '{expression}'" 293 log.error("%s %s", log_identifier, err_msg) 294 return None, None, err_msg 295 296 if not isinstance(context, dict): 297 return None, None, "Invalid context for artifact_content embed" 298 299 artifact_service = context.get("artifact_service") 300 session_context = context.get("session_context") 301 if not artifact_service or not session_context: 302 return None, None, "ArtifactService or session context not available" 303 304 app_name = session_context.get("app_name") 305 user_id = session_context.get("user_id") 306 session_id = session_context.get("session_id") 307 if not all([app_name, user_id, session_id]): 308 return None, None, "Missing required session identifiers in context" 309 310 artifact_spec = expression.strip() 311 parts = artifact_spec.split(":", 1) 312 filename = parts[0] 313 version_str = parts[1] if len(parts) > 1 else None 314 version_to_load: Optional[int] = None 315 316 if not filename: 317 return None, None, "Filename missing for artifact_content" 318 319 try: 320 if version_str: 321 try: 322 version_to_load = int(version_str) 323 except ValueError: 324 err_msg = f"Invalid version format in artifact specifier '{artifact_spec}'. Expected 'filename' or 'filename:integer_version'." 325 log.warning("%s %s", log_identifier, err_msg) 326 return None, None, err_msg 327 else: 328 versions = await artifact_service.list_versions( 329 app_name=app_name, 330 user_id=user_id, 331 session_id=session_id, 332 filename=filename, 333 ) 334 if not versions: 335 return ( 336 None, 337 None, 338 f"Artifact '{filename}' not found (no versions available)", 339 ) 340 version_to_load = max(versions) 341 342 if version_to_load is None: 343 return None, None, f"Could not determine version for artifact '{filename}'" 344 345 artifact_part = await artifact_service.load_artifact( 346 app_name=app_name, 347 user_id=user_id, 348 session_id=session_id, 349 filename=filename, 350 version=version_to_load, 351 ) 352 353 if not artifact_part or not artifact_part.inline_data: 354 return ( 355 None, 356 None, 357 f"Artifact '{filename}' v{version_to_load} not found or empty", 358 ) 359 360 content_bytes = artifact_part.inline_data.data 361 mime_type = artifact_part.inline_data.mime_type 362 363 limit_bytes = ( 364 config.get("gateway_max_artifact_resolve_size_bytes", -1) if config else -1 365 ) 366 if limit_bytes >= 0 and len(content_bytes) > limit_bytes: 367 error_msg = f"Artifact '{filename}' v{version_to_load} exceeds maximum size limit ({len(content_bytes)} > {limit_bytes} bytes)" 368 log.warning("%s %s", log_identifier, error_msg) 369 return None, None, error_msg 370 371 return content_bytes, mime_type, None 372 373 except FileNotFoundError: 374 return None, None, f"Artifact '{filename}' v{version_str or 'latest'} not found" 375 except Exception as e: 376 log.exception( 377 "%s Error loading artifact content for '%s' v%s: %s", 378 log_identifier, 379 filename, 380 version_str or "latest", 381 e, 382 ) 383 return ( 384 None, 385 None, 386 f"Error loading artifact content for '{filename}' v{version_str or 'latest'}: {e}", 387 ) 388 389 390 EMBED_EVALUATORS: Dict[str, Callable[..., Tuple[Any, Optional[str]]]] = { 391 "math": _evaluate_math_embed, 392 "datetime": _evaluate_datetime_embed, 393 "uuid": _evaluate_uuid_embed, 394 "artifact_meta": _evaluate_artifact_meta_embed, 395 }