/ mlflow / telemetry / events.py
events.py
  1  import inspect
  2  import os
  3  import sys
  4  from collections import Counter
  5  from enum import Enum
  6  from typing import TYPE_CHECKING, Any
  7  from urllib.parse import urlparse
  8  
  9  from mlflow.entities import Feedback
 10  from mlflow.entities.issue import IssueSeverity, IssueStatus
 11  from mlflow.environment_variables import MLFLOW_ENABLE_OTEL_GENAI_SEMCONV
 12  from mlflow.telemetry.constant import (
 13      GENAI_MODULES,
 14      MODULES_TO_CHECK_IMPORT,
 15  )
 16  
 17  if TYPE_CHECKING:
 18      from mlflow.genai.scorers.base import Scorer
 19  
 20  
 21  GENAI_EVALUATION_PATH = "mlflow/genai/evaluation/base"
 22  GENAI_SCORERS_PATH = "mlflow/genai/scorers/base"
 23  GENAI_EVALUATE_FUNCTION = "_run_harness"
 24  SCORER_RUN_FUNCTION = "run"
 25  
 26  
 27  def _get_scorer_class_name_for_tracking(scorer: "Scorer") -> str:
 28      from mlflow.genai.scorers.builtin_scorers import BuiltInScorer
 29  
 30      if isinstance(scorer, BuiltInScorer):
 31          return type(scorer).__name__
 32  
 33      try:
 34          from mlflow.genai.scorers.deepeval import DeepEvalScorer
 35  
 36          if isinstance(scorer, DeepEvalScorer):
 37              return f"DeepEval:{scorer.name}"
 38      except ImportError:
 39          pass
 40  
 41      try:
 42          from mlflow.genai.scorers.ragas import RagasScorer
 43  
 44          if isinstance(scorer, RagasScorer):
 45              return f"Ragas:{scorer.name}"
 46      except ImportError:
 47          pass
 48  
 49      return "UserDefinedScorer"
 50  
 51  
 52  class Event:
 53      name: str
 54  
 55      @classmethod
 56      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
 57          """
 58          Parse the arguments and return the params.
 59          """
 60          return None
 61  
 62  
 63  class CreateExperimentEvent(Event):
 64      name: str = "create_experiment"
 65  
 66      @classmethod
 67      def parse_result(cls, result: Any) -> dict[str, Any] | None:
 68          # create_experiment API returns the experiment id
 69          return {"experiment_id": result}
 70  
 71  
 72  class CreatePromptEvent(Event):
 73      name: str = "create_prompt"
 74  
 75  
 76  class LoadPromptEvent(Event):
 77      name: str = "load_prompt"
 78  
 79      @classmethod
 80      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
 81          name_or_uri = arguments.get("name_or_uri", "")
 82          # Check if alias is used (format: "prompts:/name@alias")
 83          uses_alias = "@" in name_or_uri
 84          return {"uses_alias": uses_alias}
 85  
 86  
 87  class StartTraceEvent(Event):
 88      name: str = "start_trace"
 89  
 90      @classmethod
 91      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
 92          # Capture the set of currently imported packages at trace start time to
 93          # understand the flavor of the trace.
 94          return {
 95              "imports": [pkg for pkg in GENAI_MODULES if pkg in sys.modules],
 96              "format": "genai_semconv" if MLFLOW_ENABLE_OTEL_GENAI_SEMCONV.get() else "native",
 97          }
 98  
 99  
