tests/genai/evaluate/test_evaluation.py
   1  import threading
   2  import uuid
   3  from concurrent.futures import ThreadPoolExecutor
   4  from dataclasses import dataclass
   5  from pathlib import Path
   6  from typing import Any, Literal
   7  from unittest import mock
   8  from unittest.mock import ANY, MagicMock
   9  
  10  import pandas as pd
  11  import pytest
  12  
  13  import mlflow
  14  from mlflow.entities.assessment import Expectation, Feedback
  15  from mlflow.entities.assessment_source import AssessmentSource, AssessmentSourceType
  16  from mlflow.entities.span import SpanType
  17  from mlflow.entities.trace import Trace
  18  from mlflow.entities.trace_data import TraceData
  19  from mlflow.exceptions import MlflowException
  20  from mlflow.genai.datasets import EvaluationDataset, create_dataset
  21  from mlflow.genai.evaluation.entities import EvaluationResult
  22  from mlflow.genai.evaluation.harness import (
  23      AUTO_INITIAL_RPS,
  24      _should_clone_trace,
  25      backpressure_buffer,
  26  )
  27  from mlflow.genai.evaluation.rate_limiter import RPSRateLimiter
  28  from mlflow.genai.scorers.base import scorer
  29  from mlflow.genai.scorers.builtin_scorers import RelevanceToQuery
  30  from mlflow.genai.simulators import ConversationSimulator
  31  from mlflow.server import handlers
  32  from mlflow.server.fastapi_app import app
  33  from mlflow.server.handlers import initialize_backend_stores
  34  from mlflow.tracing.constant import AssessmentMetadataKey, TraceMetadataKey
  35  
  36  from tests.helper_functions import get_safe_port
  37  from tests.tracing.helper import (
  38      create_test_trace_info,
  39      create_test_trace_info_with_uc_table,
  40      get_traces,
  41  )
  42  from tests.tracking.integration_test_utils import ServerThread
  43  
  44  
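       # Fixture providing a minimal Trace (no spans) in a plain MLflow experiment.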
  45  @pytest.fixture
  46  def mlflow_experiment_trace():
  47      return Trace(
  48          info=create_test_trace_info(trace_id="tr-123", experiment_id="exp-123"),
  49          data=TraceData(spans=[]),
  50      )
  51  
  52  
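       # Chat-completion-style payload returned by the mocked deployment client.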
  53  _DUMMY_CHAT_RESPONSE = {
  54      "id": "1",
  55      "object": "text_completion",
  56      "created": "2021-10-01T00:00:00.000000Z",
  57      "model": "gpt-4o-mini",
  58      "choices": [
  59          {
  60              "index": 0,
  61              "message": {
  62                  "content": "This is a response",
  63                  "role": "assistant",
  64              },
  65              "finish_reason": "length",
  66          }
  67      ],
  68      "usage": {
  69          "prompt_tokens": 1,
  70          "completion_tokens": 1,
  71          "total_tokens": 2,
  72      },
  73  }
  74  
  75  
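       # Minimal model whose predict() always answers "I don't know"; used as predict_fn in tests.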
  76  class TestModel:
  77      def predict(self, question: str) -> str:
  78          return "I don't know"
  79  
  80  
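       # Simple scorers shared across the tests: exact_match and is_concise return booleans,
       # while relevance returns a Feedback with an LLM_JUDGE source.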
  81  @scorer
  82  def exact_match(outputs, expectations):
  83      return outputs == expectations["expected_response"]
  84  
  85  
  86  @scorer
  87  def is_concise(outputs, expectations):
  88      return len(outputs) <= expectations["max_length"]
  89  
  90  
  91  @scorer
  92  def relevance(inputs, outputs):
  93      return Feedback(
  94          name="relevance",
  95          value="yes",
  96          rationale="The response is relevant to the question",
  97          source=AssessmentSource(source_id="gpt", source_type="LLM_JUDGE"),
  98      )
  99  
 100  
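       # Scorer traced with an EVALUATOR span; it simply checks that a trace object is passed in.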
 101  @scorer
 102  @mlflow.trace(span_type=SpanType.EVALUATOR)
 103  def has_trace(trace):
 104      return trace is not None
 105  
 106  
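       # Session-level scorer helpers: FailingSessionScorer always raises, while
       # WorkingSessionScorer returns the number of traces in the session.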
 107  class FailingSessionScorer(mlflow.genai.Scorer):
 108      def __init__(self):
 109          super().__init__(name="failing_session_scorer")
 110  
 111      @property
 112      def is_session_level_scorer(self) -> bool:
 113          return True
 114  
 115      def __call__(self, session=None, **kwargs):
 116          raise ValueError("Session scorer error")
 117  
 118  
 119  class WorkingSessionScorer(mlflow.genai.Scorer):
 120      def __init__(self):
 121          super().__init__(name="working_session_scorer")
 122  
 123      @property
 124      def is_session_level_scorer(self) -> bool:
 125          return True
 126  
 127      def __call__(self, session=None, **kwargs):
 128          return len(session or [])
 129  
 130  
 131  def _validate_assessments(traces):
 132      """Validate assessments are added to the traces"""
 133      for trace in traces:
 134          assert len(trace.info.assessments) == 6, (
 135              f"Expected 6 assessments, got {len(trace.info.assessments)}"
  136              f". Assessments: {[a.name for a in trace.info.assessments]}"
 137          )  # 2 expectations + 4 feedbacks
 138          assessments = {a.name: a for a in trace.info.assessments}
 139          a_exact_match = assessments["exact_match"]
 140          assert isinstance(a_exact_match, Feedback)
 141          assert a_exact_match.trace_id == trace.info.trace_id
 142          assert isinstance(a_exact_match.value, bool)
 143          assert a_exact_match.source.source_type == AssessmentSourceType.CODE
 144          # Scorer name is used as source_id
 145          assert a_exact_match.source.source_id == "exact_match"
 146          assert a_exact_match.metadata[AssessmentMetadataKey.SOURCE_RUN_ID] is not None
 147  
 148          a_is_concise = assessments["is_concise"]
 149          assert isinstance(a_is_concise, Feedback)
 150          assert isinstance(a_is_concise.value, bool)
 151          assert a_is_concise.metadata[AssessmentMetadataKey.SOURCE_RUN_ID] is not None
 152  
 153          a_has_trace = assessments["has_trace"]
 154          assert isinstance(a_has_trace, Feedback)
 155          assert a_has_trace.value is True
 156          assert a_has_trace.metadata[AssessmentMetadataKey.SOURCE_RUN_ID] is not None
 157  
 158          a_relevance = assessments["relevance"]
 159          assert isinstance(a_relevance, Feedback)
 160          assert a_relevance.value == "yes"
 161          assert a_relevance.source.source_id == "gpt"
 162          assert a_relevance.source.source_type == "LLM_JUDGE"
 163          assert a_relevance.rationale == "The response is relevant to the question"
 164          assert a_relevance.metadata[AssessmentMetadataKey.SOURCE_RUN_ID] is not None
 165  
 166          a_expected_response = assessments["expected_response"]
 167          assert isinstance(a_expected_response, Expectation)
 168          assert isinstance(a_expected_response.value, str)
 169          assert a_expected_response.source.source_type == AssessmentSourceType.HUMAN
 170          assert a_expected_response.source.source_id is not None
 171  
 172          a_max_length = assessments["max_length"]
 173          assert isinstance(a_max_length, Expectation)
 174          assert isinstance(a_max_length.value, (int, float))
 175          assert a_max_length.source.source_type == AssessmentSourceType.HUMAN
 176  
 177  
 178  def _validate_eval_result_df(result: EvaluationResult):
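           """Validate result_df matches the searched traces and contains per-scorer value columns."""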
 179      search_traces_df = mlflow.search_traces(run_id=result.run_id)
 180      assert result.result_df is not None
 181      assert len(result.result_df) == len(search_traces_df)
 182      assert set(result.result_df.columns) >= set(search_traces_df.columns)
 183  
 184      actual = result.result_df.sort_values(by="trace_id").reset_index(drop=True)
 185      expected = search_traces_df.sort_values(by="trace_id").reset_index(drop=True)
 186      for i in range(len(actual)):
 187          assert actual.iloc[i].trace_id == expected.iloc[i].trace_id
 188          assert actual.iloc[i].spans == expected.iloc[i].spans
 189          assert actual.iloc[i].assessments == expected.iloc[i].assessments
 190          assert actual.iloc[i]["exact_match/value"] is not None
 191          assert actual.iloc[i]["is_concise/value"] is not None
 192          assert actual.iloc[i]["relevance/value"] is not None
 193          assert actual.iloc[i]["has_trace/value"] is not None
 194          assert actual.iloc[i]["expected_response/value"] is not None
 195          assert actual.iloc[i]["max_length/value"] is not None
 196  
  197      # Backwards compatibility: the legacy "eval_results" table should mirror result_df
 198      assert len(result.tables["eval_results"]) == len(result.result_df)
 199  
 200  
 201  @dataclass
 202  class ServerConfig:
 203      host_type: Literal["local", "remote", "databricks"]
 204      backend_type: Literal["file", "sqlalchemy"] | None = None
 205  
 206  
 207  # Test with different server configurations
 208  # 1. local file backend
 209  # 2. local sqlalchemy backend
 210  # 3. remote server running on file backend
 211  # 4. remote server running on sqlalchemy backend
 212  @pytest.fixture(
 213      params=[
 214          ServerConfig(host_type="local", backend_type="file"),
 215          ServerConfig(host_type="local", backend_type="sqlalchemy"),
 216          ServerConfig(host_type="remote", backend_type="file"),
 217          ServerConfig(host_type="remote", backend_type="sqlalchemy"),
 218      ],
 219      ids=["local_file", "local_sqlalchemy", "remote_file", "remote_sqlalchemy"],
 220  )
 221  def server_config(request, tmp_path: Path, db_uri: str):
  222      """Configures the MLflow tracking URI for the requested backend and host type, then yields the config."""
 223      config = request.param
 224      if config.backend_type == "file":
 225          pytest.skip("FileStore is no longer supported.")
 226  
 227      match config.backend_type:
 228          case "file":
 229              backend_uri = tmp_path.joinpath("file").as_uri()
 230          case "sqlalchemy":
 231              backend_uri = db_uri
 232  
 233      match config.host_type:
 234          case "local":
 235              mlflow.set_tracking_uri(backend_uri)
 236              yield config
 237  
 238          case "remote":
 239              # Force-reset backend stores before each test.
 240              handlers._tracking_store = None
 241              handlers._model_registry_store = None
 242              initialize_backend_stores(backend_uri, default_artifact_root=tmp_path.as_uri())
 243  
 244              with ServerThread(app, get_safe_port()) as url:
 245                  mlflow.set_tracking_uri(url)
 246                  yield config
 247  
 248  
 249  def test_evaluate_with_static_dataset(server_config):
 250      data = [
 251          {
 252              "inputs": {"question": "What is MLflow?"},
 253              "outputs": "MLflow is a tool for ML",
 254              "expectations": {
 255                  "expected_response": "MLflow is a tool for ML",
 256                  "max_length": 100,
 257              },
 258          },
 259          {
 260              "inputs": {"question": "What is Spark?"},
 261              "outputs": "Spark is a fast data processing engine",
 262              "expectations": {
 263                  "expected_response": "Spark is a fast data processing engine",
 264                  "max_length": 1,
 265              },
 266          },
 267      ]
 268  
 269      result = mlflow.genai.evaluate(
 270          data=data,
 271          scorers=[exact_match, is_concise, relevance, has_trace],
 272      )
 273  
  274      # The OSS evaluator currently aggregates scorer values only as per-scorer means.
 275      metrics = result.metrics
 276      assert metrics["exact_match/mean"] == 1.0
 277      assert metrics["is_concise/mean"] == 0.5
 278      assert metrics["relevance/mean"] == 1.0
 279      assert metrics["has_trace/mean"] == 1.0
 280  
  281      # Exactly one trace should be generated per input row
 282      traces = get_traces()
 283      assert len(traces) == len(data)
 284  
 285      # Traces should be associated with the eval run
 286      traces = mlflow.search_traces(run_id=result.run_id, return_type="list")
 287      assert len(traces) == len(data)
 288  
  289      # Re-order traces to match the order of the input data
 290      traces = sorted(traces, key=lambda t: t.data.spans[0].inputs["question"])
 291  
 292      for i in range(len(traces)):
 293          assert len(traces[i].data.spans) == 1
 294          span = traces[i].data.spans[0]
 295          assert span.name == "root_span"
 296          assert span.inputs == data[i]["inputs"]
 297          assert span.outputs == data[i]["outputs"]
 298  
 299      _validate_assessments(traces)
 300      _validate_eval_result_df(result)
 301  
 302      # Dataset input should be logged to the run
 303      run = mlflow.get_run(result.run_id)
 304      assert len(run.inputs.dataset_inputs) == 1
 305      assert run.inputs.dataset_inputs[0].dataset.name == "dataset"
 306      assert run.inputs.dataset_inputs[0].dataset.source_type == "code"
 307  
 308  
 309  @pytest.mark.parametrize("is_predict_fn_traced", [True, False])
 310  def test_evaluate_with_predict_fn(is_predict_fn_traced, server_config):
 311      model_id = mlflow.set_active_model(name="test-model-id").model_id
 312  
 313      data = [
 314          {
 315              "inputs": {"question": "What is MLflow?"},
 316              "expectations": {
 317                  "expected_response": "MLflow is a tool for ML",
 318                  "max_length": 100,
 319              },
 320          },
 321          {
 322              "inputs": {"question": "What is Spark?"},
 323              "expectations": {
 324                  "expected_response": "Spark is a fast data processing engine",
 325                  "max_length": 1,
 326              },
 327          },
 328      ]
 329      model = TestModel()
 330      predict_fn = mlflow.trace(model.predict) if is_predict_fn_traced else model.predict
 331  
 332      result = mlflow.genai.evaluate(
 333          predict_fn=predict_fn,
 334          data=data,
 335          scorers=[exact_match, is_concise, relevance, has_trace],
 336          model_id=model_id,
 337      )
 338  
 339      metrics = result.metrics
 340      assert metrics["exact_match/mean"] == 0.0
 341      assert metrics["is_concise/mean"] == 0.5
 342      assert metrics["relevance/mean"] == 1.0
 343      assert metrics["has_trace/mean"] == 1.0
 344  
 345      # Metrics should be logged to the model ID as well
 346      model = mlflow.get_logged_model(model_id)
 347      assert metrics == {m.key: m.value for m in model.metrics}
 348  
  349      # Exactly one trace should be generated per input row
 350      traces = get_traces()
 351      assert len(traces) == len(data)
 352  
 353      # Traces should be associated with the eval run
 354      traces = mlflow.search_traces(run_id=result.run_id, return_type="list")
 355      assert len(traces) == len(data)
 356  
  357      # Re-order traces to match the order of the input data
 358      traces = sorted(traces, key=lambda t: t.data.spans[0].inputs["question"])
 359  
 360      # Check if the model_id is set in the traces
 361      assert traces[0].info.trace_metadata[TraceMetadataKey.MODEL_ID] == model_id
 362      assert traces[1].info.trace_metadata[TraceMetadataKey.MODEL_ID] == model_id
 363  
  364      # Validate the prediction span recorded in each trace
 365      for i in range(len(traces)):
 366          assert len(traces[i].data.spans) == 1
 367          span = traces[i].data.spans[0]
 368          assert span.name == "predict"
 369          assert span.inputs == data[i]["inputs"]
 370          assert span.outputs == "I don't know"
 371  
 372      _validate_assessments(traces)
 373      _validate_eval_result_df(result)
 374  
 375  
 376  @pytest.mark.parametrize("return_type", ["pandas", "list"])
 377  def test_evaluate_with_traces(monkeypatch: pytest.MonkeyPatch, server_config, return_type):
 378      questions = ["What is MLflow?", "What is Spark?"]
 379  
 380      @mlflow.trace(span_type=SpanType.AGENT)
 381      def predict(question: str) -> str:
 382          return TestModel().predict(question)
 383  
 384      predict(questions[0])
 385      trace_id = mlflow.get_last_active_trace_id()
 386      mlflow.log_expectation(
 387          trace_id=trace_id,
 388          name="expected_response",
 389          value="MLflow is a tool for ML",
 390          source=AssessmentSource(source_id="me", source_type="HUMAN"),
 391      )
 392      mlflow.log_expectation(
 393          trace_id=trace_id,
 394          name="max_length",
 395          value=100,
 396          source=AssessmentSource(source_id="me", source_type="HUMAN"),
 397      )
 398      predict(questions[1])
 399      trace_id = mlflow.get_last_active_trace_id()
 400      mlflow.log_expectation(
 401          trace_id=trace_id,
 402          name="expected_response",
 403          value="Spark is a fast data processing engine",
 404          source=AssessmentSource(source_id="me", source_type="HUMAN"),
 405      )
 406      mlflow.log_expectation(
 407          trace_id=trace_id,
 408          name="max_length",
 409          value=1,
 410          source=AssessmentSource(source_id="me", source_type="HUMAN"),
 411      )
 412  
 413      data = mlflow.search_traces(return_type=return_type)
 414      assert len(data) == len(questions)
 415  
 416      result = mlflow.genai.evaluate(
 417          data=data,
 418          scorers=[exact_match, is_concise, relevance, has_trace],
 419      )
 420  
 421      metrics = result.metrics
 422      assert metrics["exact_match/mean"] == 0.0
 423      assert metrics["is_concise/mean"] == 0.5
 424      assert metrics["relevance/mean"] == 1.0
 425      assert metrics["has_trace/mean"] == 1.0
 426  
 427      if server_config.backend_type == "sqlalchemy":
 428          # Assessments should be added to the traces in-place and no new trace should be created
 429          traces = get_traces()
 430          assert len(traces) == len(questions)
 431      else:
 432          # File store doesn't support trace linking, so each trace will be cloned to the eval run
 433          assert len(get_traces()) == len(questions) * 2
 434  
 435      # Traces are associated with the eval run
 436      traces = mlflow.search_traces(run_id=result.run_id, return_type="list")
 437      assert len(traces) == len(questions)
 438  
  439      # Re-order traces to match the order of the input data
 440      traces = sorted(traces, key=lambda t: t.data.spans[0].inputs["question"])
 441  
 442      # Validate assessments are added to the traces
 443      _validate_assessments(traces)
 444      _validate_eval_result_df(result)
 445  
 446  
 447  def test_evaluate_with_managed_dataset(is_in_databricks):
 448      if is_in_databricks:
 449          # Databricks path: Use managed dataset with mocks
 450          class MockDatasetClient:
 451              def __init__(self):
 452                  # dataset_id -> list of records
 453                  self.records = {}
 454  
 455              def create_dataset(self, uc_table_name: str, experiment_ids: list[str]):
 456                  from databricks.agents.datasets import Dataset
 457  
 458                  dataset = Dataset(
 459                      dataset_id=str(uuid.uuid4()),
 460                      name=uc_table_name,
 461                      digest=None,
 462                      source_type="databricks-uc-table",
 463                  )
 464                  self.records[dataset.dataset_id] = []
 465                  return dataset
 466  
 467              def list_dataset_records(self, dataset_id: str):
 468                  return self.records[dataset_id]
 469  
 470              def batch_create_dataset_records(self, name: str, dataset_id: str, records):
 471                  self.records[dataset_id].extend(records)
 472  
 473              def upsert_dataset_record_expectations(
 474                  self, name: str, dataset_id: str, record_id: str, expectations: list[dict[str, Any]]
 475              ):
 476                  for record in self.records[dataset_id]:
 477                      if record.id == record_id:
 478                          record.expectations.update(expectations)
 479  
 480              def sync_dataset_to_uc(self, dataset_id: str, uc_table_name: str):
 481                  pass
 482  
 483          mock_client = MockDatasetClient()
 484          with (
 485              mock.patch("databricks.rag_eval.datasets.api._get_client", return_value=mock_client),
 486              mock.patch(
 487                  "databricks.rag_eval.datasets.entities._get_client", return_value=mock_client
 488              ),
 489              mock.patch("mlflow.genai.datasets.is_databricks_uri", return_value=True),
 490          ):
 491              dataset = create_dataset(
 492                  uc_table_name="mlflow.managed.dataset", experiment_id="exp-123"
 493              )
 494              dataset.merge_records([
 495                  {
 496                      "inputs": {"question": "What is MLflow?"},
 497                      "expectations": {
 498                          "expected_response": "MLflow is a tool for ML",
 499                          "max_length": 100,
 500                      },
 501                  },
 502                  {
 503                      "inputs": {"question": "What is Spark?"},
 504                      "expectations": {
 505                          "expected_response": "Spark is a fast data processing engine",
 506                          "max_length": 1,
 507                      },
 508                  },
 509              ])
 510  
 511              result = mlflow.genai.evaluate(
 512                  data=dataset,
 513                  predict_fn=TestModel().predict,
 514                  scorers=[exact_match, is_concise, relevance, has_trace],
 515              )
 516      else:
 517          dataset = create_dataset(
 518              name="eval_test_dataset", tags={"source": "test", "version": "1.0"}
 519          )
 520          dataset.merge_records([
 521              {
 522                  "inputs": {"question": "What is MLflow?"},
 523                  "expectations": {
 524                      "expected_response": "MLflow is a tool for ML",
 525                      "max_length": 100,
 526                  },
 527              },
 528              {
 529                  "inputs": {"question": "What is Spark?"},
 530                  "expectations": {
 531                      "expected_response": "Spark is a fast data processing engine",
 532                      "max_length": 1,
 533                  },
 534              },
 535          ])
 536  
 537          result = mlflow.genai.evaluate(
 538              data=dataset,
 539              predict_fn=TestModel().predict,
 540              scorers=[exact_match, is_concise, relevance, has_trace],
 541          )
 542  
 543      metrics = result.metrics
 544      assert metrics["exact_match/mean"] == 0.0
 545      assert metrics["is_concise/mean"] == 0.5
 546      assert metrics["relevance/mean"] == 1.0
 547      assert metrics["has_trace/mean"] == 1.0
 548  
 549      run = mlflow.get_run(result.run_id)
 550      # Dataset metadata should be added to the run
 551      assert len(run.inputs.dataset_inputs) == 1
 552      assert run.inputs.dataset_inputs[0].dataset.name == dataset.name
 553      assert run.inputs.dataset_inputs[0].dataset.digest == dataset.digest
 554      # Check for the correct source_type based on whether we're in Databricks or OSS
 555      expected_source_type = (
 556          "databricks-uc-table" if is_in_databricks else "mlflow_evaluation_dataset"
 557      )
 558      assert run.inputs.dataset_inputs[0].dataset.source_type == expected_source_type
 559  
 560      # Traces are associated with the eval run
 561      traces = mlflow.search_traces(run_id=result.run_id, return_type="list")
 562      assert len(traces) == 2
 563  
 564      _validate_assessments(traces)
 565      _validate_eval_result_df(result)
 566  
 567  
 568  def test_evaluate_with_managed_dataset_from_searched_traces():
 569      for i in range(3):
 570          with mlflow.start_span(name=f"qa_span_{i}") as span:
 571              question = f"What is item {i}?"
 572              span.set_inputs({"question": question})
 573              span.set_outputs({"answer": f"Item {i} is something"})
 574  
 575              mlflow.log_expectation(
 576                  trace_id=span.trace_id,
 577                  name="expected_response",
 578                  value=f"Item {i} is a detailed answer",
 579              )
 580              mlflow.log_expectation(
 581                  trace_id=span.trace_id,
 582                  name="max_length",
 583                  value=50 if i % 2 == 0 else 10,
 584              )
 585  
 586      traces_df = mlflow.search_traces()
 587  
 588      dataset = create_dataset(
 589          name="traces_eval_dataset", tags={"source": "traces", "evaluation": "test"}
 590      )
 591      dataset.merge_records(traces_df)
 592  
 593      result = mlflow.genai.evaluate(
 594          data=dataset,
 595          predict_fn=TestModel().predict,
 596          scorers=[exact_match, is_concise, has_trace],
 597      )
 598  
 599      metrics = result.metrics
 600      assert "exact_match/mean" in metrics
 601      assert "is_concise/mean" in metrics
 602      assert "has_trace/mean" in metrics
 603      assert metrics["has_trace/mean"] == 1.0
 604  
 605  
 606  def test_model_from_deployment_endpoint(is_in_databricks):
 607      with mock.patch("mlflow.deployments.get_deploy_client") as mock_get_deploy_client:
 608          mock_client = mock_get_deploy_client.return_value
 609          mock_client.predict.return_value = _DUMMY_CHAT_RESPONSE
 610  
 611          data = [
 612              {
 613                  "inputs": {
 614                      "messages": [
 615                          {"content": "You are a helpful assistant.", "role": "system"},
 616                          {"content": "What is Spark?", "role": "user"},
 617                      ],
 618                      "max_tokens": 10,
 619                  }
 620              },
 621              {
 622                  "inputs": {
 623                      "messages": [
 624                          {"content": "What is MLflow?", "role": "user"},
 625                      ]
 626                  }
 627              },
 628          ]
 629          predict_fn = mlflow.genai.to_predict_fn("endpoints:/chat")
 630          result = mlflow.genai.evaluate(
 631              data=data,
 632              predict_fn=predict_fn,
 633              scorers=[has_trace],
 634          )
 635  
 636          databricks_options = {"databricks_options": {"return_trace": True}}
 637          mock_client.predict.assert_has_calls(
 638              [
 639                  # Test call to check if the function is traced or not
 640                  mock.call(endpoint="chat", inputs={**data[0]["inputs"], **databricks_options}),
 641                  # First evaluation call
 642                  mock.call(endpoint="chat", inputs={**data[0]["inputs"], **databricks_options}),
 643                  # Second evaluation call
 644                  mock.call(endpoint="chat", inputs={**data[1]["inputs"], **databricks_options}),
 645              ],
 646              any_order=True,
 647          )
 648  
 649          # Validate traces
 650          traces = mlflow.search_traces(run_id=result.run_id, return_type="list")
 651  
 652          assert len(traces) == 2
 653          spans = traces[0].data.spans
 654          assert len(spans) == 1
 655          assert spans[0].name == "predict"
 656          # Eval harness runs prediction in parallel, so the order is not deterministic
 657          assert spans[0].inputs in (data[0]["inputs"], data[1]["inputs"])
 658          assert spans[0].outputs == _DUMMY_CHAT_RESPONSE
 659  
 660  
 661  def test_missing_scorers_argument():
 662      with pytest.raises(TypeError, match=r"evaluate\(\) missing 1 required positional"):
 663          mlflow.genai.evaluate(data=[{"inputs": "Hello", "outputs": "Hi"}])
 664  
 665  
 666  def test_empty_scorers_allowed():
 667      mock_result = EvaluationResult(run_id="test-run", metrics={}, result_df=pd.DataFrame())
 668  
 669      data = [{"inputs": {"question": "What is MLflow?"}, "outputs": "MLflow is an ML platform"}]
 670  
 671      with mock.patch("mlflow.genai.evaluation.base._run_harness") as mock_evaluate_oss:
 672          mock_evaluate_oss.return_value = (mock_result, {})
 673          result = mlflow.genai.evaluate(data=data, scorers=[])
 674  
 675      assert result is mock_result
 676      mock_evaluate_oss.assert_called_once()
 677  
 678  
 679  @pytest.mark.parametrize("pass_full_dataframe", [True, False])
 680  def test_trace_input_can_contain_string_input(pass_full_dataframe, is_in_databricks):
 681      """
 682      The `inputs` column must be a dictionary when a static dataset is provided.
  683      However, when traces are provided, the inputs are not validated and the
  684      harness should accept non-dict values such as plain strings.
 685      """
 686      with mlflow.start_span() as span:
 687          span.set_inputs("What is MLflow?")
 688          span.set_outputs("MLflow is a tool for ML")
 689  
 690      traces = mlflow.search_traces()
 691      if not pass_full_dataframe:
 692          traces = traces[["trace"]]
 693  
 694      # Harness should run without an error
 695      mlflow.genai.evaluate(data=traces, scorers=[RelevanceToQuery()])
 696  
 697  
 698  def test_max_workers_env_var(monkeypatch):
 699      # Disable rate limits so auto-derivation doesn't override the default
 700      monkeypatch.setenv("MLFLOW_GENAI_EVAL_PREDICT_RATE_LIMIT", "0")
 701      monkeypatch.setenv("MLFLOW_GENAI_EVAL_SCORER_RATE_LIMIT", "0")
 702  
 703      def _validate_max_workers(expected_max_workers):
 704          with mock.patch(
 705              "mlflow.genai.evaluation.harness.ThreadPoolExecutor", wraps=ThreadPoolExecutor
 706          ) as mock_executor:
 707              mlflow.genai.evaluate(
 708                  data=[
 709                      {
 710                          "inputs": {"question": "What is MLflow?"},
 711                          "outputs": "MLflow is a tool for ML",
 712                      }
 713                  ],
 714                  scorers=[RelevanceToQuery()],
 715              )
 716              # ThreadPoolExecutor is called twice in OSS (harness + scorers)
 717              first_call = mock_executor.call_args_list[0]
 718              assert first_call[1]["max_workers"] == expected_max_workers
 719  
  720      # default max_workers is 10
 721      _validate_max_workers(10)
 722  
 723      # override workers with env var
 724      monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_WORKERS", "20")
 725      _validate_max_workers(20)
 726  
 727      # legacy env var for backward compatibility
 728      monkeypatch.delenv("MLFLOW_GENAI_EVAL_MAX_WORKERS", raising=False)
 729      monkeypatch.setenv("RAG_EVAL_MAX_WORKERS", "30")
 730      _validate_max_workers(30)
 731  
 732  
 733  def test_dataset_name_is_logged_correctly(is_in_databricks):
 734      data = pd.DataFrame({
 735          "inputs": [{"question": "What is MLflow?"}],
 736          "outputs": ["MLflow is a tool for ML"],
 737      })
 738  
 739      with mlflow.start_run() as run:
 740          mlflow.genai.evaluate(
 741              data=data,
 742              scorers=[RelevanceToQuery()],
 743          )
 744  
 745      if not is_in_databricks:
 746          run_data = mlflow.get_run(run.info.run_id)
 747          assert run_data.inputs is not None
 748          assert run_data.inputs.dataset_inputs is not None
 749          assert len(run_data.inputs.dataset_inputs) > 0
 750  
 751          dataset_input = run_data.inputs.dataset_inputs[0]
 752          dataset = dataset_input.dataset
 753          assert dataset.name == "dataset"
 754  
 755  
 756  def test_evaluate_with_dataset_preserves_name(is_in_databricks):
 757      from mlflow.entities import Dataset as DatasetEntity
 758  
 759      data = pd.DataFrame({
 760          "inputs": [{"question": "What is MLflow?"}],
 761          "outputs": ["MLflow is a tool for ML"],
 762      })
 763  
 764      mock_managed_dataset = MagicMock(spec=EvaluationDataset)
 765      type(mock_managed_dataset).name = mock.PropertyMock(return_value="my_managed_dataset")
 766      mock_managed_dataset.to_df.return_value = data
 767      mock_managed_dataset.digest = "test_digest"
 768      mock_managed_dataset.source = MagicMock()
 769      mock_managed_dataset.source.to_json.return_value = "{}"
 770      mock_managed_dataset.source._get_source_type.return_value = "test"
 771      mock_managed_dataset._to_mlflow_entity.return_value = DatasetEntity(
 772          name="my_managed_dataset",
 773          digest="test_digest",
 774          source_type="test",
 775          source="{}",
 776          schema=None,
 777          profile=None,
 778      )
 779  
 780      if not is_in_databricks:
 781          with mlflow.start_run() as run:
 782              mlflow.genai.evaluate(
 783                  data=data,
 784                  scorers=[RelevanceToQuery()],
 785              )
 786  
 787          run_data = mlflow.get_run(run.info.run_id)
 788          dataset_input = run_data.inputs.dataset_inputs[0]
 789          assert dataset_input.dataset.name == "dataset"
 790  
 791          with mlflow.start_run() as run:
 792              mlflow.genai.evaluate(
 793                  data=mock_managed_dataset,
 794                  scorers=[RelevanceToQuery()],
 795              )
 796  
 797          run_data = mlflow.get_run(run.info.run_id)
 798          dataset_input = run_data.inputs.dataset_inputs[0]
 799          assert dataset_input.dataset.name == "my_managed_dataset"
 800  
 801  
 802  def test_evaluate_with_managed_dataset_preserves_name():
 803      mock_managed_dataset = MagicMock()
 804      mock_managed_dataset.dataset_id = "d-1234567890abcdef1234567890abcdef"
 805      mock_managed_dataset.name = "test.evaluation.sample_dataset"
 806      mock_managed_dataset.digest = "abc123"
 807      mock_managed_dataset.schema = None
 808      mock_managed_dataset.profile = None
 809      mock_managed_dataset.source_type = "databricks-uc-table"
 810      mock_managed_dataset.create_time = None
 811      mock_managed_dataset.created_by = None
 812      mock_managed_dataset.last_update_time = None
 813      mock_managed_dataset.last_updated_by = None
 814      mock_managed_dataset.to_df.return_value = pd.DataFrame({
 815          "inputs": [{"question": "What is MLflow?"}],
 816          "outputs": ["MLflow is a tool for ML"],
 817      })
 818  
 819      dataset = EvaluationDataset(mock_managed_dataset)
 820  
 821      with mlflow.start_run() as run:
 822          mlflow.genai.evaluate(
 823              data=dataset,
 824              scorers=[RelevanceToQuery()],
 825          )
 826  
 827          run_data = mlflow.get_run(run.info.run_id)
 828  
 829          assert run_data.inputs is not None
 830          assert run_data.inputs.dataset_inputs is not None
 831          assert len(run_data.inputs.dataset_inputs) > 0
 832  
 833          dataset_input = run_data.inputs.dataset_inputs[0]
 834          logged_dataset = dataset_input.dataset
 835          assert logged_dataset.name == "test.evaluation.sample_dataset"
 836  
 837  
 838  @pytest.mark.parametrize(
 839      ("tags_data", "expected_calls"),
 840      [
 841          # Regular tags
 842          (
 843              [
 844                  {"environment": "test", "model_version": "v1.0"},
 845                  {"environment": "production", "team": "data-science"},
 846              ],
 847              [
 848                  ("environment", "test"),
 849                  ("model_version", "v1.0"),
 850                  ("environment", "production"),
 851                  ("team", "data-science"),
 852              ],
 853          ),
 854          # Empty tags dict
 855          (
 856              [{}, {}],
 857              [],
 858          ),
 859          # None tags (no tags field)
 860          (
 861              [None, None],
 862              [],
 863          ),
 864          # Mix of tags and empty/None
 865          (
 866              [{"env": "test"}, {}, None],
 867              [("env", "test")],
 868          ),
 869      ],
 870  )
 871  def test_evaluate_with_tags(tags_data, expected_calls):
 872      data = [
 873          {
 874              "inputs": {"question": f"What is question {i}?"},
 875              "outputs": f"Answer {i}",
 876              "expectations": {"expected_response": f"Answer {i}"},
 877              "tags": tags,
 878          }
 879          for i, tags in enumerate(tags_data)
 880      ]
 881  
 882      with mock.patch("mlflow.set_trace_tag") as mock_set_trace_tag:
 883          mlflow.genai.evaluate(
 884              data=data,
 885              scorers=[exact_match],
 886          )
 887  
 888          # Check that all expected calls were made (order may vary due to parallel execution)
 889          actual_calls = mock_set_trace_tag.call_args_list
 890          expected_mock_calls = [
 891              mock.call(trace_id=ANY, key=key, value=value) for key, value in expected_calls
 892          ]
 893          assert len(actual_calls) == len(expected_mock_calls)
 894          for expected_call in expected_mock_calls:
 895              assert expected_call in actual_calls
 896  
 897  
 898  def test_evaluate_with_traces_tags_no_warnings():
 899      with mlflow.start_span() as span:
 900          span.set_inputs({"question": "Hello?"})
 901  
 902      traces = mlflow.search_traces()
 903      with mock.patch("mlflow.tracing.client._logger.warning") as mock_warning:
 904          mlflow.genai.evaluate(
 905              data=traces,
 906              scorers=[has_trace],
 907          )
 908          assert not any(
 909              "immutable and cannot be set on a trace" in call.args[0]
 910              for call in mock_warning.call_args_list
 911          )
 912  
 913  
 914  def test_evaluate_with_tags_error_handling(is_in_databricks):
 915      data = [
 916          {
 917              "inputs": {"question": "What is MLflow?"},
 918              "outputs": "MLflow is a tool for ML",
 919              "expectations": {"expected_response": "MLflow is a tool for ML"},
 920              "tags": {"invalid_tag": "value"},
 921          }
 922      ]
 923  
 924      # Mock set_trace_tag to raise an exception
 925      with mock.patch("mlflow.set_trace_tag", side_effect=Exception("Tag logging failed")):
 926          # This should not raise an exception
 927          result = mlflow.genai.evaluate(
 928              data=data,
 929              scorers=[exact_match],
 930          )
 931  
 932          # Evaluation should still succeed
 933          assert "exact_match/mean" in result.metrics
 934  
 935  
 936  def test_evaluate_with_invalid_tags_type():
 937      data = [
 938          {
 939              "inputs": {"question": "What is MLflow?"},
 940              "outputs": "MLflow is a tool for ML",
 941              "expectations": {"expected_response": "MLflow is a tool for ML"},
 942              "tags": "invalid_tags_string",  # Should be dict
 943          }
 944      ]
 945  
 946      with pytest.raises(MlflowException, match="Tags must be a dictionary"):
 947          mlflow.genai.evaluate(
 948              data=data,
 949              scorers=[exact_match],
 950          )
 951  
 952  
 953  def test_evaluate_without_inputs_in_eval_dataset():
 954      answers = [
 955          "MLflow is an open-source platform for managing ML lifecycle",
 956          "Apache Spark is a fast data processing engine",
 957          "I don't know",
 958      ]
 959      for answer in answers:
 960          with mlflow.start_span() as span:
 961              span.set_outputs(answer)
 962  
 963      trace_df = mlflow.search_traces()
 964      trace_df["inputs"] = None
 965      trace_df["expectations"] = pd.Series([
 966          {"expected_response": answer, "max_length": 100} for answer in answers
 967      ])
 968  
 969      result = mlflow.genai.evaluate(
 970          data=trace_df,
 971          scorers=[is_concise, exact_match, has_trace],
 972      )
 973  
 974      assert "is_concise/mean" in result.metrics
 975      assert "exact_match/mean" in result.metrics
 976      assert "has_trace/mean" in result.metrics
 977  
 978      @scorer
 979      def input_exist(inputs):
 980          if inputs is None:
 981              return False
 982          return True
 983  
 984      trace_df["outputs"] = None
 985      result = mlflow.genai.evaluate(
 986          data=trace_df,
 987          scorers=[input_exist],
 988      )
 989      assert result.metrics["input_exist/mean"] == 0.0
 990  
 991  
 992  def test_evaluate_with_only_trace_in_eval_dataset():
 993      for _ in range(3):
 994          with mlflow.start_span():
 995              pass
 996  
 997      trace_df = mlflow.search_traces()
 998      trace_df = trace_df[["trace"]]
 999  
1000      result = mlflow.genai.evaluate(
1001          data=trace_df,
1002          scorers=[has_trace],
1003      )
1004  
1005      assert result.metrics["has_trace/mean"] == 1.0
1006  
1007  
1008  @pytest.mark.parametrize("is_enabled", [True, False])
1009  def test_evaluate_with_scorer_tracing(server_config, monkeypatch, is_enabled):
1010      monkeypatch.setenv("MLFLOW_GENAI_EVAL_ENABLE_SCORER_TRACING", str(is_enabled).lower())
1011  
1012      data = [
1013          {
1014              "inputs": {"question": "What is MLflow?"},
1015              "expectations": {
1016                  "expected_response": "MLflow is a tool for ML",
1017                  "max_length": 100,
1018              },
1019          },
1020          {
1021              "inputs": {"question": "What is Spark?"},
1022              "expectations": {
1023                  "expected_response": "Spark is a fast data processing engine",
1024                  "max_length": 1,
1025              },
1026          },
1027      ]
1028  
1029      result = mlflow.genai.evaluate(
1030          predict_fn=TestModel().predict,
1031          data=data,
1032          scorers=[exact_match, is_concise, relevance, has_trace],
1033      )
1034  
1035      metrics = result.metrics
1036      assert metrics["exact_match/mean"] == 0.0
1037      assert metrics["is_concise/mean"] == 0.5
1038      assert metrics["relevance/mean"] == 1.0
1039      assert metrics["has_trace/mean"] == 1.0
1040  
1041      traces = get_traces()
1042      if is_enabled:
1043          assert len(traces) == len(data) * 5  # 1 trace for prediction + 4 scorer traces
1044      else:
1045          assert len(traces) == len(data)
1046  
1047      # Traces should be associated with the eval run
1048      traces = mlflow.search_traces(
1049          filter_string="tags.`mlflow.eval.requestId` != 'None'",
1050          run_id=result.run_id,
1051          return_type="list",
1052      )
1053      assert len(traces) == len(data)
1054  
 1055      # Feedback assessments should reference a scorer trace only when scorer tracing is enabled
1056      for trace in traces:
1057          for a in trace.info.assessments:
1058              if isinstance(a, Feedback) and is_enabled:
1059                  assert a.metadata[AssessmentMetadataKey.SCORER_TRACE_ID] is not None
1060                  assert a.metadata[AssessmentMetadataKey.SCORER_TRACE_ID] != trace.info.trace_id
1061              else:
1062                  assert AssessmentMetadataKey.SCORER_TRACE_ID not in a.metadata
1063  
1064  
1065  @pytest.mark.parametrize("diff_experiment_id", [True, False])
1066  def test_eval_with_traces_log_spans_correctly(diff_experiment_id):
1067      exp_id = mlflow.set_experiment("traces exp").experiment_id
1068      with mlflow.start_span() as span:
1069          span.set_inputs({"question": "What is MLflow?"})
1070          span.set_outputs({"answer": "MLflow is a tool for ML"})
1071          span.set_attributes({"key": "value"})
1072          with mlflow.start_span() as child_span:
1073              child_span.set_inputs("test")
1074  
1075      # set to a different experiment
1076      if diff_experiment_id:
1077          mlflow.set_experiment("diff exp")
1078  
1079      # search traces from the original experiment
1080      trace_df = mlflow.search_traces(locations=[exp_id])
1081  
1082      result = mlflow.genai.evaluate(
1083          data=trace_df,
1084          scorers=[has_trace],
1085      )
1086  
1087      assert result.metrics["has_trace/mean"] == 1.0
1088  
1089      traces = get_traces()
1090      assert len(traces) == 1
1091      # copied trace should contain all spans
1092      assert len(traces[0].data.spans) == 2
1093      span = traces[0].data.spans[0]
1094      assert span.get_attribute("key") == "value"
1095      assert span.inputs == {"question": "What is MLflow?"}
1096      assert span.outputs == {"answer": "MLflow is a tool for ML"}
1097      child_span = traces[0].data.spans[1]
1098      assert child_span.inputs == "test"
1099  
1100  
1101  def test_evaluate_with_mixed_single_turn_and_multi_turn_scorers(server_config):
1102      """Test evaluation with a combination of single-turn and multi-turn scorers.
1103  
1104      Validates that:
1105      - Single-turn scorers are applied to all traces
1106      - Multi-turn scorers are only applied to the first trace of each session
1107      """
1108  
1109      # Define a multi-turn scorer that counts conversation turns
1110      class ConversationLengthScorer(mlflow.genai.Scorer):
1111          def __init__(self):
1112              super().__init__(name="conversation_length")
1113  
1114          @property
1115          def is_session_level_scorer(self) -> bool:
1116              return True
1117  
1118          def __call__(self, session=None, **kwargs):
1119              """Return the number of turns in the conversation."""
1120              return len(session or [])
1121  
1122      # Define a single-turn scorer
1123      @scorer
1124      def response_length(outputs) -> int:
1125          """Return the length of the response."""
1126          return len(outputs) if isinstance(outputs, str) else 0
1127  
1128      # Create a traced model function
1129      @mlflow.trace(span_type=SpanType.CHAT_MODEL)
1130      def model(question, session_id):
1131          mlflow.update_current_trace(metadata={"mlflow.trace.session": session_id})
1132          return f"Answer to: {question}"
1133  
1134      # Generate traces for 2 sessions (3 turns + 2 turns = 5 total traces)
1135      mlflow.set_experiment("multi_turn_test")
1136      with mlflow.start_run() as run:
1137          # Session 1: 3 turns
1138          for q in ["Q1", "Q2", "Q3"]:
1139              model(q, session_id="session_1")
1140  
1141          # Session 2: 2 turns
1142          for q in ["Q4", "Q5"]:
1143              model(q, session_id="session_2")
1144  
1145          # Get traces for evaluation
1146          traces = mlflow.search_traces(
1147              locations=[run.info.experiment_id], filter_string=f'run_id = "{run.info.run_id}"'
1148          )
1149  
1150          # Evaluate with both single-turn and multi-turn scorers
1151          result = mlflow.genai.evaluate(
1152              data=traces, scorers=[response_length, ConversationLengthScorer()]
1153          )
1154  
1155      # Validate results
1156      result_df = result.result_df
1157  
1158      # Should have one row per trace
1159      assert len(result_df) == 5, f"Expected 5 traces, got {len(result_df)}"
1160  
1161      # Single-turn scorer should be applied to all traces
1162      single_turn_scores = result_df["response_length/value"].notna().sum()
1163      assert single_turn_scores == 5, (
1164          f"Expected single-turn scores for all 5 traces, got {single_turn_scores}"
1165      )
1166  
1167      # Multi-turn scorer should only be applied to first trace of each session (2 total)
1168      multi_turn_scores = result_df["conversation_length/value"].notna().sum()
1169      assert multi_turn_scores == 2, (
1170          f"Expected multi-turn scores for 2 sessions (first trace only), got {multi_turn_scores}"
1171      )
1172  
1173      # Validate the conversation length values
1174      # Session 1 should have 3 turns, Session 2 should have 2 turns
1175      conv_lengths = result_df["conversation_length/value"].dropna().sort_values().tolist()
1176      assert conv_lengths == [2.0, 3.0], (
1177          f"Expected conversation lengths [2.0, 3.0], got {conv_lengths}"
1178      )
1179  
 1180      # Validate that every single-turn score is positive (based on our dummy response)
 1181      response_lengths = result_df["response_length/value"].dropna()
 1182      # All responses follow the "Answer to: Qx" format, so every length should be > 0
1183      assert all(length > 0 for length in response_lengths)
1184  
1185      # Verify multi-turn assessments were persisted to the backend (not just in-memory).
1186      # Fetch each trace from the backend and check the first trace of each session has
1187      # the conversation_length assessment logged.
1188      all_traces = mlflow.search_traces(
1189          locations=[run.info.experiment_id],
1190          run_id=result.run_id,
1191          return_type="list",
1192      )
1193      for trace in all_traces:
1194          assessment_names = {a.name for a in trace.info.assessments}
1195          has_session = "mlflow.trace.session" in trace.info.trace_metadata
1196          is_first_in_session = "conversation_length" in assessment_names
1197          if has_session and is_first_in_session:
1198              a = next(a for a in trace.info.assessments if a.name == "conversation_length")
1199              assert isinstance(a, Feedback)
1200              assert a.value in (2.0, 3.0)
1201  
1202  
1203  def test_evaluate_with_evaluation_dataset_and_session_level_scorers():
1204      # Define a session-level scorer
1205      class ConversationLengthScorer(mlflow.genai.Scorer):
1206          def __init__(self):
1207              super().__init__(name="conversation_length")
1208  
1209          @property
1210          def is_session_level_scorer(self) -> bool:
1211              return True
1212  
1213          def __call__(self, session=None, **kwargs):
1214              return len(session or [])
1215  
1216      # Create traces with session metadata (2 traces in session_1, 1 in session_2)
1217      @mlflow.trace(span_type=mlflow.entities.SpanType.CHAT_MODEL)
1218      def model(question, session_id):
1219          mlflow.update_current_trace(metadata={"mlflow.trace.session": session_id})
1220          return f"Answer to {question}"
1221  
1222      model("Q1", session_id="session_1")
1223      trace_1 = mlflow.get_trace(mlflow.get_last_active_trace_id())
1224  
1225      model("Q2", session_id="session_1")
1226      trace_2 = mlflow.get_trace(mlflow.get_last_active_trace_id())
1227  
1228      model("Q3", session_id="session_2")
1229      trace_3 = mlflow.get_trace(mlflow.get_last_active_trace_id())
1230  
1231      # Create dataset from traces
1232      dataset = create_dataset(name="multi_turn_dataset")
1233      dataset.merge_records([trace_1, trace_2, trace_3])
1234  
1235      # Evaluate with session-level scorer
1236      result = mlflow.genai.evaluate(data=dataset, scorers=[ConversationLengthScorer()])
1237      result_df = result.result_df
1238  
1239      # Session-level scorer should produce 2 scores (one per session)
1240      assert "conversation_length/value" in result_df.columns
1241      assert result_df["conversation_length/value"].notna().sum() == 2
1242  
1243      # Verify conversation lengths: session_1 has 2 traces, session_2 has 1 trace
1244      conv_lengths = result_df["conversation_length/value"].dropna().sort_values().tolist()
1245      assert conv_lengths == [1.0, 2.0]
1246  
1247  
1248  def test_evaluate_dataset_mixed_traces_with_and_without_sessions():
1249      class SessionScorer(mlflow.genai.Scorer):
1250          def __init__(self):
1251              super().__init__(name="session_length")
1252  
1253          @property
1254          def is_session_level_scorer(self):
1255              return True
1256  
1257          def __call__(self, session=None, **kwargs):
1258              return len(session or [])
1259  
1260      # Create mixed traces
1261      @mlflow.trace(span_type=mlflow.entities.SpanType.CHAT_MODEL)
1262      def model_with_session(question, session_id):
1263          mlflow.update_current_trace(metadata={"mlflow.trace.session": session_id})
1264          return "answer"
1265  
1266      @mlflow.trace(span_type=mlflow.entities.SpanType.CHAT_MODEL)
1267      def model_without_session(question):
1268          return "answer"
1269  
1270      model_with_session("Q1", "session_1")
1271      trace_1 = mlflow.get_trace(mlflow.get_last_active_trace_id())
1272  
1273      model_without_session("Q2")
1274      trace_2 = mlflow.get_trace(mlflow.get_last_active_trace_id())
1275  
1276      model_with_session("Q3", "session_1")
1277      trace_3 = mlflow.get_trace(mlflow.get_last_active_trace_id())
1278  
1279      # Create dataset and evaluate
1280      dataset = create_dataset(name="mixed_dataset")
1281      dataset.merge_records([trace_1, trace_2, trace_3])
1282  
1283      result = mlflow.genai.evaluate(data=dataset, scorers=[SessionScorer()])
1284      result_df = result.result_df
1285  
1286      # Should have 1 session-level score (for session_1 with 2 traces)
1287      # The trace without session should not be scored by session-level scorer
1288      assert result_df["session_length/value"].notna().sum() == 1
1289      assert result_df["session_length/value"].dropna().iloc[0] == 2.0
1290  
1291  
1292  def test_max_scorer_workers_env_var(monkeypatch):
1293      @scorer
1294      def dummy_scorer_1(outputs):
1295          return True
1296  
1297      @scorer
1298      def dummy_scorer_2(outputs):
1299          return True
1300  
1301      @scorer
1302      def dummy_scorer_3(outputs):
1303          return True
1304  
1305      def _validate_scorer_max_workers(expected_max_workers, num_scorers):
1306          scorers_list = [dummy_scorer_1, dummy_scorer_2, dummy_scorer_3][:num_scorers]
1307          with mock.patch(
1308              "mlflow.genai.evaluation.harness.ThreadPoolExecutor", wraps=ThreadPoolExecutor
1309          ) as mock_executor:
1310              mlflow.genai.evaluate(
1311                  data=[
1312                      {
1313                          "inputs": {"question": "What is MLflow?"},
1314                          "outputs": "MLflow is a tool for ML",
1315                      }
1316                  ],
1317                  scorers=scorers_list,
1318              )
1319              # Find the scorer pool call by its thread_name_prefix
1320              scorer_call = next(
1321                  call
1322                  for call in mock_executor.call_args_list
1323                  if call[1].get("thread_name_prefix") == "MlflowGenAIEvalScorer"
1324              )
1325              assert scorer_call[1]["max_workers"] == expected_max_workers
1326  
1327      # default scorer workers is 10, but limited by number of scorers (3)
1328      _validate_scorer_max_workers(expected_max_workers=3, num_scorers=3)
1329  
1330      # override scorer workers with env var (limit to 2)
1331      monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_SCORER_WORKERS", "2")
1332      _validate_scorer_max_workers(expected_max_workers=2, num_scorers=3)
1333  
1334      # when num_scorers < max_scorer_workers, use num_scorers
1335      monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_SCORER_WORKERS", "10")
1336      _validate_scorer_max_workers(expected_max_workers=2, num_scorers=2)
1337  
1338      # set to 1 for sequential execution
1339      monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_SCORER_WORKERS", "1")
1340      _validate_scorer_max_workers(expected_max_workers=1, num_scorers=3)
1341  
1342  
1343  # ===================== ConversationSimulator Integration Tests =====================
1344  
1345  
1346  def test_evaluate_with_conversation_simulator_requires_predict_fn():
1347      simulator = ConversationSimulator(
1348          test_cases=[{"goal": "Learn about MLflow"}],
1349          max_turns=2,
1350      )
1351  
1352      with pytest.raises(MlflowException, match="predict_fn is required"):
1353          mlflow.genai.evaluate(
1354              data=simulator,
1355              scorers=[has_trace],
1356          )
1357  
1358  
1359  def test_evaluate_with_conversation_simulator_empty_simulation_error():
1360      def failing_predict_fn(input: list[dict[str, Any]], **kwargs):
1361          raise Exception("Simulated failure")
1362  
1363      simulator = ConversationSimulator(
1364          test_cases=[{"goal": "Learn about MLflow"}],
1365          max_turns=2,
1366      )
1367  
1368      with mock.patch(
1369          "mlflow.genai.simulators.simulator.invoke_model_without_tracing"
1370      ) as mock_invoke:
1371          # Simulate a failure that produces no traces
1372          mock_invoke.side_effect = Exception("LLM call failed")
1373  
1374          with pytest.raises(MlflowException, match="Simulation produced no traces"):
1375              mlflow.genai.evaluate(
1376                  data=simulator,
1377                  predict_fn=failing_predict_fn,
1378                  scorers=[has_trace],
1379              )
1380  
1381  
1382  def test_session_level_evaluation_with_predict_fn_without_simulator():
1383      class SessionScorer(mlflow.genai.Scorer):
1384          def __init__(self):
1385              super().__init__(name="session_scorer")
1386  
1387          @property
1388          def is_session_level_scorer(self):
1389              return True
1390  
1391          def __call__(self, session=None, **kwargs):
1392              return len(session or [])
1393  
1394      data = [
1395          {"inputs": {"question": "What is MLflow?"}, "outputs": "MLflow is a tool"},
1396      ]
1397  
1398      with pytest.raises(
1399          MlflowException,
1400          match=(
1401              r"Session-level scorers require traces with session IDs.*"
1402              r"session_scorer.*"
1403              r"Either pass a ConversationSimulator to `data` with `predict_fn`"
1404          ),
1405      ):
1406          mlflow.genai.evaluate(
1407              data=data,
1408              predict_fn=TestModel().predict,
1409              scorers=[SessionScorer()],
1410          )
1411  
1412  
1413  def test_evaluate_with_conversation_simulator_calls_simulate():
1414      simulator = ConversationSimulator(
1415          test_cases=[{"goal": "Learn MLflow"}],
1416          max_turns=2,
1417      )
1418  
1419      def mock_predict_fn(input: list[dict[str, Any]], **kwargs):
1420          return {"output": "Mock response"}
1421  
1422      with mock.patch.object(simulator, "simulate") as mock_simulate:
1423          # Return empty list to trigger the "no traces" error
1424          mock_simulate.return_value = []
1425  
1426          with pytest.raises(MlflowException, match="Simulation produced no traces"):
1427              mlflow.genai.evaluate(
1428                  data=simulator,
1429                  predict_fn=mock_predict_fn,
1430                  scorers=[has_trace],
1431              )
1432  
1433          # Verify simulate was called with predict_fn
1434          mock_simulate.assert_called_once_with(mock_predict_fn)
1435  
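      # Taken together, the two simulator tests above assume the harness roughly does the
      # following when `data` is a ConversationSimulator (sketch only, not the actual code path):
      #
      #     traces = simulator.simulate(predict_fn)
      #     if not traces:
      #         raise MlflowException("Simulation produced no traces")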
1436  
1437  @scorer
1438  def always_pass(outputs):
1439      return True
1440  
1441  
1442  @pytest.mark.parametrize(
1443      ("expectations_values", "expected_count"),
1444      [
1445          ([None, {}], 2),
1446          ([float("nan")], 1),
1447          ([{"expected": "value"}, None, {}], 3),
1448      ],
1449  )
1450  def test_evaluate_handles_empty_expectations(expectations_values, expected_count):
1451      data = [
1452          {
1453              "inputs": {"question": f"Q{i}"},
1454              "outputs": f"A{i}",
1455              "expectations": exp,
1456          }
1457          for i, exp in enumerate(expectations_values)
1458      ]
1459  
1460      result = mlflow.genai.evaluate(data=data, scorers=[always_pass])
1461  
1462      assert result is not None
1463      assert result.metrics is not None
1464      assert result.result_df is not None
1465      assert len(result.result_df) == expected_count
1466      assert result.metrics["always_pass/mean"] == 1.0
1467  
1468  
1469  from mlflow.genai.simulators.simulator import BaseSimulatedUserAgent  # base class for MockUserAgent below
1470  
1471  
1472  class MockUserAgent(BaseSimulatedUserAgent):
1473      def __init__(self, **kwargs):
1474          pass
1475  
1476      def generate_message(self, context):
1477          if context.turn == 0:
1478              return f"Hello, I want to {context.goal}"
1479          return "[GOAL ACHIEVED]"
1480  
1481  
1482  def test_evaluate_with_simulator_creates_single_run(tmp_path):
1483      mlflow.set_tracking_uri(f"sqlite:///{tmp_path}/mlflow.db")
1484      mlflow.set_experiment("test-experiment")
1485  
1486      simulator = ConversationSimulator(
1487          test_cases=[{"goal": "get help"}],
1488          max_turns=2,
1489          user_agent_class=MockUserAgent,
1490      )
1491  
1492      def mock_predict_fn(input: list[dict[str, Any]], **kwargs):
1493          return {"content": "Response"}
1494  
1495      with mock.patch(
1496          "mlflow.genai.simulators.simulator.invoke_model_without_tracing",
1497          return_value='{"rationale": "Goal achieved!", "result": "yes"}',
1498      ):
1499          mlflow.genai.evaluate(data=simulator, predict_fn=mock_predict_fn, scorers=[])
1500  
1501      runs = mlflow.search_runs()
1502      assert len(runs) == 1
1503  
1504  
1505  def test_evaluate_with_simulator_within_parent_run(tmp_path):
1506      mlflow.set_tracking_uri(f"sqlite:///{tmp_path}/mlflow.db")
1507      mlflow.set_experiment("test-experiment")
1508  
1509      simulator = ConversationSimulator(
1510          test_cases=[{"goal": "get help"}],
1511          max_turns=2,
1512          user_agent_class=MockUserAgent,
1513      )
1514  
1515      def mock_predict_fn(input: list[dict[str, Any]], **kwargs):
1516          return {"content": "Response"}
1517  
1518      with mock.patch(
1519          "mlflow.genai.simulators.simulator.invoke_model_without_tracing",
1520          return_value='{"rationale": "Goal achieved!", "result": "yes"}',
1521      ):
1522          with mlflow.start_run(run_name="parent-run") as parent_run:
1523              parent_run_id = parent_run.info.run_id
1524              mlflow.genai.evaluate(data=simulator, predict_fn=mock_predict_fn, scorers=[])
1525              assert mlflow.active_run().info.run_id == parent_run_id
1526  
1527      runs = mlflow.search_runs()
1528      assert len(runs) == 1
1529      assert runs.iloc[0]["tags.mlflow.runName"] == "parent-run"
1530  
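      # The two tests above rely on an assumed run-management contract: mlflow.genai.evaluate()
      # starts its own run only when no run is active, and otherwise logs into the already-active
      # run. Roughly:
      #
      #     with mlflow.start_run(run_name="parent-run"):
      #         mlflow.genai.evaluate(...)  # reuses the parent run; no extra run is created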
1531  
1532  # ===================== Rate Limiting & Pipelining Tests =====================
1533  
1534  
1535  def test_predict_rate_limiter_is_wired_to_predict_fn(monkeypatch):
1536      monkeypatch.setenv("MLFLOW_GENAI_EVAL_PREDICT_RATE_LIMIT", "100")
1537  
1538      with mock.patch.object(
1539          RPSRateLimiter, "acquire", autospec=True, side_effect=lambda self: None
1540      ) as mock_acquire:
1541          data = [{"inputs": {"q": f"Q{i}"}} for i in range(5)]
1542          mlflow.genai.evaluate(data=data, predict_fn=lambda q: "answer", scorers=[])
1543          assert mock_acquire.call_count == 5
1544  
1545  
1546  def test_scorer_rate_limiter_is_wired_to_scorers(monkeypatch):
1547      monkeypatch.setenv("MLFLOW_GENAI_EVAL_SCORER_RATE_LIMIT", "100")
1548  
1549      with mock.patch.object(
1550          RPSRateLimiter, "acquire", autospec=True, side_effect=lambda self: None
1551      ) as mock_acquire:
1552          data = [{"inputs": {"q": f"Q{i}"}, "outputs": "a"} for i in range(3)]
1553          mlflow.genai.evaluate(data=data, scorers=[always_pass, always_pass])
1554          # 3 items x 2 scorers = 6 scorer acquire calls (no predict_fn → no predict acquires)
1555          assert mock_acquire.call_count == 6
1556  
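      # A hedged sketch of the wiring the two tests above assume (not the actual harness code):
      # one RPSRateLimiter guards predict_fn calls and another guards scorer calls, with
      # acquire() invoked once per call, e.g.
      #
      #     predict_limiter.acquire()        # once per row, only when predict_fn is given
      #     outputs = predict_fn(**row["inputs"])
      #     for s in scorers:
      #         scorer_limiter.acquire()     # once per (row, scorer) pair
      #         s(outputs=outputs)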
1557  
1558  def test_pipelining_scores_while_predicts_pending(monkeypatch):
1559      monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_WORKERS", "2")
1560  
1561      # Gate that blocks predicts after the first batch completes
1562      gate = threading.Event()
1563      predict_call_count = 0
1564      predict_lock = threading.Lock()
1565      scoring_started_while_predicts_pending = threading.Event()
1566  
1567      def gated_predict_fn(q):
1568          nonlocal predict_call_count
1569          with predict_lock:
1570              predict_call_count += 1
1571              call_num = predict_call_count
1572          # Let the first 2 predictions through immediately and block the rest
1573          if call_num > 2:
1574              gate.wait(timeout=5)
1575          return "answer"
1576  
1577      @scorer
1578      def signaling_scorer(outputs):
1579          # If we're scoring while predicts are still pending, signal success
1580          with predict_lock:
1581              current_predicts = predict_call_count
1582          if (
1583              current_predicts > 0 and current_predicts < 6
1584          ):  # 6 total items, so some must still be pending
1585              scoring_started_while_predicts_pending.set()
1586          return True
1587  
1588      data = [{"inputs": {"q": f"Q{i}"}} for i in range(6)]
1589  
1590      try:
1591          # Run evaluate in a background thread so we can release the gate
1592          result_holder = []
1593  
1594          def run_eval():
1595              result = mlflow.genai.evaluate(
1596                  data=data, predict_fn=gated_predict_fn, scorers=[signaling_scorer]
1597              )
1598              result_holder.append(result)
1599  
1600          eval_thread = threading.Thread(name="test-evaluation-eval", target=run_eval)
1601          eval_thread.start()
1602  
1603          # Wait for scoring to signal it started while predicts are pending
1604          signaled = scoring_started_while_predicts_pending.wait(timeout=10)
1605  
1606          # Release all blocked predicts
1607          gate.set()
1608          eval_thread.join(timeout=30)
1609  
1610          assert signaled
1611      finally:
1612          gate.set()  # Ensure we don't leave threads blocked
1613  
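      # A self-contained sketch of the pipelining pattern the test above exercises
      # (hypothetical helper, not the harness code): scoring is submitted as soon as each
      # prediction completes instead of waiting for the whole predict pass to finish.
      def _pipelined_eval_sketch(items, predict, score, max_workers=2):
          from concurrent.futures import as_completed

          with ThreadPoolExecutor(max_workers=max_workers) as predict_pool:
              with ThreadPoolExecutor(max_workers=max_workers) as scorer_pool:
                  predict_futures = [predict_pool.submit(predict, item) for item in items]
                  score_futures = []
                  for fut in as_completed(predict_futures):
                      # Scoring starts here even though other predictions may still be pending.
                      score_futures.append(scorer_pool.submit(score, fut.result()))
                  return [f.result() for f in score_futures]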
1614  
1615  def test_backpressure_limits_in_flight_items(monkeypatch):
1616      workers = 2
1617      monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_WORKERS", str(workers))
1618      monkeypatch.setenv("MLFLOW_GENAI_EVAL_PREDICT_RATE_LIMIT", "0")
1619      monkeypatch.setenv("MLFLOW_GENAI_EVAL_SCORER_RATE_LIMIT", "0")
1620      # Skip pre-flight predict_fn call that runs outside the backpressure semaphore
1621      monkeypatch.setenv("MLFLOW_GENAI_EVAL_SKIP_TRACE_VALIDATION", "true")
1622      buffer = backpressure_buffer(workers)
1623  
1624      max_in_flight = 0
1625      in_flight = 0
1626      lock = threading.Lock()
1627      predict_done = threading.Semaphore(0)
1628      score_gate = threading.Event()
1629  
1630      def tracking_predict(q):
1631          nonlocal in_flight, max_in_flight
1632          with lock:
1633              in_flight += 1
1634              max_in_flight = max(max_in_flight, in_flight)
1635          predict_done.release()
1636          return "answer"
1637  
1638      @scorer
1639      def blocking_scorer(outputs):
1640          score_gate.wait()
1641          nonlocal in_flight
1642          with lock:
1643              in_flight -= 1
1644          return True
1645  
1646      num_items = 3 * buffer
1647      data = [{"inputs": {"q": f"Q{i}"}} for i in range(num_items)]
1648  
1649      eval_thread = threading.Thread(
1650          name="test-evaluation-blocking",
1651          target=lambda: mlflow.genai.evaluate(
1652              data=data, predict_fn=tracking_predict, scorers=[blocking_scorer]
1653          ),
1654      )
1655  
1656      try:
1657          eval_thread.start()
1658  
1659          # Wait for exactly `buffer` predicts to complete. The semaphore guarantees
1660          # that each acquire() returns only after a predict has run. After this, the
1661          # submit thread is blocked on the backpressure semaphore and no more predicts
1662          # can run (scorers are blocked on score_gate).
1663          for _ in range(buffer):
1664              predict_done.acquire()
1665  
1666          with lock:
1667              observed_max = max_in_flight
1668      finally:
1669          score_gate.set()
1670          eval_thread.join()
1671  
1672      # The semaphore bounds the number of in-flight items. Without backpressure,
1673      # all num_items would pile up at once.
1674      assert observed_max == buffer
1675  
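      # A minimal sketch of the backpressure idea the test above verifies (assumed semantics,
      # not the harness code): a semaphore sized by backpressure_buffer(workers) is acquired
      # before submitting each item and released only after the item is fully scored, so at
      # most `buffer` items are in flight at once.
      def _bounded_submit_sketch(items, process, buffer):
          sem = threading.Semaphore(buffer)

          def _run(item):
              try:
                  process(item)
              finally:
                  sem.release()

          threads = []
          for item in items:
              sem.acquire()  # blocks once `buffer` items are already in flight
              t = threading.Thread(target=_run, args=(item,))
              t.start()
              threads.append(t)
          for t in threads:
              t.join()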
1676  
1677  def test_evaluate_logs_scorer_failure_summary():
1678      @scorer
1679      def failing_scorer(inputs, outputs):
1680          raise ValueError("Model endpoint not found")
1681  
1682      @scorer
1683      def working_scorer(inputs, outputs):
1684          return True
1685  
1686      data = [
1687          {
1688              "inputs": {"question": "What is MLflow?"},
1689              "outputs": "MLflow is a platform",
1690          },
1691          {
1692              "inputs": {"question": "What is Python?"},
1693              "outputs": "Python is a language",
1694          },
1695      ]
1696  
1697      with mock.patch("mlflow.genai.evaluation.harness._logger.warning") as mock_warning:
1698          result = mlflow.genai.evaluate(
1699              data=data,
1700              scorers=[failing_scorer, working_scorer],
1701          )
1702  
1703          # Evaluation should complete without raising an exception
1704          assert "working_scorer/mean" in result.metrics
1705  
1706          # Verify that a warning was logged with the failure summary
1707          warning_calls = [call.args[0] for call in mock_warning.call_args_list]
1708          failure_warnings = [
1709              msg
1710              for msg in warning_calls
1711              if "Some scorer invocations failed during evaluation" in msg
1712          ]
1713          warning_message = failure_warnings[0]
1714          assert "'failing_scorer': 2/2 failed" in warning_message
1715          assert "Check individual trace assessments for detailed error messages" in warning_message
1716  
1717  
1718  def test_evaluate_no_warning_when_all_scorers_succeed():
1719      @scorer
1720      def working_scorer_1(inputs, outputs):
1721          return True
1722  
1723      @scorer
1724      def working_scorer_2(inputs, outputs):
1725          return False
1726  
1727      data = [
1728          {
1729              "inputs": {"question": "What is MLflow?"},
1730              "outputs": "MLflow is a platform",
1731          },
1732      ]
1733  
1734      with mock.patch("mlflow.genai.evaluation.harness._logger.warning") as mock_warning:
1735          result = mlflow.genai.evaluate(
1736              data=data,
1737              scorers=[working_scorer_1, working_scorer_2],
1738          )
1739  
1740          assert "working_scorer_1/mean" in result.metrics
1741          assert "working_scorer_2/mean" in result.metrics
1742          warning_calls = [call.args[0] for call in mock_warning.call_args_list]
1743          assert not any(
1744              "Some scorer invocations failed during evaluation" in msg for msg in warning_calls
1745          )
1746  
1747  
1748  def test_evaluate_logs_scorer_failure_summary_with_multi_turn_scorers():
1749  
1750      @mlflow.trace
1751      def model(question, session_id):
1752          mlflow.update_current_trace(metadata={"mlflow.trace.session": session_id})
1753          return f"Answer to {question}"
1754  
1755      model("Q1", session_id="session_1")
1756      trace_1 = mlflow.get_trace(mlflow.get_last_active_trace_id())
1757  
1758      model("Q2", session_id="session_1")
1759      trace_2 = mlflow.get_trace(mlflow.get_last_active_trace_id())
1760  
1761      model("Q3", session_id="session_2")
1762      trace_3 = mlflow.get_trace(mlflow.get_last_active_trace_id())
1763  
1764      dataset = create_dataset(name="multi_turn_failure_test")
1765      dataset.merge_records([trace_1, trace_2, trace_3])
1766  
1767      with mock.patch("mlflow.genai.evaluation.harness._logger.warning") as mock_warning:
1768          result = mlflow.genai.evaluate(
1769              data=dataset,
1770              scorers=[FailingSessionScorer(), WorkingSessionScorer()],
1771          )
1772  
1773          assert "working_session_scorer/mean" in result.metrics
1774  
1775          warning_calls = [call.args[0] for call in mock_warning.call_args_list]
1776          failure_warnings = [
1777              msg
1778              for msg in warning_calls
1779              if "Some scorer invocations failed during evaluation" in msg
1780          ]
1781          assert len(failure_warnings) > 0
1782          warning_message = failure_warnings[0]
1783          assert "'failing_session_scorer': 2/2 failed" in warning_message
1784          assert "Check individual trace assessments for detailed error messages" in warning_message
1785  
1786  
1787  def test_evaluate_logs_scorer_failure_summary_with_mixed_scorers():
1788      @scorer
1789      def failing_single_turn(inputs, outputs):
1790          raise ValueError("Single turn scorer error")
1791  
1792      @mlflow.trace
1793      def model(question, session_id):
1794          mlflow.update_current_trace(metadata={"mlflow.trace.session": session_id})
1795          return f"Answer to {question}"
1796  
1797      model("Q1", session_id="session_1")
1798      trace_1 = mlflow.get_trace(mlflow.get_last_active_trace_id())
1799  
1800      model("Q2", session_id="session_1")
1801      trace_2 = mlflow.get_trace(mlflow.get_last_active_trace_id())
1802  
1803      model("Q3", session_id="session_2")
1804      trace_3 = mlflow.get_trace(mlflow.get_last_active_trace_id())
1805  
1806      model("Q4", session_id="session_2")
1807      trace_4 = mlflow.get_trace(mlflow.get_last_active_trace_id())
1808  
1809      # Create dataset from traces
1810      dataset = create_dataset(name="mixed_scorer_failure_test")
1811      dataset.merge_records([trace_1, trace_2, trace_3, trace_4])
1812  
1813      with mock.patch("mlflow.genai.evaluation.harness._logger.warning") as mock_warning:
1814          result = mlflow.genai.evaluate(
1815              data=dataset,
1816              scorers=[
1817                  failing_single_turn,
1818                  always_pass,
1819                  FailingSessionScorer(),
1820                  WorkingSessionScorer(),
1821              ],
1822          )
1823  
1824          assert "always_pass/mean" in result.metrics
1825          assert "working_session_scorer/mean" in result.metrics
1826  
1827          warning_calls = [call.args[0] for call in mock_warning.call_args_list]
1828          failure_warnings = [
1829              msg
1830              for msg in warning_calls
1831              if "Some scorer invocations failed during evaluation" in msg
1832          ]
1833          assert len(failure_warnings) > 0
1834          warning_message = failure_warnings[0]
1835          assert "'failing_single_turn': 4/4 failed" in warning_message
1836          assert "'failing_session_scorer': 2/2 failed" in warning_message
1837          assert "Check individual trace assessments for detailed error messages" in warning_message
1838  
1839  
1840  def test_no_rate_limit_backward_compat(monkeypatch):
1841      monkeypatch.setenv("MLFLOW_GENAI_EVAL_PREDICT_RATE_LIMIT", "0")
1842      monkeypatch.setenv("MLFLOW_GENAI_EVAL_SCORER_RATE_LIMIT", "0")
1843      data = [
1844          {
1845              "inputs": {"question": "What is MLflow?"},
1846              "outputs": "MLflow is a tool for ML",
1847              "expectations": {"expected_response": "MLflow is a tool for ML", "max_length": 100},
1848          },
1849      ]
1850  
1851      result = mlflow.genai.evaluate(data=data, scorers=[exact_match, is_concise])
1852  
1853      assert result.metrics["exact_match/mean"] == 1.0
1854      assert result.metrics["is_concise/mean"] == 1.0
1855  
1856  
1857  # ===================== Retry & Adaptive Rate Limiting Tests =====================
1858  
1859  
1860  def test_predict_retries_on_429(monkeypatch):
1861      monkeypatch.setenv("MLFLOW_GENAI_EVAL_PREDICT_RATE_LIMIT", "0")
1862      monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_RETRIES", "3")
1863  
1864      attempts = []
1865  
1866      def flaky_predict(q):
1867          attempts.append(1)
1868          # First call is the pre-flight validation check — let it pass.
1869          # Fail on calls 2 and 3, succeed on call 4.
1870          if 1 < len(attempts) < 4:
1871              raise Exception("429 Too Many Requests")
1872          return "answer"
1873  
1874      data = [{"inputs": {"q": "Q1"}}]
1875      result = mlflow.genai.evaluate(data=data, predict_fn=flaky_predict, scorers=[always_pass])
1876  
1877      # 1 validation + 1 fail + 1 fail + 1 success = 4 total
1878      assert len(attempts) == 4
1879      assert result.metrics["always_pass/mean"] == 1.0
1880  
1881  
1882  def test_scorer_retries_on_429(monkeypatch):
1883      monkeypatch.setenv("MLFLOW_GENAI_EVAL_PREDICT_RATE_LIMIT", "0")
1884      monkeypatch.setenv("MLFLOW_GENAI_EVAL_SCORER_RATE_LIMIT", "0")
1885      monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_RETRIES", "3")
1886  
1887      attempts = []
1888  
1889      @scorer
1890      def flaky_scorer(outputs):
1891          attempts.append(1)
1892          if len(attempts) < 3:
1893              raise Exception("429 Too Many Requests")
1894          return True
1895  
1896      data = [{"inputs": {"q": "Q1"}, "outputs": "a"}]
1897      result = mlflow.genai.evaluate(data=data, scorers=[flaky_scorer])
1898  
1899      assert len(attempts) == 3
1900      assert result.metrics["flaky_scorer/mean"] == 1.0
1901  
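      # A sketch of the retry contract the two tests above rely on (assumption: up to
      # MLFLOW_GENAI_EVAL_MAX_RETRIES retries when a call raises an error whose message
      # looks like throttling, e.g. contains "429"). Illustration only, not the harness
      # implementation.
      def _retry_on_429_sketch(fn, max_retries):
          for attempt in range(max_retries + 1):
              try:
                  return fn()
              except Exception as e:
                  if "429" not in str(e) or attempt == max_retries:
                      raise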
1902  
1903  def test_adaptive_rate_reduces_on_429(monkeypatch):
1904      monkeypatch.setenv("MLFLOW_GENAI_EVAL_PREDICT_RATE_LIMIT", "auto")
1905      monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_RETRIES", "3")
1906  
1907      rate_after_throttle = []
1908  
1909      original_report_throttle = RPSRateLimiter.report_throttle
1910  
1911      def spy_report_throttle(self):
1912          original_report_throttle(self)
1913          rate_after_throttle.append(self._rps)
1914  
1915      attempts = []
1916  
1917      def flaky_predict(q):
1918          attempts.append(1)
1919          # First call is the pre-flight validation check — let it pass.
1920          # Fail on call 2 to trigger a throttle event.
1921          if len(attempts) == 2:
1922              raise Exception("429 Too Many Requests")
1923          return "answer"
1924  
1925      data = [{"inputs": {"q": f"Q{i}"}} for i in range(3)]
1926  
1927      with mock.patch.object(RPSRateLimiter, "report_throttle", spy_report_throttle):
1928          result = mlflow.genai.evaluate(data=data, predict_fn=flaky_predict, scorers=[always_pass])
1929  
1930      assert result.metrics["always_pass/mean"] == 1.0
1931      # At least one throttle event observed, and the rate was reduced
1932      assert len(rate_after_throttle) >= 1
1933      assert rate_after_throttle[0] < AUTO_INITIAL_RPS
1934  
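      # The test above only asserts that report_throttle() lowers the limiter's rate below
      # AUTO_INITIAL_RPS. A minimal sketch of such adaptive behaviour (assumed, not the real
      # RPSRateLimiter internals) would be:
      #
      #     def report_throttle(self):
      #         self._rps = max(self._rps * 0.5, 1.0)  # halve the rate, with a floor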
1935  
1936  @pytest.mark.parametrize(
1937      ("trace_or_none", "run_id"),
1938      [
1939          (None, "run-1"),
1940          (
1941              Trace(
1942                  info=create_test_trace_info_with_uc_table(
1943                      trace_id="tr-uc", catalog_name="catalog", schema_name="schema"
1944                  ),
1945                  data=TraceData(spans=[]),
1946              ),
1947              "run-1",
1948          ),
1949      ],
1950      ids=["none_trace", "uc_schema_trace"],
1951  )
1952  def test_should_clone_trace_returns_false_early(trace_or_none, run_id):
1953      assert _should_clone_trace(trace_or_none, run_id=run_id) is False
1954  
1955  
1956  @pytest.mark.parametrize(
1957      ("experiment_id", "expected"),
1958      [
1959          ("exp-999", True),
1960          ("exp-123", False),
1961      ],
1962      ids=["different_experiment", "matching_experiment"],
1963  )
1964  def test_should_clone_trace_with_explicit_experiment_id(
1965      mlflow_experiment_trace, experiment_id, expected
1966  ):
1967      with mock.patch(
1968          "mlflow.genai.evaluation.harness._does_store_support_trace_linking",
1969          return_value=True,
1970      ) as mock_store:
1971          result = _should_clone_trace(
1972              mlflow_experiment_trace, run_id="run-1", experiment_id=experiment_id
1973          )
1974      assert result is expected
1975      if expected is False:
1976          mock_store.assert_called_once()
1977  
1978  
1979  def test_should_clone_trace_falls_back_to_get_experiment_id_when_none(mlflow_experiment_trace):
1980      with (
1981          mock.patch(
1982              "mlflow.tracking.fluent._get_experiment_id",
1983              return_value="exp-999",
1984          ) as mock_get_exp,
1985          mock.patch(
1986              "mlflow.genai.evaluation.harness._does_store_support_trace_linking",
1987              return_value=True,
1988          ),
1989      ):
1990          result = _should_clone_trace(mlflow_experiment_trace, run_id="run-1", experiment_id=None)
1991          mock_get_exp.assert_called_once()
1992      assert result is True
1993  
1994  
1995  def test_should_clone_trace_does_not_call_get_experiment_id_when_provided(mlflow_experiment_trace):
1996      with (
1997          mock.patch("mlflow.tracking.fluent._get_experiment_id") as mock_get_exp,
1998          mock.patch(
1999              "mlflow.genai.evaluation.harness._does_store_support_trace_linking",
2000              return_value=True,
2001          ),
2002      ):
2003          _should_clone_trace(mlflow_experiment_trace, run_id="run-1", experiment_id="exp-123")
2004          mock_get_exp.assert_not_called()
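

      # Taken together, the tests above pin down an assumed decision rule for
      # _should_clone_trace (sketch only; the UC-table helper name below is hypothetical,
      # and untested branches may differ in the real implementation):
      #
      #     def _should_clone_trace(trace, run_id, experiment_id=None):
      #         if trace is None or _is_uc_table_trace(trace):  # hypothetical helper
      #             return False
      #         experiment_id = experiment_id or _get_experiment_id()
      #         if trace.info.experiment_id == experiment_id:
      #             return not _does_store_support_trace_linking(...)  # assumed
      #         return True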