events.py
1 import inspect 2 import os 3 import sys 4 from collections import Counter 5 from enum import Enum 6 from typing import TYPE_CHECKING, Any 7 from urllib.parse import urlparse 8 9 from mlflow.entities import Feedback 10 from mlflow.entities.issue import IssueSeverity, IssueStatus 11 from mlflow.environment_variables import MLFLOW_ENABLE_OTEL_GENAI_SEMCONV 12 from mlflow.telemetry.constant import ( 13 GENAI_MODULES, 14 MODULES_TO_CHECK_IMPORT, 15 ) 16 17 if TYPE_CHECKING: 18 from mlflow.genai.scorers.base import Scorer 19 20 21 GENAI_EVALUATION_PATH = "mlflow/genai/evaluation/base" 22 GENAI_SCORERS_PATH = "mlflow/genai/scorers/base" 23 GENAI_EVALUATE_FUNCTION = "_run_harness" 24 SCORER_RUN_FUNCTION = "run" 25 26 27 def _get_scorer_class_name_for_tracking(scorer: "Scorer") -> str: 28 from mlflow.genai.scorers.builtin_scorers import BuiltInScorer 29 30 if isinstance(scorer, BuiltInScorer): 31 return type(scorer).__name__ 32 33 try: 34 from mlflow.genai.scorers.deepeval import DeepEvalScorer 35 36 if isinstance(scorer, DeepEvalScorer): 37 return f"DeepEval:{scorer.name}" 38 except ImportError: 39 pass 40 41 try: 42 from mlflow.genai.scorers.ragas import RagasScorer 43 44 if isinstance(scorer, RagasScorer): 45 return f"Ragas:{scorer.name}" 46 except ImportError: 47 pass 48 49 return "UserDefinedScorer" 50 51 52 class Event: 53 name: str 54 55 @classmethod 56 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 57 """ 58 Parse the arguments and return the params. 59 """ 60 return None 61 62 63 class CreateExperimentEvent(Event): 64 name: str = "create_experiment" 65 66 @classmethod 67 def parse_result(cls, result: Any) -> dict[str, Any] | None: 68 # create_experiment API returns the experiment id 69 return {"experiment_id": result} 70 71 72 class CreatePromptEvent(Event): 73 name: str = "create_prompt" 74 75 76 class LoadPromptEvent(Event): 77 name: str = "load_prompt" 78 79 @classmethod 80 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 81 name_or_uri = arguments.get("name_or_uri", "") 82 # Check if alias is used (format: "prompts:/name@alias") 83 uses_alias = "@" in name_or_uri 84 return {"uses_alias": uses_alias} 85 86 87 class StartTraceEvent(Event): 88 name: str = "start_trace" 89 90 @classmethod 91 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 92 # Capture the set of currently imported packages at trace start time to 93 # understand the flavor of the trace. 94 return { 95 "imports": [pkg for pkg in GENAI_MODULES if pkg in sys.modules], 96 "format": "genai_semconv" if MLFLOW_ENABLE_OTEL_GENAI_SEMCONV.get() else "native", 97 } 98 99 100 class LogAssessmentEvent(Event): 101 name: str = "log_assessment" 102 103 @classmethod 104 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 105 from mlflow.entities.assessment import Expectation, Feedback 106 107 assessment = arguments.get("assessment") 108 if assessment is None: 109 return None 110 111 if isinstance(assessment, Expectation): 112 return {"type": "expectation", "source_type": assessment.source.source_type} 113 elif isinstance(assessment, Feedback): 114 return {"type": "feedback", "source_type": assessment.source.source_type} 115 116 117 class EvaluateEvent(Event): 118 name: str = "evaluate" 119 120 121 class GenAIEvaluateEvent(Event): 122 name: str = "genai_evaluate" 123 124 @classmethod 125 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 126 from mlflow.genai.scorers.base import Scorer 127 128 record_params = {} 129 130 # Track if predict_fn is provided 131 record_params["predict_fn_provided"] = arguments.get("predict_fn") is not None 132 133 # Track eval data type 134 eval_data = arguments.get("data") 135 if eval_data is not None: 136 from mlflow.genai.evaluation.utils import _get_eval_data_type 137 138 record_params["eval_data_type"] = _get_eval_data_type(eval_data) 139 140 # Track scorer information 141 scorers = arguments.get("scorers") or [] 142 scorer_info = [ 143 { 144 "class": _get_scorer_class_name_for_tracking(scorer), 145 "kind": scorer.kind.value, 146 "scope": "session" if scorer.is_session_level_scorer else "trace", 147 } 148 for scorer in scorers 149 if isinstance(scorer, Scorer) 150 ] 151 record_params["scorer_info"] = scorer_info 152 153 return record_params 154 155 @classmethod 156 def parse_result(cls, result: Any) -> dict[str, Any] | None: 157 _, telemetry_data = result 158 159 if not isinstance(telemetry_data, dict): 160 return None 161 162 return telemetry_data 163 164 165 class CreateLoggedModelEvent(Event): 166 name: str = "create_logged_model" 167 168 @classmethod 169 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 170 data: dict[str, Any] = {} 171 if flavor := arguments.get("flavor"): 172 data["flavor"] = flavor.removeprefix("mlflow.") 173 if serialization_format := arguments.get("serialization_format"): 174 data["serialization_format"] = serialization_format 175 if arguments.get("uses_uv"): 176 data["uses_uv"] = True 177 return data or None 178 179 180 class GetLoggedModelEvent(Event): 181 name: str = "get_logged_model" 182 183 @classmethod 184 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 185 return { 186 "imports": [pkg for pkg in MODULES_TO_CHECK_IMPORT if pkg in sys.modules], 187 } 188 189 190 class CreateRegisteredModelEvent(Event): 191 name: str = "create_registered_model" 192 193 @classmethod 194 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 195 tags = arguments.get("tags") or {} 196 return {"is_prompt": _is_prompt(tags)} 197 198 199 class CreateRunEvent(Event): 200 name: str = "create_run" 201 202 @classmethod 203 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 204 # Capture the set of currently imported packages at run creation time to 205 # understand how MLflow is used together with other libraries. Collecting 206 # this data at run creation ensures accuracy and completeness. 207 return { 208 "imports": [pkg for pkg in MODULES_TO_CHECK_IMPORT if pkg in sys.modules], 209 "experiment_id": arguments.get("experiment_id"), 210 } 211 212 213 class CreateModelVersionEvent(Event): 214 name: str = "create_model_version" 215 216 @classmethod 217 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 218 tags = arguments.get("tags") or {} 219 return {"is_prompt": _is_prompt(tags)} 220 221 222 class CreateDatasetEvent(Event): 223 name: str = "create_dataset" 224 225 226 class MergeRecordsEvent(Event): 227 name: str = "merge_records" 228 229 @classmethod 230 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 231 from mlflow.entities.evaluation_dataset import ( 232 DatasetGranularity, 233 EvaluationDataset, 234 ) 235 236 if arguments is None: 237 return None 238 239 records = arguments.get("records") 240 if records is None: 241 return None 242 243 try: 244 count = len(records) 245 except TypeError: 246 return None 247 248 if count == 0: 249 return None 250 251 input_type = type(records).__name__.lower() 252 input_keys: set[str] | None = None 253 254 if "dataframe" in input_type: 255 input_type = "pandas" 256 try: 257 if "inputs" in records.columns: 258 if first_inputs := records.iloc[0].get("inputs", {}): 259 input_keys = set(first_inputs.keys()) 260 except Exception: 261 pass 262 elif isinstance(records, list): 263 first_elem = records[0] 264 if hasattr(first_elem, "__class__") and first_elem.__class__.__name__ == "Trace": 265 input_type = "list[trace]" 266 elif isinstance(first_elem, dict): 267 input_type = "list[dict]" 268 if first_inputs := first_elem.get("inputs", {}): 269 input_keys = set(first_inputs.keys()) 270 else: 271 input_type = "list" 272 else: 273 input_type = "other" 274 275 if input_type == "list[trace]": 276 dataset_type = DatasetGranularity.TRACE 277 elif input_keys: 278 dataset_type = EvaluationDataset._classify_input_fields(input_keys) 279 else: 280 dataset_type = DatasetGranularity.UNKNOWN 281 282 return { 283 "record_count": count, 284 "input_type": input_type, 285 "dataset_type": dataset_type.value, 286 } 287 288 289 class DatasetToDataFrameEvent(Event): 290 name: str = "dataset_to_df" 291 292 @classmethod 293 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 294 from mlflow.entities.evaluation_dataset import EvaluationDataset 295 296 dataset_instance = arguments.get("self") 297 if not isinstance(dataset_instance, EvaluationDataset): 298 return None 299 300 callsite = "direct_call" 301 frame = sys._getframe() 302 for _ in range(10): 303 if frame is None: 304 break 305 frame_filename = frame.f_code.co_filename.replace("\\", "/") 306 if "mlflow/genai/evaluation" in frame_filename: 307 callsite = "genai_evaluate" 308 break 309 if "mlflow/genai/simulators" in frame_filename: 310 callsite = "conversation_simulator" 311 break 312 frame = frame.f_back 313 314 granularity = dataset_instance._get_existing_granularity() 315 return {"dataset_type": granularity.value, "callsite": callsite} 316 317 @classmethod 318 def parse_result(cls, result: Any) -> dict[str, Any] | None: 319 if result is None: 320 return {"record_count": 0} 321 322 return {"record_count": len(result)} 323 324 325 def _is_prompt(tags: dict[str, str]) -> bool: 326 try: 327 from mlflow.prompt.constants import IS_PROMPT_TAG_KEY 328 except ImportError: 329 return False 330 return tags.get(IS_PROMPT_TAG_KEY, "false").lower() == "true" 331 332 333 class CreateWebhookEvent(Event): 334 name: str = "create_webhook" 335 336 @classmethod 337 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 338 events = arguments.get("events") or [] 339 return {"events": [str(event) for event in events]} 340 341 342 class PromptOptimizationEvent(Event): 343 name: str = "prompt_optimization" 344 345 @classmethod 346 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 347 result = {} 348 349 # Track the optimizer type used 350 if optimizer := arguments.get("optimizer"): 351 result["optimizer_type"] = type(optimizer).__name__ 352 else: 353 result["optimizer_type"] = None 354 355 # Track the number of prompts being optimized 356 prompt_uris = arguments.get("prompt_uris") or [] 357 try: 358 result["prompt_count"] = len(prompt_uris) 359 except TypeError: 360 result["prompt_count"] = None 361 362 # Track if custom scorers are provided and how many 363 scorers = arguments.get("scorers") 364 try: 365 result["scorer_count"] = len(scorers) 366 except TypeError: 367 result["scorer_count"] = None 368 369 # Track if custom aggregation is provided 370 result["custom_aggregation"] = arguments.get("aggregation") is not None 371 372 return result 373 374 375 class LogDatasetEvent(Event): 376 name: str = "log_dataset" 377 378 379 class LogMetricEvent(Event): 380 name: str = "log_metric" 381 382 @classmethod 383 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 384 return {"synchronous": arguments.get("synchronous")} 385 386 387 class LogParamEvent(Event): 388 name: str = "log_param" 389 390 @classmethod 391 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 392 return {"synchronous": arguments.get("synchronous")} 393 394 395 class LogBatchEvent(Event): 396 name: str = "log_batch" 397 398 @classmethod 399 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 400 return { 401 "metrics": bool(arguments.get("metrics")), 402 "params": bool(arguments.get("params")), 403 "tags": bool(arguments.get("tags")), 404 "synchronous": arguments.get("synchronous"), 405 } 406 407 408 class McpRunEvent(Event): 409 name: str = "mcp_run" 410 411 412 class TrackingServerStartEvent(Event): 413 name: str = "tracking_server_start" 414 415 @classmethod 416 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 417 backend_store_uri = arguments.get("backend_store_uri") or "" 418 scheme = urlparse(backend_store_uri).scheme 419 # Treat empty schemes (relative paths) and single-letter schemes 420 # (Windows drive letters like C:\) as local file storage. 421 # Strip SQLAlchemy driver suffixes (e.g. mysql+pymysql → mysql). 422 backend_store_type = "file" if not scheme or len(scheme) == 1 else scheme.split("+")[0] 423 424 app_name = arguments.get("app_name") 425 return { 426 "auth_enabled": app_name == "basic-auth", 427 "app_name": app_name, 428 "backend_store_type": backend_store_type, 429 "serve_artifacts": bool(arguments.get("serve_artifacts")), 430 "artifacts_only": bool(arguments.get("artifacts_only")), 431 "expose_prometheus": arguments.get("expose_prometheus") is not None, 432 "enable_workspaces": bool(arguments.get("enable_workspaces")), 433 "workers": arguments.get("workers"), 434 "dev": bool(arguments.get("dev")), 435 } 436 437 438 class GatewayStartEvent(Event): 439 name: str = "gateway_start" 440 441 442 # Gateway Resource CRUD Events 443 class GatewayCreateEndpointEvent(Event): 444 name: str = "gateway_create_endpoint" 445 446 @classmethod 447 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 448 return { 449 "has_fallback_config": arguments.get("fallback_config") is not None, 450 "routing_strategy": str(arguments.get("routing_strategy")) 451 if arguments.get("routing_strategy") 452 else None, 453 "num_model_configs": len(arguments.get("model_configs") or []), 454 "usage_tracking": arguments.get("usage_tracking"), 455 } 456 457 458 class GatewayUpdateEndpointEvent(Event): 459 name: str = "gateway_update_endpoint" 460 461 @classmethod 462 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 463 return { 464 "has_fallback_config": arguments.get("fallback_config") is not None, 465 "routing_strategy": str(arguments.get("routing_strategy")) 466 if arguments.get("routing_strategy") 467 else None, 468 "num_model_configs": len(arguments.get("model_configs")) 469 if arguments.get("model_configs") is not None 470 else None, 471 "usage_tracking": arguments.get("usage_tracking"), 472 } 473 474 475 class GatewayDeleteEndpointEvent(Event): 476 name: str = "gateway_delete_endpoint" 477 478 479 class GatewayGetEndpointEvent(Event): 480 name: str = "gateway_get_endpoint" 481 482 483 class GatewayListEndpointsEvent(Event): 484 name: str = "gateway_list_endpoints" 485 486 @classmethod 487 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 488 return { 489 "filter_by_provider": arguments.get("provider") is not None, 490 } 491 492 493 class GatewayCreateModelDefinitionEvent(Event): 494 name: str = "gateway_create_model_definition" 495 496 @classmethod 497 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 498 return { 499 "model_name": arguments.get("model_name"), 500 "provider": arguments.get("provider"), 501 } 502 503 504 # Gateway Budget Policy CRUD Events 505 class GatewayCreateBudgetPolicyEvent(Event): 506 name: str = "gateway_create_budget_policy" 507 508 @classmethod 509 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 510 def _enum_str(val: Any) -> str | None: 511 if val is None: 512 return None 513 return val.value if hasattr(val, "value") else str(val) 514 515 duration = arguments.get("duration") 516 return { 517 "budget_unit": _enum_str(arguments.get("budget_unit")), 518 "duration_unit": _enum_str(duration.unit if duration is not None else None), 519 "target_scope": _enum_str(arguments.get("target_scope")), 520 "budget_action": _enum_str(arguments.get("budget_action")), 521 } 522 523 524 class GatewayUpdateBudgetPolicyEvent(Event): 525 name: str = "gateway_update_budget_policy" 526 527 528 class GatewayDeleteBudgetPolicyEvent(Event): 529 name: str = "gateway_delete_budget_policy" 530 531 532 class GatewayListBudgetPoliciesEvent(Event): 533 name: str = "gateway_list_budget_policies" 534 535 536 # Gateway Guardrail CRUD Events 537 class GatewayCreateGuardrailEvent(Event): 538 name: str = "gateway_create_guardrail" 539 540 @classmethod 541 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 542 return { 543 "stage": str(arguments.get("stage")) if arguments.get("stage") else None, 544 "action": str(arguments.get("action")) if arguments.get("action") else None, 545 } 546 547 548 class GatewayUpdateGuardrailEvent(Event): 549 name: str = "gateway_update_guardrail" 550 551 @classmethod 552 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 553 return { 554 "stage": str(arguments.get("stage")) if arguments.get("stage") else None, 555 "action": str(arguments.get("action")) if arguments.get("action") else None, 556 } 557 558 559 class GatewayDeleteGuardrailEvent(Event): 560 name: str = "gateway_delete_guardrail" 561 562 563 # Gateway Secret CRUD Events 564 class GatewayCreateSecretEvent(Event): 565 name: str = "gateway_create_secret" 566 567 @classmethod 568 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 569 return { 570 "provider": arguments.get("provider"), 571 } 572 573 574 class GatewayUpdateSecretEvent(Event): 575 name: str = "gateway_update_secret" 576 577 578 class GatewayDeleteSecretEvent(Event): 579 name: str = "gateway_delete_secret" 580 581 582 class GatewayListSecretsEvent(Event): 583 name: str = "gateway_list_secrets" 584 585 @classmethod 586 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 587 return { 588 "filter_by_provider": arguments.get("provider") is not None, 589 } 590 591 592 # Gateway Invocation Events 593 class GatewayInvocationType(str, Enum): 594 """Type of gateway invocation endpoint.""" 595 596 MLFLOW_INVOCATIONS = "mlflow_invocations" 597 MLFLOW_CHAT_COMPLETIONS = "mlflow_chat_completions" 598 OPENAI_PASSTHROUGH_CHAT = "openai_passthrough_chat" 599 OPENAI_PASSTHROUGH_EMBEDDINGS = "openai_passthrough_embeddings" 600 OPENAI_PASSTHROUGH_RESPONSES = "openai_passthrough_responses" 601 ANTHROPIC_PASSTHROUGH_MESSAGES = "anthropic_passthrough_messages" 602 GEMINI_PASSTHROUGH_GENERATE_CONTENT = "gemini_passthrough_generate_content" 603 GEMINI_PASSTHROUGH_STREAM_GENERATE_CONTENT = "gemini_passthrough_stream_generate_content" 604 605 606 class GatewayInvocationEvent(Event): 607 name: str = "gateway_invocation" 608 609 610 class AiCommandRunEvent(Event): 611 name: str = "ai_command_run" 612 613 614 class TracingContextPropagation(Event): 615 name: str = "tracing_context_propagation" 616 617 618 class GitModelVersioningEvent(Event): 619 name: str = "git_model_versioning" 620 621 622 class InvokeCustomJudgeModelEvent(Event): 623 name: str = "invoke_custom_judge_model" 624 625 @classmethod 626 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 627 from mlflow.metrics.genai.model_utils import _parse_model_uri 628 629 model_uri = arguments.get("model_uri") 630 if not model_uri: 631 return {"model_provider": None} 632 633 model_provider, _ = _parse_model_uri(model_uri) 634 return {"model_provider": model_provider} 635 636 637 class MakeJudgeEvent(Event): 638 name: str = "make_judge" 639 640 @classmethod 641 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 642 model = arguments.get("model") 643 if model and isinstance(model, str): 644 model_provider = model.split(":")[0] if ":" in model else None 645 return {"model_provider": model_provider} 646 return {"model_provider": None} 647 648 649 class AlignJudgeEvent(Event): 650 name: str = "align_judge" 651 652 @classmethod 653 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 654 result = {} 655 656 if (traces := arguments.get("traces")) is not None: 657 try: 658 result["trace_count"] = len(traces) 659 except TypeError: 660 result["trace_count"] = None 661 662 if optimizer := arguments.get("optimizer"): 663 result["optimizer_type"] = type(optimizer).__name__ 664 else: 665 result["optimizer_type"] = "default" 666 667 return result 668 669 670 class AutologgingEvent(Event): 671 name: str = "autologging" 672 673 674 class TraceAttachmentsEvent(Event): 675 name: str = "trace_attachments" 676 677 @classmethod 678 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 679 if attachments := arguments.get("attachments"): 680 content_types = Counter(att.content_type for att in attachments.values()) 681 return {"content_types": dict(content_types)} 682 return None 683 684 685 class TraceSource(str, Enum): 686 """Source of a trace received by the MLflow server.""" 687 688 MLFLOW_PYTHON_CLIENT = "MLFLOW_PYTHON_CLIENT" 689 EXTERNAL_OTEL_CLIENT = "EXTERNAL_OTEL_CLIENT" 690 UNKNOWN = "UNKNOWN" 691 692 693 class TracesReceivedByServerEvent(Event): 694 name: str = "traces_received_by_server" 695 696 697 class SimulateConversationEvent(Event): 698 name: str = "simulate_conversation" 699 700 @classmethod 701 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 702 callsite = "conversation_simulator" 703 for frame_info in inspect.stack()[:10]: 704 frame_filename = frame_info.filename 705 frame_function = frame_info.function 706 707 if ( 708 GENAI_EVALUATION_PATH in frame_filename.replace("\\", "/") 709 and frame_function == GENAI_EVALUATE_FUNCTION 710 ): 711 callsite = "genai_evaluate" 712 break 713 714 return {"callsite": callsite} 715 716 @classmethod 717 def parse_result(cls, result: Any) -> dict[str, Any] | None: 718 return { 719 "simulated_conversation_info": [ 720 {"turn_count": len(conversation)} for conversation in result 721 ] 722 } 723 724 725 class OptimizePromptsJobEvent(Event): 726 name: str = "optimize_prompts_job" 727 728 @classmethod 729 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 730 result = {} 731 732 if optimizer_type := arguments.get("optimizer_type"): 733 result["optimizer_type"] = optimizer_type 734 735 if "scorer_names" in arguments: 736 scorer_names = arguments["scorer_names"] 737 # `scorer_count` is useful for indicating zero-shot vs few-shot optimization, and to 738 # track the pattern of how users use prompt optimization. 739 result["scorer_count"] = len(scorer_names) 740 741 return result or None 742 743 744 class ScorerCallEvent(Event): 745 name: str = "scorer_call" 746 747 @classmethod 748 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 749 from mlflow.genai.scorers.base import Scorer 750 751 scorer_instance = arguments.get("self") 752 if not isinstance(scorer_instance, Scorer): 753 return None 754 755 # Check if running inside an online scoring job 756 # Import here to avoid circular imports 757 from mlflow.genai.scorers.job import ( 758 ONLINE_SESSION_SCORER_JOB_NAME, 759 ONLINE_TRACE_SCORER_JOB_NAME, 760 ) 761 from mlflow.server.jobs.utils import MLFLOW_SERVER_JOB_NAME_ENV_VAR 762 763 job_name = os.environ.get(MLFLOW_SERVER_JOB_NAME_ENV_VAR) 764 if job_name in (ONLINE_TRACE_SCORER_JOB_NAME, ONLINE_SESSION_SCORER_JOB_NAME): 765 callsite = "online_scoring" 766 else: 767 callsite = "direct_scorer_call" 768 for frame_info in inspect.stack()[:10]: 769 frame_filename = frame_info.filename 770 frame_function = frame_info.function 771 772 if ( 773 GENAI_SCORERS_PATH in frame_filename.replace("\\", "/") 774 and frame_function == SCORER_RUN_FUNCTION 775 ): 776 callsite = "genai_evaluate" 777 break 778 779 return { 780 "scorer_class": _get_scorer_class_name_for_tracking(scorer_instance), 781 "scorer_kind": scorer_instance.kind.value, 782 "scope": "session" if scorer_instance.is_session_level_scorer else "trace", 783 "callsite": callsite, 784 } 785 786 @classmethod 787 def parse_result(cls, result: Any) -> dict[str, Any] | None: 788 if isinstance(result, Feedback): 789 return {"has_feedback_error": result.error is not None} 790 791 if isinstance(result, list) and result and all(isinstance(f, Feedback) for f in result): 792 return {"has_feedback_error": any(f.error is not None for f in result)} 793 794 return {"has_feedback_error": False} 795 796 797 class DiscoverIssuesEvent(Event): 798 name: str = "discover_issues" 799 800 @classmethod 801 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 802 return { 803 "model": arguments.get("model"), 804 "trace_count": len(arguments.get("traces") or []), 805 "categories": arguments.get("categories"), 806 "source_run_id": arguments.get("run_id"), 807 } 808 809 @classmethod 810 def parse_result(cls, result: Any) -> dict[str, Any] | None: 811 return { 812 "issue_count": len(result.issues), 813 "total_traces_analyzed": result.total_traces_analyzed, 814 "total_cost_usd": result.total_cost_usd, 815 "triage_run_id": result.triage_run_id, 816 } 817 818 819 class UpdateIssueEvent(Event): 820 name: str = "update_issue" 821 822 @classmethod 823 def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None: 824 status = arguments.get("status") 825 if isinstance(status, IssueStatus): 826 status = status.value 827 severity = arguments.get("severity") 828 if isinstance(severity, IssueSeverity): 829 severity = severity.value 830 return { 831 "status": status, 832 "has_name": arguments.get("name") is not None, 833 "has_description": arguments.get("description") is not None, 834 "severity": severity, 835 } 836 837 @classmethod 838 def parse_result(cls, result: Any) -> dict[str, Any]: 839 return {"source_run_id": result.source_run_id} if result else {}