100  class LogAssessmentEvent(Event):
101      name: str = "log_assessment"
102  
103      @classmethod
104      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
105          from mlflow.entities.assessment import Expectation, Feedback
106  
107          assessment = arguments.get("assessment")
108          if assessment is None:
109              return None
110  
111          if isinstance(assessment, Expectation):
112              return {"type": "expectation", "source_type": assessment.source.source_type}
113          elif isinstance(assessment, Feedback):
114              return {"type": "feedback", "source_type": assessment.source.source_type}
115  
116  
117  class EvaluateEvent(Event):
118      name: str = "evaluate"
119  
120  
121  class GenAIEvaluateEvent(Event):
122      name: str = "genai_evaluate"
123  
124      @classmethod
125      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
126          from mlflow.genai.scorers.base import Scorer
127  
128          record_params = {}
129  
130          # Track if predict_fn is provided
131          record_params["predict_fn_provided"] = arguments.get("predict_fn") is not None
132  
133          # Track eval data type
134          eval_data = arguments.get("data")
135          if eval_data is not None:
136              from mlflow.genai.evaluation.utils import _get_eval_data_type
137  
138              record_params["eval_data_type"] = _get_eval_data_type(eval_data)
139  
140          # Track scorer information
141          scorers = arguments.get("scorers") or []
142          scorer_info = [
143              {
144                  "class": _get_scorer_class_name_for_tracking(scorer),
145                  "kind": scorer.kind.value,
146                  "scope": "session" if scorer.is_session_level_scorer else "trace",
147              }
148              for scorer in scorers
149              if isinstance(scorer, Scorer)
150          ]
151          record_params["scorer_info"] = scorer_info
152  
153          return record_params
154  
155      @classmethod
156      def parse_result(cls, result: Any) -> dict[str, Any] | None:
157          _, telemetry_data = result
158  
159          if not isinstance(telemetry_data, dict):
160              return None
161  
162          return telemetry_data
163  
164  
165  class CreateLoggedModelEvent(Event):
166      name: str = "create_logged_model"
167  
168      @classmethod
169      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
170          data: dict[str, Any] = {}
171          if flavor := arguments.get("flavor"):
172              data["flavor"] = flavor.removeprefix("mlflow.")
173          if serialization_format := arguments.get("serialization_format"):
174              data["serialization_format"] = serialization_format
175          if arguments.get("uses_uv"):
176              data["uses_uv"] = True
177          return data or None
178  
179  
180  class GetLoggedModelEvent(Event):
181      name: str = "get_logged_model"
182  
183      @classmethod
184      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
185          return {
186              "imports": [pkg for pkg in MODULES_TO_CHECK_IMPORT if pkg in sys.modules],
187          }
188  
189  
190  class CreateRegisteredModelEvent(Event):
191      name: str = "create_registered_model"
192  
193      @classmethod
194      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
195          tags = arguments.get("tags") or {}
196          return {"is_prompt": _is_prompt(tags)}
197  
198  
199  class CreateRunEvent(Event):
200      name: str = "create_run"
201  
202      @classmethod
203      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
204          # Capture the set of currently imported packages at run creation time to
205          # understand how MLflow is used together with other libraries. Collecting
206          # this data at run creation ensures accuracy and completeness.
207          return {
208              "imports": [pkg for pkg in MODULES_TO_CHECK_IMPORT if pkg in sys.modules],
209              "experiment_id": arguments.get("experiment_id"),
210          }
211  
212  
213  class CreateModelVersionEvent(Event):
214      name: str = "create_model_version"
215  
216      @classmethod
217      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
218          tags = arguments.get("tags") or {}
219          return {"is_prompt": _is_prompt(tags)}
220  
221  
222  class CreateDatasetEvent(Event):
223      name: str = "create_dataset"
224  
225  
226  class MergeRecordsEvent(Event):
227      name: str = "merge_records"
228  
229      @classmethod
230      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
231          from mlflow.entities.evaluation_dataset import (
232              DatasetGranularity,
233              EvaluationDataset,
234          )
235  
236          if arguments is None:
237              return None
238  
239          records = arguments.get("records")
240          if records is None:
241              return None
242  
243          try:
244              count = len(records)
245          except TypeError:
246              return None
247  
248          if count == 0:
249              return None
250  
251          input_type = type(records).__name__.lower()
252          input_keys: set[str] | None = None
253  
254          if "dataframe" in input_type:
255              input_type = "pandas"
256              try:
257                  if "inputs" in records.columns:
258                      if first_inputs := records.iloc[0].get("inputs", {}):
259                          input_keys = set(first_inputs.keys())
260              except Exception:
261                  pass
262          elif isinstance(records, list):
263              first_elem = records[0]
264              if hasattr(first_elem, "__class__") and first_elem.__class__.__name__ == "Trace":
265                  input_type = "list[trace]"
266              elif isinstance(first_elem, dict):
267                  input_type = "list[dict]"
268                  if first_inputs := first_elem.get("inputs", {}):
269                      input_keys = set(first_inputs.keys())
270              else:
271                  input_type = "list"
272          else:
273              input_type = "other"
274  
275          if input_type == "list[trace]":
276              dataset_type = DatasetGranularity.TRACE
277          elif input_keys:
278              dataset_type = EvaluationDataset._classify_input_fields(input_keys)
279          else:
280              dataset_type = DatasetGranularity.UNKNOWN
281  
282          return {
283              "record_count": count,
284              "input_type": input_type,
285              "dataset_type": dataset_type.value,
286          }
287  
288  
289  class DatasetToDataFrameEvent(Event):
290      name: str = "dataset_to_df"
291  
292      @classmethod
293      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
294          from mlflow.entities.evaluation_dataset import EvaluationDataset
295  
296          dataset_instance = arguments.get("self")
297          if not isinstance(dataset_instance, EvaluationDataset):
298              return None
299  
300          callsite = "direct_call"
301          frame = sys._getframe()
302          for _ in range(10):
303              if frame is None:
304                  break
305              frame_filename = frame.f_code.co_filename.replace("\\", "/")
306              if "mlflow/genai/evaluation" in frame_filename:
307                  callsite = "genai_evaluate"
308                  break
309              if "mlflow/genai/simulators" in frame_filename:
310                  callsite = "conversation_simulator"
311                  break
312              frame = frame.f_back
313  
314          granularity = dataset_instance._get_existing_granularity()
315          return {"dataset_type": granularity.value, "callsite": callsite}
316  
317      @classmethod
318      def parse_result(cls, result: Any) -> dict[str, Any] | None:
319          if result is None:
320              return {"record_count": 0}
321  
322          return {"record_count": len(result)}
323  
324  
325  def _is_prompt(tags: dict[str, str]) -> bool:
326      try:
327          from mlflow.prompt.constants import IS_PROMPT_TAG_KEY
328      except ImportError:
329          return False
330      return tags.get(IS_PROMPT_TAG_KEY, "false").lower() == "true"
331  
332  
333  class CreateWebhookEvent(Event):
334      name: str = "create_webhook"
335  
336      @classmethod
337      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
338          events = arguments.get("events") or []
339          return {"events": [str(event) for event in events]}
340  
341  
342  class PromptOptimizationEvent(Event):
343      name: str = "prompt_optimization"
344  
345      @classmethod
346      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
347          result = {}
348  
349          # Track the optimizer type used
350          if optimizer := arguments.get("optimizer"):
351              result["optimizer_type"] = type(optimizer).__name__
352          else:
353              result["optimizer_type"] = None
354  
355          # Track the number of prompts being optimized
356          prompt_uris = arguments.get("prompt_uris") or []
357          try:
358              result["prompt_count"] = len(prompt_uris)
359          except TypeError:
360              result["prompt_count"] = None
361  
362          # Track if custom scorers are provided and how many
363          scorers = arguments.get("scorers")
364          try:
365              result["scorer_count"] = len(scorers)
366          except TypeError:
367              result["scorer_count"] = None
368  
369          # Track if custom aggregation is provided
370          result["custom_aggregation"] = arguments.get("aggregation") is not None
371  
372          return result
373  
374  
375  class LogDatasetEvent(Event):
376      name: str = "log_dataset"
377  
378  
379  class LogMetricEvent(Event):
380      name: str = "log_metric"
381  
382      @classmethod
383      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
384          return {"synchronous": arguments.get("synchronous")}
385  
386  
387  class LogParamEvent(Event):
388      name: str = "log_param"
389  
390      @classmethod
391      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
392          return {"synchronous": arguments.get("synchronous")}
393  
394  
395  class LogBatchEvent(Event):
396      name: str = "log_batch"
397  
398      @classmethod
399      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
400          return {
401              "metrics": bool(arguments.get("metrics")),
402              "params": bool(arguments.get("params")),
403              "tags": bool(arguments.get("tags")),
404              "synchronous": arguments.get("synchronous"),
405          }
406  
407  
408  class McpRunEvent(Event):
409      name: str = "mcp_run"
410  
411  
412  class TrackingServerStartEvent(Event):
413      name: str = "tracking_server_start"
414  
415      @classmethod
416      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
417          backend_store_uri = arguments.get("backend_store_uri") or ""
418          scheme = urlparse(backend_store_uri).scheme
419          # Treat empty schemes (relative paths) and single-letter schemes
420          # (Windows drive letters like C:\) as local file storage.
421          # Strip SQLAlchemy driver suffixes (e.g. mysql+pymysql → mysql).
422          backend_store_type = "file" if not scheme or len(scheme) == 1 else scheme.split("+")[0]
423  
424          app_name = arguments.get("app_name")
425          return {
426              "auth_enabled": app_name == "basic-auth",
427              "app_name": app_name,
428              "backend_store_type": backend_store_type,
429              "serve_artifacts": bool(arguments.get("serve_artifacts")),
430              "artifacts_only": bool(arguments.get("artifacts_only")),
431              "expose_prometheus": arguments.get("expose_prometheus") is not None,
432              "enable_workspaces": bool(arguments.get("enable_workspaces")),
433              "workers": arguments.get("workers"),
434              "dev": bool(arguments.get("dev")),
435          }
436  
437  
438  class GatewayStartEvent(Event):
439      name: str = "gateway_start"
440  
441  
442  # Gateway Resource CRUD Events
443  class GatewayCreateEndpointEvent(Event):
444      name: str = "gateway_create_endpoint"
445  
446      @classmethod
447      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
448          return {
449              "has_fallback_config": arguments.get("fallback_config") is not None,
450              "routing_strategy": str(arguments.get("routing_strategy"))
451              if arguments.get("routing_strategy")
452              else None,
453              "num_model_configs": len(arguments.get("model_configs") or []),
454              "usage_tracking": arguments.get("usage_tracking"),
455          }
456  
457  
458  class GatewayUpdateEndpointEvent(Event):
459      name: str = "gateway_update_endpoint"
460  
461      @classmethod
462      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
463          return {
464              "has_fallback_config": arguments.get("fallback_config") is not None,
465              "routing_strategy": str(arguments.get("routing_strategy"))
466              if arguments.get("routing_strategy")
467              else None,
468              "num_model_configs": len(arguments.get("model_configs"))
469              if arguments.get("model_configs") is not None
470              else None,
471              "usage_tracking": arguments.get("usage_tracking"),
472          }
473  
474  
475  class GatewayDeleteEndpointEvent(Event):
476      name: str = "gateway_delete_endpoint"
477  
478  
479  class GatewayGetEndpointEvent(Event):
480      name: str = "gateway_get_endpoint"
481  
482  
483  class GatewayListEndpointsEvent(Event):
484      name: str = "gateway_list_endpoints"
485  
486      @classmethod
487      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
488          return {
489              "filter_by_provider": arguments.get("provider") is not None,
490          }
491  
492  
493  class GatewayCreateModelDefinitionEvent(Event):
494      name: str = "gateway_create_model_definition"
495  
496      @classmethod
497      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
498          return {
499              "model_name": arguments.get("model_name"),
500              "provider": arguments.get("provider"),
501          }
502  
503  
504  # Gateway Budget Policy CRUD Events
505  class GatewayCreateBudgetPolicyEvent(Event):
506      name: str = "gateway_create_budget_policy"
507  
508      @classmethod
509      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
510          def _enum_str(val: Any) -> str | None:
511              if val is None:
512                  return None
513              return val.value if hasattr(val, "value") else str(val)
514  
515          duration = arguments.get("duration")
516          return {
517              "budget_unit": _enum_str(arguments.get("budget_unit")),
518              "duration_unit": _enum_str(duration.unit if duration is not None else None),
519              "target_scope": _enum_str(arguments.get("target_scope")),
520              "budget_action": _enum_str(arguments.get("budget_action")),
521          }
522  
523  
524  class GatewayUpdateBudgetPolicyEvent(Event):
525      name: str = "gateway_update_budget_policy"
526  
527  
528  class GatewayDeleteBudgetPolicyEvent(Event):
529      name: str = "gateway_delete_budget_policy"
530  
531  
532  class GatewayListBudgetPoliciesEvent(Event):
533      name: str = "gateway_list_budget_policies"
534  
535  
536  # Gateway Guardrail CRUD Events
537  class GatewayCreateGuardrailEvent(Event):
538      name: str = "gateway_create_guardrail"
539  
540      @classmethod
541      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
542          return {
543              "stage": str(arguments.get("stage")) if arguments.get("stage") else None,
544              "action": str(arguments.get("action")) if arguments.get("action") else None,
545          }
546  
547  
548  class GatewayUpdateGuardrailEvent(Event):
549      name: str = "gateway_update_guardrail"
550  
551      @classmethod
552      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
553          return {
554              "stage": str(arguments.get("stage")) if arguments.get("stage") else None,
555              "action": str(arguments.get("action")) if arguments.get("action") else None,
556          }
557  
558  
559  class GatewayDeleteGuardrailEvent(Event):
560      name: str = "gateway_delete_guardrail"
561  
562  
563  # Gateway Secret CRUD Events
564  class GatewayCreateSecretEvent(Event):
565      name: str = "gateway_create_secret"
566  
567      @classmethod
568      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
569          return {
570              "provider": arguments.get("provider"),
571          }
572  
573  
574  class GatewayUpdateSecretEvent(Event):
575      name: str = "gateway_update_secret"
576  
577  
578  class GatewayDeleteSecretEvent(Event):
579      name: str = "gateway_delete_secret"
580  
581  
582  class GatewayListSecretsEvent(Event):
583      name: str = "gateway_list_secrets"
584  
585      @classmethod
586      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
587          return {
588              "filter_by_provider": arguments.get("provider") is not None,
589          }
590  
591  
592  # Gateway Invocation Events
593  class GatewayInvocationType(str, Enum):
594      """Type of gateway invocation endpoint."""
595  
596      MLFLOW_INVOCATIONS = "mlflow_invocations"
597      MLFLOW_CHAT_COMPLETIONS = "mlflow_chat_completions"
598      OPENAI_PASSTHROUGH_CHAT = "openai_passthrough_chat"
599      OPENAI_PASSTHROUGH_EMBEDDINGS = "openai_passthrough_embeddings"
600      OPENAI_PASSTHROUGH_RESPONSES = "openai_passthrough_responses"
601      ANTHROPIC_PASSTHROUGH_MESSAGES = "anthropic_passthrough_messages"
602      GEMINI_PASSTHROUGH_GENERATE_CONTENT = "gemini_passthrough_generate_content"
603      GEMINI_PASSTHROUGH_STREAM_GENERATE_CONTENT = "gemini_passthrough_stream_generate_content"
604  
605  
606  class GatewayInvocationEvent(Event):
607      name: str = "gateway_invocation"
608  
609  
610  class AiCommandRunEvent(Event):
611      name: str = "ai_command_run"
612  
613  
614  class TracingContextPropagation(Event):
615      name: str = "tracing_context_propagation"
616  
617  
618  class GitModelVersioningEvent(Event):
619      name: str = "git_model_versioning"
620  
621  
622  class InvokeCustomJudgeModelEvent(Event):
623      name: str = "invoke_custom_judge_model"
624  
625      @classmethod
626      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
627          from mlflow.metrics.genai.model_utils import _parse_model_uri
628  
629          model_uri = arguments.get("model_uri")
630          if not model_uri:
631              return {"model_provider": None}
632  
633          model_provider, _ = _parse_model_uri(model_uri)
634          return {"model_provider": model_provider}
635  
636  
637  class MakeJudgeEvent(Event):
638      name: str = "make_judge"
639  
640      @classmethod
641      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
642          model = arguments.get("model")
643          if model and isinstance(model, str):
644              model_provider = model.split(":")[0] if ":" in model else None
645              return {"model_provider": model_provider}
646          return {"model_provider": None}
647  
648  
649  class AlignJudgeEvent(Event):
650      name: str = "align_judge"
651  
652      @classmethod
653      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
654          result = {}
655  
656          if (traces := arguments.get("traces")) is not None:
657              try:
658                  result["trace_count"] = len(traces)
659              except TypeError:
660                  result["trace_count"] = None
661  
662          if optimizer := arguments.get("optimizer"):
663              result["optimizer_type"] = type(optimizer).__name__
664          else:
665              result["optimizer_type"] = "default"
666  
667          return result
668  
669  
670  class AutologgingEvent(Event):
671      name: str = "autologging"
672  
673  
674  class TraceAttachmentsEvent(Event):
675      name: str = "trace_attachments"
676  
677      @classmethod
678      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
679          if attachments := arguments.get("attachments"):
680              content_types = Counter(att.content_type for att in attachments.values())
681              return {"content_types": dict(content_types)}
682          return None
683  
684  
685  class TraceSource(str, Enum):
686      """Source of a trace received by the MLflow server."""
687  
688      MLFLOW_PYTHON_CLIENT = "MLFLOW_PYTHON_CLIENT"
689      EXTERNAL_OTEL_CLIENT = "EXTERNAL_OTEL_CLIENT"
690      UNKNOWN = "UNKNOWN"
691  
692  
693  class TracesReceivedByServerEvent(Event):
694      name: str = "traces_received_by_server"
695  
696  
697  class SimulateConversationEvent(Event):
698      name: str = "simulate_conversation"
699  
700      @classmethod
701      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
702          callsite = "conversation_simulator"
703          for frame_info in inspect.stack()[:10]:
704              frame_filename = frame_info.filename
705              frame_function = frame_info.function
706  
707              if (
708                  GENAI_EVALUATION_PATH in frame_filename.replace("\\", "/")
709                  and frame_function == GENAI_EVALUATE_FUNCTION
710              ):
711                  callsite = "genai_evaluate"
712                  break
713  
714          return {"callsite": callsite}
715  
716      @classmethod
717      def parse_result(cls, result: Any) -> dict[str, Any] | None:
718          return {
719              "simulated_conversation_info": [
720                  {"turn_count": len(conversation)} for conversation in result
721              ]
722          }
723  
724  
725  class OptimizePromptsJobEvent(Event):
726      name: str = "optimize_prompts_job"
727  
728      @classmethod
729      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
730          result = {}
731  
732          if optimizer_type := arguments.get("optimizer_type"):
733              result["optimizer_type"] = optimizer_type
734  
735          if "scorer_names" in arguments:
736              scorer_names = arguments["scorer_names"]
737              # `scorer_count` is useful for indicating zero-shot vs few-shot optimization, and to
738              # track the pattern of how users use prompt optimization.
739              result["scorer_count"] = len(scorer_names)
740  
741          return result or None
742  
743  
744  class ScorerCallEvent(Event):
745      name: str = "scorer_call"
746  
747      @classmethod
748      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
749          from mlflow.genai.scorers.base import Scorer
750  
751          scorer_instance = arguments.get("self")
752          if not isinstance(scorer_instance, Scorer):
753              return None
754  
755          # Check if running inside an online scoring job
756          # Import here to avoid circular imports
757          from mlflow.genai.scorers.job import (
758              ONLINE_SESSION_SCORER_JOB_NAME,
759              ONLINE_TRACE_SCORER_JOB_NAME,
760          )
761          from mlflow.server.jobs.utils import MLFLOW_SERVER_JOB_NAME_ENV_VAR
762  
763          job_name = os.environ.get(MLFLOW_SERVER_JOB_NAME_ENV_VAR)
764          if job_name in (ONLINE_TRACE_SCORER_JOB_NAME, ONLINE_SESSION_SCORER_JOB_NAME):
765              callsite = "online_scoring"
766          else:
767              callsite = "direct_scorer_call"
768              for frame_info in inspect.stack()[:10]:
769                  frame_filename = frame_info.filename
770                  frame_function = frame_info.function
771  
772                  if (
773                      GENAI_SCORERS_PATH in frame_filename.replace("\\", "/")
774                      and frame_function == SCORER_RUN_FUNCTION
775                  ):
776                      callsite = "genai_evaluate"
777                      break
778  
779          return {
780              "scorer_class": _get_scorer_class_name_for_tracking(scorer_instance),
781              "scorer_kind": scorer_instance.kind.value,
782              "scope": "session" if scorer_instance.is_session_level_scorer else "trace",
783              "callsite": callsite,
784          }
785  
786      @classmethod
787      def parse_result(cls, result: Any) -> dict[str, Any] | None:
788          if isinstance(result, Feedback):
789              return {"has_feedback_error": result.error is not None}
790  
791          if isinstance(result, list) and result and all(isinstance(f, Feedback) for f in result):
792              return {"has_feedback_error": any(f.error is not None for f in result)}
793  
794          return {"has_feedback_error": False}
795  
796  
797  class DiscoverIssuesEvent(Event):
798      name: str = "discover_issues"
799  
800      @classmethod
801      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
802          return {
803              "model": arguments.get("model"),
804              "trace_count": len(arguments.get("traces") or []),
805              "categories": arguments.get("categories"),
806              "source_run_id": arguments.get("run_id"),
807          }
808  
809      @classmethod
810      def parse_result(cls, result: Any) -> dict[str, Any] | None:
811          return {
812              "issue_count": len(result.issues),
813              "total_traces_analyzed": result.total_traces_analyzed,
814              "total_cost_usd": result.total_cost_usd,
815              "triage_run_id": result.triage_run_id,
816          }
817  
818  
819  class UpdateIssueEvent(Event):
820      name: str = "update_issue"
821  
822      @classmethod
823      def parse(cls, arguments: dict[str, Any]) -> dict[str, Any] | None:
824          status = arguments.get("status")
825          if isinstance(status, IssueStatus):
826              status = status.value
827          severity = arguments.get("severity")
828          if isinstance(severity, IssueSeverity):
829              severity = severity.value
830          return {
831              "status": status,
832              "has_name": arguments.get("name") is not None,
833              "has_description": arguments.get("description") is not None,
834              "severity": severity,
835          }
836  
837      @classmethod
838      def parse_result(cls, result: Any) -> dict[str, Any]:
839          return {"source_run_id": result.source_run_id} if result else {}