test_evaluation.py
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal
from unittest import mock
from unittest.mock import ANY, MagicMock

import pandas as pd
import pytest

import mlflow
from mlflow.entities.assessment import Expectation, Feedback
from mlflow.entities.assessment_source import AssessmentSource, AssessmentSourceType
from mlflow.entities.span import SpanType
from mlflow.entities.trace import Trace
from mlflow.entities.trace_data import TraceData
from mlflow.exceptions import MlflowException
from mlflow.genai.datasets import EvaluationDataset, create_dataset
from mlflow.genai.evaluation.entities import EvaluationResult
from mlflow.genai.evaluation.harness import (
    AUTO_INITIAL_RPS,
    _should_clone_trace,
    backpressure_buffer,
)
from mlflow.genai.evaluation.rate_limiter import RPSRateLimiter
from mlflow.genai.scorers.base import scorer
from mlflow.genai.scorers.builtin_scorers import RelevanceToQuery
from mlflow.genai.simulators import ConversationSimulator
from mlflow.server import handlers
from mlflow.server.fastapi_app import app
from mlflow.server.handlers import initialize_backend_stores
from mlflow.tracing.constant import AssessmentMetadataKey, TraceMetadataKey

from tests.helper_functions import get_safe_port
from tests.tracing.helper import (
    create_test_trace_info,
    create_test_trace_info_with_uc_table,
    get_traces,
)
from tests.tracking.integration_test_utils import ServerThread


@pytest.fixture
def mlflow_experiment_trace():
    return Trace(
        info=create_test_trace_info(trace_id="tr-123", experiment_id="exp-123"),
        data=TraceData(spans=[]),
    )


_DUMMY_CHAT_RESPONSE = {
    "id": "1",
    "object": "text_completion",
    "created": "2021-10-01T00:00:00.000000Z",
    "model": "gpt-4o-mini",
    "choices": [
        {
            "index": 0,
            "message": {
                "content": "This is a response",
                "role": "assistant",
            },
            "finish_reason": "length",
        }
    ],
    "usage": {
        "prompt_tokens": 1,
        "completion_tokens": 1,
        "total_tokens": 2,
    },
}


class TestModel:
    def predict(self, question: str) -> str:
        return "I don't know"


@scorer
def exact_match(outputs, expectations):
    return outputs == expectations["expected_response"]


@scorer
def is_concise(outputs, expectations):
    return len(outputs) <= expectations["max_length"]


@scorer
def relevance(inputs, outputs):
    return Feedback(
        name="relevance",
        value="yes",
        rationale="The response is relevant to the question",
        source=AssessmentSource(source_id="gpt", source_type="LLM_JUDGE"),
    )


@scorer
@mlflow.trace(span_type=SpanType.EVALUATOR)
def has_trace(trace):
    return trace is not None


class FailingSessionScorer(mlflow.genai.Scorer):
    def __init__(self):
        super().__init__(name="failing_session_scorer")

    @property
    def is_session_level_scorer(self) -> bool:
        return True

    def __call__(self, session=None, **kwargs):
        raise ValueError("Session scorer error")


class WorkingSessionScorer(mlflow.genai.Scorer):
    def __init__(self):
        super().__init__(name="working_session_scorer")

    @property
    def is_session_level_scorer(self) -> bool:
        return True

    def __call__(self, session=None, **kwargs):
        return len(session or [])

def _validate_assessments(traces):
    """Validate assessments are added to the traces"""
    for trace in traces:
        assert len(trace.info.assessments) == 6, (
            f"Expected 6 assessments, got {len(trace.info.assessments)}."
            f" Assessments: {[a.name for a in trace.info.assessments]}"
        )  # 2 expectations + 4 feedbacks
        assessments = {a.name: a for a in trace.info.assessments}
        a_exact_match = assessments["exact_match"]
        assert isinstance(a_exact_match, Feedback)
        assert a_exact_match.trace_id == trace.info.trace_id
        assert isinstance(a_exact_match.value, bool)
        assert a_exact_match.source.source_type == AssessmentSourceType.CODE
        # Scorer name is used as source_id
        assert a_exact_match.source.source_id == "exact_match"
        assert a_exact_match.metadata[AssessmentMetadataKey.SOURCE_RUN_ID] is not None

        a_is_concise = assessments["is_concise"]
        assert isinstance(a_is_concise, Feedback)
        assert isinstance(a_is_concise.value, bool)
        assert a_is_concise.metadata[AssessmentMetadataKey.SOURCE_RUN_ID] is not None

        a_has_trace = assessments["has_trace"]
        assert isinstance(a_has_trace, Feedback)
        assert a_has_trace.value is True
        assert a_has_trace.metadata[AssessmentMetadataKey.SOURCE_RUN_ID] is not None

        a_relevance = assessments["relevance"]
        assert isinstance(a_relevance, Feedback)
        assert a_relevance.value == "yes"
        assert a_relevance.source.source_id == "gpt"
        assert a_relevance.source.source_type == "LLM_JUDGE"
        assert a_relevance.rationale == "The response is relevant to the question"
        assert a_relevance.metadata[AssessmentMetadataKey.SOURCE_RUN_ID] is not None

        a_expected_response = assessments["expected_response"]
        assert isinstance(a_expected_response, Expectation)
        assert isinstance(a_expected_response.value, str)
        assert a_expected_response.source.source_type == AssessmentSourceType.HUMAN
        assert a_expected_response.source.source_id is not None

        a_max_length = assessments["max_length"]
        assert isinstance(a_max_length, Expectation)
        assert isinstance(a_max_length.value, (int, float))
        assert a_max_length.source.source_type == AssessmentSourceType.HUMAN


def _validate_eval_result_df(result: EvaluationResult):
    search_traces_df = mlflow.search_traces(run_id=result.run_id)
    assert result.result_df is not None
    assert len(result.result_df) == len(search_traces_df)
    assert set(result.result_df.columns) >= set(search_traces_df.columns)

    actual = result.result_df.sort_values(by="trace_id").reset_index(drop=True)
    expected = search_traces_df.sort_values(by="trace_id").reset_index(drop=True)
    for i in range(len(actual)):
        assert actual.iloc[i].trace_id == expected.iloc[i].trace_id
        assert actual.iloc[i].spans == expected.iloc[i].spans
        assert actual.iloc[i].assessments == expected.iloc[i].assessments
        assert actual.iloc[i]["exact_match/value"] is not None
        assert actual.iloc[i]["is_concise/value"] is not None
        assert actual.iloc[i]["relevance/value"] is not None
        assert actual.iloc[i]["has_trace/value"] is not None
        assert actual.iloc[i]["expected_response/value"] is not None
        assert actual.iloc[i]["max_length/value"] is not None

    # backwards compatibility
    assert len(result.tables["eval_results"]) == len(result.result_df)


@dataclass
class ServerConfig:
    host_type: Literal["local", "remote", "databricks"]
    backend_type: Literal["file", "sqlalchemy"] | None = None

# Test with different server configurations
# 1. local file backend
# 2. local sqlalchemy backend
# 3. remote server running on file backend
# 4. remote server running on sqlalchemy backend
@pytest.fixture(
    params=[
        ServerConfig(host_type="local", backend_type="file"),
        ServerConfig(host_type="local", backend_type="sqlalchemy"),
        ServerConfig(host_type="remote", backend_type="file"),
        ServerConfig(host_type="remote", backend_type="sqlalchemy"),
    ],
    ids=["local_file", "local_sqlalchemy", "remote_file", "remote_sqlalchemy"],
)
def server_config(request, tmp_path: Path, db_uri: str):
    """Configures the MLflow tracking backend (local store or local tracking server) for each test."""
    config = request.param
    if config.backend_type == "file":
        pytest.skip("FileStore is no longer supported.")

    match config.backend_type:
        case "file":
            backend_uri = tmp_path.joinpath("file").as_uri()
        case "sqlalchemy":
            backend_uri = db_uri

    match config.host_type:
        case "local":
            mlflow.set_tracking_uri(backend_uri)
            yield config

        case "remote":
            # Force-reset backend stores before each test.
            handlers._tracking_store = None
            handlers._model_registry_store = None
            initialize_backend_stores(backend_uri, default_artifact_root=tmp_path.as_uri())

            with ServerThread(app, get_safe_port()) as url:
                mlflow.set_tracking_uri(url)
                yield config


def test_evaluate_with_static_dataset(server_config):
    data = [
        {
            "inputs": {"question": "What is MLflow?"},
            "outputs": "MLflow is a tool for ML",
            "expectations": {
                "expected_response": "MLflow is a tool for ML",
                "max_length": 100,
            },
        },
        {
            "inputs": {"question": "What is Spark?"},
            "outputs": "Spark is a fast data processing engine",
            "expectations": {
                "expected_response": "Spark is a fast data processing engine",
                "max_length": 1,
            },
        },
    ]

    result = mlflow.genai.evaluate(
        data=data,
        scorers=[exact_match, is_concise, relevance, has_trace],
    )

    # Validate aggregated metrics
    metrics = result.metrics
    assert metrics["exact_match/mean"] == 1.0
    assert metrics["is_concise/mean"] == 0.5
    assert metrics["relevance/mean"] == 1.0
    assert metrics["has_trace/mean"] == 1.0

    # Exact number of traces should be generated
    traces = get_traces()
    assert len(traces) == len(data)

    # Traces should be associated with the eval run
    traces = mlflow.search_traces(run_id=result.run_id, return_type="list")
    assert len(traces) == len(data)

    # Re-order traces to match with the order of the input data
    traces = sorted(traces, key=lambda t: t.data.spans[0].inputs["question"])

    for i in range(len(traces)):
        assert len(traces[i].data.spans) == 1
        span = traces[i].data.spans[0]
        assert span.name == "root_span"
        assert span.inputs == data[i]["inputs"]
        assert span.outputs == data[i]["outputs"]

    _validate_assessments(traces)
    _validate_eval_result_df(result)

    # Dataset input should be logged to the run
    run = mlflow.get_run(result.run_id)
    assert len(run.inputs.dataset_inputs) == 1
    assert run.inputs.dataset_inputs[0].dataset.name == "dataset"
    assert run.inputs.dataset_inputs[0].dataset.source_type == "code"


@pytest.mark.parametrize("is_predict_fn_traced", [True, False])
def test_evaluate_with_predict_fn(is_predict_fn_traced, server_config):
    model_id = mlflow.set_active_model(name="test-model-id").model_id

    data = [
        {
            "inputs": {"question": "What is MLflow?"},
            "expectations": {
                "expected_response": "MLflow is a tool for ML",
                "max_length": 100,
            },
        },
        {
            "inputs": {"question": "What is Spark?"},
            "expectations": {
                "expected_response": "Spark is a fast data processing engine",
                "max_length": 1,
            },
        },
    ]
    model = TestModel()
    predict_fn = mlflow.trace(model.predict) if is_predict_fn_traced else model.predict

    result = mlflow.genai.evaluate(
        predict_fn=predict_fn,
        data=data,
        scorers=[exact_match, is_concise, relevance, has_trace],
        model_id=model_id,
    )

    metrics = result.metrics
    assert metrics["exact_match/mean"] == 0.0
    assert metrics["is_concise/mean"] == 0.5
    assert metrics["relevance/mean"] == 1.0
    assert metrics["has_trace/mean"] == 1.0

    # Metrics should be logged to the model ID as well
    model = mlflow.get_logged_model(model_id)
    assert metrics == {m.key: m.value for m in model.metrics}

    # Exact number of traces should be generated
    traces = get_traces()
    assert len(traces) == len(data)

    # Traces should be associated with the eval run
    traces = mlflow.search_traces(run_id=result.run_id, return_type="list")
    assert len(traces) == len(data)

    # Re-order traces to match with the order of the input data
    traces = sorted(traces, key=lambda t: t.data.spans[0].inputs["question"])

    # Check if the model_id is set in the traces
    assert traces[0].info.trace_metadata[TraceMetadataKey.MODEL_ID] == model_id
    assert traces[1].info.trace_metadata[TraceMetadataKey.MODEL_ID] == model_id

    # Validate assessments are added to the traces
    for i in range(len(traces)):
        assert len(traces[i].data.spans) == 1
        span = traces[i].data.spans[0]
        assert span.name == "predict"
        assert span.inputs == data[i]["inputs"]
        assert span.outputs == "I don't know"

    _validate_assessments(traces)
    _validate_eval_result_df(result)


@pytest.mark.parametrize("return_type", ["pandas", "list"])
def test_evaluate_with_traces(monkeypatch: pytest.MonkeyPatch, server_config, return_type):
    questions = ["What is MLflow?", "What is Spark?"]

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(question: str) -> str:
        return TestModel().predict(question)

    predict(questions[0])
    trace_id = mlflow.get_last_active_trace_id()
    mlflow.log_expectation(
        trace_id=trace_id,
        name="expected_response",
        value="MLflow is a tool for ML",
        source=AssessmentSource(source_id="me", source_type="HUMAN"),
    )
    mlflow.log_expectation(
        trace_id=trace_id,
        name="max_length",
        value=100,
        source=AssessmentSource(source_id="me", source_type="HUMAN"),
    )
    predict(questions[1])
    trace_id = mlflow.get_last_active_trace_id()
    mlflow.log_expectation(
        trace_id=trace_id,
        name="expected_response",
        value="Spark is a fast data processing engine",
        source=AssessmentSource(source_id="me", source_type="HUMAN"),
    )
    mlflow.log_expectation(
        trace_id=trace_id,
        name="max_length",
        value=1,
        source=AssessmentSource(source_id="me", source_type="HUMAN"),
    )

    data = mlflow.search_traces(return_type=return_type)
    assert len(data) == len(questions)

    result = mlflow.genai.evaluate(
        data=data,
        scorers=[exact_match, is_concise, relevance, has_trace],
    )

    metrics = result.metrics
    assert metrics["exact_match/mean"] == 0.0
    assert metrics["is_concise/mean"] == 0.5
    assert metrics["relevance/mean"] == 1.0
    assert metrics["has_trace/mean"] == 1.0

    if server_config.backend_type == "sqlalchemy":
        # Assessments should be added to the traces in-place and no new trace should be created
        traces = get_traces()
        assert len(traces) == len(questions)
    else:
        # File store doesn't support trace linking, so each trace will be cloned to the eval run
        assert len(get_traces()) == len(questions) * 2

    # Traces are associated with the eval run
    traces = mlflow.search_traces(run_id=result.run_id, return_type="list")
    assert len(traces) == len(questions)

    # Re-order traces to match with the order of the input data
    traces = sorted(traces, key=lambda t: t.data.spans[0].inputs["question"])

    # Validate assessments are added to the traces
    _validate_assessments(traces)
    _validate_eval_result_df(result)


def test_evaluate_with_managed_dataset(is_in_databricks):
    if is_in_databricks:
        # Databricks path: Use managed dataset with mocks
        class MockDatasetClient:
            def __init__(self):
                # dataset_id -> list of records
                self.records = {}

            def create_dataset(self, uc_table_name: str, experiment_ids: list[str]):
                from databricks.agents.datasets import Dataset

                dataset = Dataset(
                    dataset_id=str(uuid.uuid4()),
                    name=uc_table_name,
                    digest=None,
                    source_type="databricks-uc-table",
                )
                self.records[dataset.dataset_id] = []
                return dataset

            def list_dataset_records(self, dataset_id: str):
                return self.records[dataset_id]

            def batch_create_dataset_records(self, name: str, dataset_id: str, records):
                self.records[dataset_id].extend(records)

            def upsert_dataset_record_expectations(
                self, name: str, dataset_id: str, record_id: str, expectations: list[dict[str, Any]]
            ):
                for record in self.records[dataset_id]:
                    if record.id == record_id:
                        record.expectations.update(expectations)

            def sync_dataset_to_uc(self, dataset_id: str, uc_table_name: str):
                pass

        mock_client = MockDatasetClient()
        with (
            mock.patch("databricks.rag_eval.datasets.api._get_client", return_value=mock_client),
            mock.patch(
                "databricks.rag_eval.datasets.entities._get_client", return_value=mock_client
            ),
            mock.patch("mlflow.genai.datasets.is_databricks_uri", return_value=True),
        ):
            dataset = create_dataset(
                uc_table_name="mlflow.managed.dataset", experiment_id="exp-123"
            )
            dataset.merge_records([
                {
                    "inputs": {"question": "What is MLflow?"},
                    "expectations": {
                        "expected_response": "MLflow is a tool for ML",
                        "max_length": 100,
                    },
                },
                {
                    "inputs": {"question": "What is Spark?"},
                    "expectations": {
                        "expected_response": "Spark is a fast data processing engine",
                        "max_length": 1,
                    },
                },
            ])

            result = mlflow.genai.evaluate(
                data=dataset,
                predict_fn=TestModel().predict,
                scorers=[exact_match, is_concise, relevance, has_trace],
            )
    else:
        dataset = create_dataset(
            name="eval_test_dataset", tags={"source": "test", "version": "1.0"}
        )
        dataset.merge_records([
            {
                "inputs": {"question": "What is MLflow?"},
                "expectations": {
                    "expected_response": "MLflow is a tool for ML",
                    "max_length": 100,
                },
            },
            {
                "inputs": {"question": "What is Spark?"},
                "expectations": {
                    "expected_response": "Spark is a fast data processing engine",
                    "max_length": 1,
                },
            },
        ])

        result = mlflow.genai.evaluate(
            data=dataset,
            predict_fn=TestModel().predict,
            scorers=[exact_match, is_concise, relevance, has_trace],
        )

    metrics = result.metrics
    assert metrics["exact_match/mean"] == 0.0
    assert metrics["is_concise/mean"] == 0.5
    assert metrics["relevance/mean"] == 1.0
    assert metrics["has_trace/mean"] == 1.0

    run = mlflow.get_run(result.run_id)
    # Dataset metadata should be added to the run
    assert len(run.inputs.dataset_inputs) == 1
    assert run.inputs.dataset_inputs[0].dataset.name == dataset.name
    assert run.inputs.dataset_inputs[0].dataset.digest == dataset.digest
    # Check for the correct source_type based on whether we're in Databricks or OSS
    expected_source_type = (
        "databricks-uc-table" if is_in_databricks else "mlflow_evaluation_dataset"
    )
    assert run.inputs.dataset_inputs[0].dataset.source_type == expected_source_type

    # Traces are associated with the eval run
    traces = mlflow.search_traces(run_id=result.run_id, return_type="list")
    assert len(traces) == 2

    _validate_assessments(traces)
    _validate_eval_result_df(result)


def test_evaluate_with_managed_dataset_from_searched_traces():
    for i in range(3):
        with mlflow.start_span(name=f"qa_span_{i}") as span:
            question = f"What is item {i}?"
            span.set_inputs({"question": question})
            span.set_outputs({"answer": f"Item {i} is something"})

        mlflow.log_expectation(
            trace_id=span.trace_id,
            name="expected_response",
            value=f"Item {i} is a detailed answer",
        )
        mlflow.log_expectation(
            trace_id=span.trace_id,
            name="max_length",
            value=50 if i % 2 == 0 else 10,
        )

    traces_df = mlflow.search_traces()

    dataset = create_dataset(
        name="traces_eval_dataset", tags={"source": "traces", "evaluation": "test"}
    )
    dataset.merge_records(traces_df)

    result = mlflow.genai.evaluate(
        data=dataset,
        predict_fn=TestModel().predict,
        scorers=[exact_match, is_concise, has_trace],
    )

    metrics = result.metrics
    assert "exact_match/mean" in metrics
    assert "is_concise/mean" in metrics
    assert "has_trace/mean" in metrics
    assert metrics["has_trace/mean"] == 1.0


def test_model_from_deployment_endpoint(is_in_databricks):
    with mock.patch("mlflow.deployments.get_deploy_client") as mock_get_deploy_client:
        mock_client = mock_get_deploy_client.return_value
        mock_client.predict.return_value = _DUMMY_CHAT_RESPONSE

        data = [
            {
                "inputs": {
                    "messages": [
                        {"content": "You are a helpful assistant.", "role": "system"},
                        {"content": "What is Spark?", "role": "user"},
                    ],
                    "max_tokens": 10,
                }
            },
            {
                "inputs": {
                    "messages": [
                        {"content": "What is MLflow?", "role": "user"},
                    ]
                }
            },
        ]
        predict_fn = mlflow.genai.to_predict_fn("endpoints:/chat")
        result = mlflow.genai.evaluate(
            data=data,
            predict_fn=predict_fn,
            scorers=[has_trace],
        )

        databricks_options = {"databricks_options": {"return_trace": True}}
        mock_client.predict.assert_has_calls(
            [
                # Test call to check if the function is traced or not
                mock.call(endpoint="chat", inputs={**data[0]["inputs"], **databricks_options}),
                # First evaluation call
                mock.call(endpoint="chat", inputs={**data[0]["inputs"], **databricks_options}),
                # Second evaluation call
                mock.call(endpoint="chat", inputs={**data[1]["inputs"], **databricks_options}),
            ],
            any_order=True,
        )

    # Validate traces
    traces = mlflow.search_traces(run_id=result.run_id, return_type="list")

    assert len(traces) == 2
    spans = traces[0].data.spans
    assert len(spans) == 1
    assert spans[0].name == "predict"
    # Eval harness runs prediction in parallel, so the order is not deterministic
    assert spans[0].inputs in (data[0]["inputs"], data[1]["inputs"])
    assert spans[0].outputs == _DUMMY_CHAT_RESPONSE


def test_missing_scorers_argument():
    with pytest.raises(TypeError, match=r"evaluate\(\) missing 1 required positional"):
        mlflow.genai.evaluate(data=[{"inputs": "Hello", "outputs": "Hi"}])


def test_empty_scorers_allowed():
    mock_result = EvaluationResult(run_id="test-run", metrics={}, result_df=pd.DataFrame())

    data = [{"inputs": {"question": "What is MLflow?"}, "outputs": "MLflow is an ML platform"}]

    with mock.patch("mlflow.genai.evaluation.base._run_harness") as mock_evaluate_oss:
        mock_evaluate_oss.return_value = (mock_result, {})
        result = mlflow.genai.evaluate(data=data, scorers=[])

    assert result is mock_result
    mock_evaluate_oss.assert_called_once()


@pytest.mark.parametrize("pass_full_dataframe", [True, False])
def test_trace_input_can_contain_string_input(pass_full_dataframe, is_in_databricks):
    """
    The `inputs` column must be a dictionary when a static dataset is provided.
    However, when traces are provided, the value is not validated and the
    harness handles non-dict inputs gracefully.
    """
    with mlflow.start_span() as span:
        span.set_inputs("What is MLflow?")
        span.set_outputs("MLflow is a tool for ML")

    traces = mlflow.search_traces()
    if not pass_full_dataframe:
        traces = traces[["trace"]]

    # Harness should run without an error
    mlflow.genai.evaluate(data=traces, scorers=[RelevanceToQuery()])


def test_max_workers_env_var(monkeypatch):
    # Disable rate limits so auto-derivation doesn't override the default
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_PREDICT_RATE_LIMIT", "0")
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_SCORER_RATE_LIMIT", "0")

    def _validate_max_workers(expected_max_workers):
        with mock.patch(
            "mlflow.genai.evaluation.harness.ThreadPoolExecutor", wraps=ThreadPoolExecutor
        ) as mock_executor:
            mlflow.genai.evaluate(
                data=[
                    {
                        "inputs": {"question": "What is MLflow?"},
                        "outputs": "MLflow is a tool for ML",
                    }
                ],
                scorers=[RelevanceToQuery()],
            )
        # ThreadPoolExecutor is called twice in OSS (harness + scorers)
        first_call = mock_executor.call_args_list[0]
        assert first_call[1]["max_workers"] == expected_max_workers

    # default workers is 10
    _validate_max_workers(10)

    # override workers with env var
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_WORKERS", "20")
    _validate_max_workers(20)

    # legacy env var for backward compatibility
    monkeypatch.delenv("MLFLOW_GENAI_EVAL_MAX_WORKERS", raising=False)
    monkeypatch.setenv("RAG_EVAL_MAX_WORKERS", "30")
    _validate_max_workers(30)


def test_dataset_name_is_logged_correctly(is_in_databricks):
    data = pd.DataFrame({
        "inputs": [{"question": "What is MLflow?"}],
        "outputs": ["MLflow is a tool for ML"],
    })

    with mlflow.start_run() as run:
        mlflow.genai.evaluate(
            data=data,
            scorers=[RelevanceToQuery()],
        )

    if not is_in_databricks:
        run_data = mlflow.get_run(run.info.run_id)
        assert run_data.inputs is not None
        assert run_data.inputs.dataset_inputs is not None
        assert len(run_data.inputs.dataset_inputs) > 0

        dataset_input = run_data.inputs.dataset_inputs[0]
        dataset = dataset_input.dataset
        assert dataset.name == "dataset"


def test_evaluate_with_dataset_preserves_name(is_in_databricks):
    from mlflow.entities import Dataset as DatasetEntity

    data = pd.DataFrame({
        "inputs": [{"question": "What is MLflow?"}],
        "outputs": ["MLflow is a tool for ML"],
    })

    mock_managed_dataset = MagicMock(spec=EvaluationDataset)
    type(mock_managed_dataset).name = mock.PropertyMock(return_value="my_managed_dataset")
    mock_managed_dataset.to_df.return_value = data
    mock_managed_dataset.digest = "test_digest"
    mock_managed_dataset.source = MagicMock()
    mock_managed_dataset.source.to_json.return_value = "{}"
    mock_managed_dataset.source._get_source_type.return_value = "test"
    mock_managed_dataset._to_mlflow_entity.return_value = DatasetEntity(
        name="my_managed_dataset",
        digest="test_digest",
        source_type="test",
        source="{}",
        schema=None,
        profile=None,
    )

    if not is_in_databricks:
        with mlflow.start_run() as run:
            mlflow.genai.evaluate(
                data=data,
                scorers=[RelevanceToQuery()],
            )

        run_data = mlflow.get_run(run.info.run_id)
        dataset_input = run_data.inputs.dataset_inputs[0]
        assert dataset_input.dataset.name == "dataset"

    with mlflow.start_run() as run:
        mlflow.genai.evaluate(
            data=mock_managed_dataset,
            scorers=[RelevanceToQuery()],
        )

    run_data = mlflow.get_run(run.info.run_id)
    dataset_input = run_data.inputs.dataset_inputs[0]
    assert dataset_input.dataset.name == "my_managed_dataset"


def test_evaluate_with_managed_dataset_preserves_name():
    mock_managed_dataset = MagicMock()
    mock_managed_dataset.dataset_id = "d-1234567890abcdef1234567890abcdef"
    mock_managed_dataset.name = "test.evaluation.sample_dataset"
    mock_managed_dataset.digest = "abc123"
    mock_managed_dataset.schema = None
    mock_managed_dataset.profile = None
    mock_managed_dataset.source_type = "databricks-uc-table"
    mock_managed_dataset.create_time = None
    mock_managed_dataset.created_by = None
    mock_managed_dataset.last_update_time = None
    mock_managed_dataset.last_updated_by = None
    mock_managed_dataset.to_df.return_value = pd.DataFrame({
        "inputs": [{"question": "What is MLflow?"}],
        "outputs": ["MLflow is a tool for ML"],
    })

    dataset = EvaluationDataset(mock_managed_dataset)

    with mlflow.start_run() as run:
        mlflow.genai.evaluate(
            data=dataset,
            scorers=[RelevanceToQuery()],
        )

    run_data = mlflow.get_run(run.info.run_id)

    assert run_data.inputs is not None
    assert run_data.inputs.dataset_inputs is not None
    assert len(run_data.inputs.dataset_inputs) > 0

    dataset_input = run_data.inputs.dataset_inputs[0]
    logged_dataset = dataset_input.dataset
    assert logged_dataset.name == "test.evaluation.sample_dataset"


@pytest.mark.parametrize(
    ("tags_data", "expected_calls"),
    [
        # Regular tags
        (
            [
                {"environment": "test", "model_version": "v1.0"},
                {"environment": "production", "team": "data-science"},
            ],
            [
                ("environment", "test"),
                ("model_version", "v1.0"),
                ("environment", "production"),
                ("team", "data-science"),
            ],
        ),
        # Empty tags dict
        (
            [{}, {}],
            [],
        ),
        # None tags (no tags field)
        (
            [None, None],
            [],
        ),
        # Mix of tags and empty/None
        (
            [{"env": "test"}, {}, None],
            [("env", "test")],
        ),
    ],
)
def test_evaluate_with_tags(tags_data, expected_calls):
    data = [
        {
            "inputs": {"question": f"What is question {i}?"},
            "outputs": f"Answer {i}",
            "expectations": {"expected_response": f"Answer {i}"},
            "tags": tags,
        }
        for i, tags in enumerate(tags_data)
    ]

    with mock.patch("mlflow.set_trace_tag") as mock_set_trace_tag:
        mlflow.genai.evaluate(
            data=data,
            scorers=[exact_match],
        )

    # Check that all expected calls were made (order may vary due to parallel execution)
    actual_calls = mock_set_trace_tag.call_args_list
    expected_mock_calls = [
        mock.call(trace_id=ANY, key=key, value=value) for key, value in expected_calls
    ]
    assert len(actual_calls) == len(expected_mock_calls)
    for expected_call in expected_mock_calls:
        assert expected_call in actual_calls


def test_evaluate_with_traces_tags_no_warnings():
    with mlflow.start_span() as span:
        span.set_inputs({"question": "Hello?"})

    traces = mlflow.search_traces()
    with mock.patch("mlflow.tracing.client._logger.warning") as mock_warning:
        mlflow.genai.evaluate(
            data=traces,
            scorers=[has_trace],
        )
    assert not any(
        "immutable and cannot be set on a trace" in call.args[0]
        for call in mock_warning.call_args_list
    )


def test_evaluate_with_tags_error_handling(is_in_databricks):
    data = [
        {
            "inputs": {"question": "What is MLflow?"},
            "outputs": "MLflow is a tool for ML",
            "expectations": {"expected_response": "MLflow is a tool for ML"},
            "tags": {"invalid_tag": "value"},
        }
    ]

    # Mock set_trace_tag to raise an exception
    with mock.patch("mlflow.set_trace_tag", side_effect=Exception("Tag logging failed")):
        # This should not raise an exception
        result = mlflow.genai.evaluate(
            data=data,
            scorers=[exact_match],
        )

    # Evaluation should still succeed
    assert "exact_match/mean" in result.metrics


def test_evaluate_with_invalid_tags_type():
    data = [
        {
            "inputs": {"question": "What is MLflow?"},
            "outputs": "MLflow is a tool for ML",
            "expectations": {"expected_response": "MLflow is a tool for ML"},
            "tags": "invalid_tags_string",  # Should be dict
        }
    ]

    with pytest.raises(MlflowException, match="Tags must be a dictionary"):
        mlflow.genai.evaluate(
            data=data,
            scorers=[exact_match],
        )


def test_evaluate_without_inputs_in_eval_dataset():
    answers = [
        "MLflow is an open-source platform for managing ML lifecycle",
        "Apache Spark is a fast data processing engine",
        "I don't know",
    ]
    for answer in answers:
        with mlflow.start_span() as span:
            span.set_outputs(answer)

    trace_df = mlflow.search_traces()
    trace_df["inputs"] = None
    trace_df["expectations"] = pd.Series([
        {"expected_response": answer, "max_length": 100} for answer in answers
    ])

    result = mlflow.genai.evaluate(
        data=trace_df,
        scorers=[is_concise, exact_match, has_trace],
    )

    assert "is_concise/mean" in result.metrics
    assert "exact_match/mean" in result.metrics
    assert "has_trace/mean" in result.metrics

    @scorer
    def input_exist(inputs):
        if inputs is None:
            return False
        return True

    trace_df["outputs"] = None
    result = mlflow.genai.evaluate(
        data=trace_df,
        scorers=[input_exist],
    )
    assert result.metrics["input_exist/mean"] == 0.0


def test_evaluate_with_only_trace_in_eval_dataset():
    for _ in range(3):
        with mlflow.start_span():
            pass

    trace_df = mlflow.search_traces()
    trace_df = trace_df[["trace"]]

    result = mlflow.genai.evaluate(
        data=trace_df,
        scorers=[has_trace],
    )

    assert result.metrics["has_trace/mean"] == 1.0


@pytest.mark.parametrize("is_enabled", [True, False])
def test_evaluate_with_scorer_tracing(server_config, monkeypatch, is_enabled):
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_ENABLE_SCORER_TRACING", str(is_enabled).lower())

    data = [
        {
            "inputs": {"question": "What is MLflow?"},
            "expectations": {
                "expected_response": "MLflow is a tool for ML",
                "max_length": 100,
            },
        },
        {
            "inputs": {"question": "What is Spark?"},
            "expectations": {
                "expected_response": "Spark is a fast data processing engine",
                "max_length": 1,
            },
        },
    ]

    result = mlflow.genai.evaluate(
        predict_fn=TestModel().predict,
        data=data,
        scorers=[exact_match, is_concise, relevance, has_trace],
    )

    metrics = result.metrics
    assert metrics["exact_match/mean"] == 0.0
    assert metrics["is_concise/mean"] == 0.5
    assert metrics["relevance/mean"] == 1.0
    assert metrics["has_trace/mean"] == 1.0

    traces = get_traces()
    if is_enabled:
        assert len(traces) == len(data) * 5  # 1 trace for prediction + 4 scorer traces
    else:
        assert len(traces) == len(data)

    # Traces should be associated with the eval run
    traces = mlflow.search_traces(
        filter_string="tags.`mlflow.eval.requestId` != 'None'",
        run_id=result.run_id,
        return_type="list",
    )
    assert len(traces) == len(data)

    # Each assessment should have a source trace ID
    for trace in traces:
        for a in trace.info.assessments:
            if isinstance(a, Feedback) and is_enabled:
                assert a.metadata[AssessmentMetadataKey.SCORER_TRACE_ID] is not None
                assert a.metadata[AssessmentMetadataKey.SCORER_TRACE_ID] != trace.info.trace_id
            else:
                assert AssessmentMetadataKey.SCORER_TRACE_ID not in a.metadata


@pytest.mark.parametrize("diff_experiment_id", [True, False])
def test_eval_with_traces_log_spans_correctly(diff_experiment_id):
    exp_id = mlflow.set_experiment("traces exp").experiment_id
    with mlflow.start_span() as span:
        span.set_inputs({"question": "What is MLflow?"})
        span.set_outputs({"answer": "MLflow is a tool for ML"})
        span.set_attributes({"key": "value"})
        with mlflow.start_span() as child_span:
            child_span.set_inputs("test")

    # set to a different experiment
    if diff_experiment_id:
        mlflow.set_experiment("diff exp")

    # search traces from the original experiment
    trace_df = mlflow.search_traces(locations=[exp_id])

    result = mlflow.genai.evaluate(
        data=trace_df,
        scorers=[has_trace],
    )

    assert result.metrics["has_trace/mean"] == 1.0

    traces = get_traces()
    assert len(traces) == 1
    # copied trace should contain all spans
    assert len(traces[0].data.spans) == 2
    span = traces[0].data.spans[0]
    assert span.get_attribute("key") == "value"
    assert span.inputs == {"question": "What is MLflow?"}
    assert span.outputs == {"answer": "MLflow is a tool for ML"}
    child_span = traces[0].data.spans[1]
    assert child_span.inputs == "test"


def test_evaluate_with_mixed_single_turn_and_multi_turn_scorers(server_config):
    """Test evaluation with a combination of single-turn and multi-turn scorers.

    Validates that:
    - Single-turn scorers are applied to all traces
    - Multi-turn scorers are only applied to the first trace of each session
    """

    # Define a multi-turn scorer that counts conversation turns
    class ConversationLengthScorer(mlflow.genai.Scorer):
        def __init__(self):
            super().__init__(name="conversation_length")

        @property
        def is_session_level_scorer(self) -> bool:
            return True

        def __call__(self, session=None, **kwargs):
            """Return the number of turns in the conversation."""
            return len(session or [])

    # Define a single-turn scorer
    @scorer
    def response_length(outputs) -> int:
        """Return the length of the response."""
        return len(outputs) if isinstance(outputs, str) else 0

    # Create a traced model function
    @mlflow.trace(span_type=SpanType.CHAT_MODEL)
    def model(question, session_id):
        mlflow.update_current_trace(metadata={"mlflow.trace.session": session_id})
        return f"Answer to: {question}"

    # Generate traces for 2 sessions (3 turns + 2 turns = 5 total traces)
    mlflow.set_experiment("multi_turn_test")
    with mlflow.start_run() as run:
        # Session 1: 3 turns
        for q in ["Q1", "Q2", "Q3"]:
            model(q, session_id="session_1")

        # Session 2: 2 turns
        for q in ["Q4", "Q5"]:
            model(q, session_id="session_2")

    # Get traces for evaluation
    traces = mlflow.search_traces(
        locations=[run.info.experiment_id], filter_string=f'run_id = "{run.info.run_id}"'
    )

    # Evaluate with both single-turn and multi-turn scorers
    result = mlflow.genai.evaluate(
        data=traces, scorers=[response_length, ConversationLengthScorer()]
    )

    # Validate results
    result_df = result.result_df

    # Should have one row per trace
    assert len(result_df) == 5, f"Expected 5 traces, got {len(result_df)}"

    # Single-turn scorer should be applied to all traces
    single_turn_scores = result_df["response_length/value"].notna().sum()
    assert single_turn_scores == 5, (
        f"Expected single-turn scores for all 5 traces, got {single_turn_scores}"
    )

    # Multi-turn scorer should only be applied to first trace of each session (2 total)
    multi_turn_scores = result_df["conversation_length/value"].notna().sum()
    assert multi_turn_scores == 2, (
        f"Expected multi-turn scores for 2 sessions (first trace only), got {multi_turn_scores}"
    )

    # Validate the conversation length values
    # Session 1 should have 3 turns, Session 2 should have 2 turns
    conv_lengths = result_df["conversation_length/value"].dropna().sort_values().tolist()
    assert conv_lengths == [2.0, 3.0], (
        f"Expected conversation lengths [2.0, 3.0], got {conv_lengths}"
    )

    # Validate that single-turn scores were produced for the dummy responses
    response_lengths = result_df["response_length/value"].dropna()
    # All responses follow the "Answer to: Qx" format, so every length should be positive
    assert all(length > 0 for length in response_lengths)

    # Verify multi-turn assessments were persisted to the backend (not just in-memory).
    # Fetch each trace from the backend and check the first trace of each session has
    # the conversation_length assessment logged.
    all_traces = mlflow.search_traces(
        locations=[run.info.experiment_id],
        run_id=result.run_id,
        return_type="list",
    )
    for trace in all_traces:
        assessment_names = {a.name for a in trace.info.assessments}
        has_session = "mlflow.trace.session" in trace.info.trace_metadata
        is_first_in_session = "conversation_length" in assessment_names
        if has_session and is_first_in_session:
            a = next(a for a in trace.info.assessments if a.name == "conversation_length")
            assert isinstance(a, Feedback)
            assert a.value in (2.0, 3.0)


def test_evaluate_with_evaluation_dataset_and_session_level_scorers():
    # Define a session-level scorer
    class ConversationLengthScorer(mlflow.genai.Scorer):
        def __init__(self):
            super().__init__(name="conversation_length")

        @property
        def is_session_level_scorer(self) -> bool:
            return True

        def __call__(self, session=None, **kwargs):
            return len(session or [])

    # Create traces with session metadata (2 traces in session_1, 1 in session_2)
    @mlflow.trace(span_type=mlflow.entities.SpanType.CHAT_MODEL)
    def model(question, session_id):
        mlflow.update_current_trace(metadata={"mlflow.trace.session": session_id})
        return f"Answer to {question}"

    model("Q1", session_id="session_1")
    trace_1 = mlflow.get_trace(mlflow.get_last_active_trace_id())

    model("Q2", session_id="session_1")
    trace_2 = mlflow.get_trace(mlflow.get_last_active_trace_id())

    model("Q3", session_id="session_2")
    trace_3 = mlflow.get_trace(mlflow.get_last_active_trace_id())

    # Create dataset from traces
    dataset = create_dataset(name="multi_turn_dataset")
    dataset.merge_records([trace_1, trace_2, trace_3])

    # Evaluate with session-level scorer
    result = mlflow.genai.evaluate(data=dataset, scorers=[ConversationLengthScorer()])
    result_df = result.result_df

    # Session-level scorer should produce 2 scores (one per session)
    assert "conversation_length/value" in result_df.columns
    assert result_df["conversation_length/value"].notna().sum() == 2

    # Verify conversation lengths: session_1 has 2 traces, session_2 has 1 trace
    conv_lengths = result_df["conversation_length/value"].dropna().sort_values().tolist()
    assert conv_lengths == [1.0, 2.0]


def test_evaluate_dataset_mixed_traces_with_and_without_sessions():
    class SessionScorer(mlflow.genai.Scorer):
        def __init__(self):
            super().__init__(name="session_length")

        @property
        def is_session_level_scorer(self):
            return True

        def __call__(self, session=None, **kwargs):
            return len(session or [])

    # Create mixed traces
    @mlflow.trace(span_type=mlflow.entities.SpanType.CHAT_MODEL)
    def model_with_session(question, session_id):
        mlflow.update_current_trace(metadata={"mlflow.trace.session": session_id})
        return "answer"

    @mlflow.trace(span_type=mlflow.entities.SpanType.CHAT_MODEL)
    def model_without_session(question):
        return "answer"

    model_with_session("Q1", "session_1")
    trace_1 = mlflow.get_trace(mlflow.get_last_active_trace_id())

    model_without_session("Q2")
    trace_2 = mlflow.get_trace(mlflow.get_last_active_trace_id())

    model_with_session("Q3", "session_1")
    trace_3 = mlflow.get_trace(mlflow.get_last_active_trace_id())

    # Create dataset and evaluate
    dataset = create_dataset(name="mixed_dataset")
    dataset.merge_records([trace_1, trace_2, trace_3])

    result = mlflow.genai.evaluate(data=dataset, scorers=[SessionScorer()])
    result_df = result.result_df

    # Should have 1 session-level score (for session_1 with 2 traces)
    # The trace without session should not be scored by session-level scorer
    assert result_df["session_length/value"].notna().sum() == 1
    assert result_df["session_length/value"].dropna().iloc[0] == 2.0


def test_max_scorer_workers_env_var(monkeypatch):
    @scorer
    def dummy_scorer_1(outputs):
        return True

    @scorer
    def dummy_scorer_2(outputs):
        return True

    @scorer
    def dummy_scorer_3(outputs):
        return True

    def _validate_scorer_max_workers(expected_max_workers, num_scorers):
        scorers_list = [dummy_scorer_1, dummy_scorer_2, dummy_scorer_3][:num_scorers]
        with mock.patch(
            "mlflow.genai.evaluation.harness.ThreadPoolExecutor", wraps=ThreadPoolExecutor
        ) as mock_executor:
            mlflow.genai.evaluate(
                data=[
                    {
                        "inputs": {"question": "What is MLflow?"},
                        "outputs": "MLflow is a tool for ML",
                    }
                ],
                scorers=scorers_list,
            )
        # Find the scorer pool call by its thread_name_prefix
        scorer_call = next(
            call
            for call in mock_executor.call_args_list
            if call[1].get("thread_name_prefix") == "MlflowGenAIEvalScorer"
        )
        assert scorer_call[1]["max_workers"] == expected_max_workers

    # default scorer workers is 10, but limited by number of scorers (3)
    _validate_scorer_max_workers(expected_max_workers=3, num_scorers=3)

    # override scorer workers with env var (limit to 2)
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_SCORER_WORKERS", "2")
    _validate_scorer_max_workers(expected_max_workers=2, num_scorers=3)

    # when num_scorers < max_scorer_workers, use num_scorers
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_SCORER_WORKERS", "10")
    _validate_scorer_max_workers(expected_max_workers=2, num_scorers=2)

    # set to 1 for sequential execution
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_SCORER_WORKERS", "1")
    _validate_scorer_max_workers(expected_max_workers=1, num_scorers=3)


# ===================== ConversationSimulator Integration Tests =====================


def test_evaluate_with_conversation_simulator_requires_predict_fn():
    simulator = ConversationSimulator(
        test_cases=[{"goal": "Learn about MLflow"}],
        max_turns=2,
    )

    with pytest.raises(MlflowException, match="predict_fn is required"):
        mlflow.genai.evaluate(
            data=simulator,
            scorers=[has_trace],
        )


def test_evaluate_with_conversation_simulator_empty_simulation_error():
    def failing_predict_fn(input: list[dict[str, Any]], **kwargs):
        raise Exception("Simulated failure")

    simulator = ConversationSimulator(
        test_cases=[{"goal": "Learn about MLflow"}],
        max_turns=2,
    )

    with mock.patch(
        "mlflow.genai.simulators.simulator.invoke_model_without_tracing"
    ) as mock_invoke:
        # Simulate a failure that produces no traces
        mock_invoke.side_effect = Exception("LLM call failed")

        with pytest.raises(MlflowException, match="Simulation produced no traces"):
            mlflow.genai.evaluate(
                data=simulator,
                predict_fn=failing_predict_fn,
                scorers=[has_trace],
            )

def test_session_level_evaluation_with_predict_fn_without_simulator():
    class SessionScorer(mlflow.genai.Scorer):
        def __init__(self):
            super().__init__(name="session_scorer")

        @property
        def is_session_level_scorer(self):
            return True

        def __call__(self, session=None, **kwargs):
            return len(session or [])

    data = [
        {"inputs": {"question": "What is MLflow?"}, "outputs": "MLflow is a tool"},
    ]

    with pytest.raises(
        MlflowException,
        match=(
            r"Session-level scorers require traces with session IDs.*"
            r"session_scorer.*"
            r"Either pass a ConversationSimulator to `data` with `predict_fn`"
        ),
    ):
        mlflow.genai.evaluate(
            data=data,
            predict_fn=TestModel().predict,
            scorers=[SessionScorer()],
        )


def test_evaluate_with_conversation_simulator_calls_simulate():
    simulator = ConversationSimulator(
        test_cases=[{"goal": "Learn MLflow"}],
        max_turns=2,
    )

    def mock_predict_fn(input: list[dict[str, Any]], **kwargs):
        return {"output": "Mock response"}

    with mock.patch.object(simulator, "simulate") as mock_simulate:
        # Return empty list to trigger the "no traces" error
        mock_simulate.return_value = []

        with pytest.raises(MlflowException, match="Simulation produced no traces"):
            mlflow.genai.evaluate(
                data=simulator,
                predict_fn=mock_predict_fn,
                scorers=[has_trace],
            )

        # Verify simulate was called with predict_fn
        mock_simulate.assert_called_once_with(mock_predict_fn)


@scorer
def always_pass(outputs):
    return True


@pytest.mark.parametrize(
    ("expectations_values", "expected_count"),
    [
        ([None, {}], 2),
        ([float("nan")], 1),
        ([{"expected": "value"}, None, {}], 3),
    ],
)
def test_evaluate_handles_empty_expectations(expectations_values, expected_count):
    data = [
        {
            "inputs": {"question": f"Q{i}"},
            "outputs": f"A{i}",
            "expectations": exp,
        }
        for i, exp in enumerate(expectations_values)
    ]

    result = mlflow.genai.evaluate(data=data, scorers=[always_pass])

    assert result is not None
    assert result.metrics is not None
    assert result.result_df is not None
    assert len(result.result_df) == expected_count
    assert result.metrics["always_pass/mean"] == 1.0


from mlflow.genai.simulators.simulator import BaseSimulatedUserAgent


class MockUserAgent(BaseSimulatedUserAgent):
    def __init__(self, **kwargs):
        pass

    def generate_message(self, context):
        if context.turn == 0:
            return f"Hello, I want to {context.goal}"
        return "[GOAL ACHIEVED]"


def test_evaluate_with_simulator_creates_single_run(tmp_path):
    mlflow.set_tracking_uri(f"sqlite:///{tmp_path}/mlflow.db")
    mlflow.set_experiment("test-experiment")

    simulator = ConversationSimulator(
        test_cases=[{"goal": "get help"}],
        max_turns=2,
        user_agent_class=MockUserAgent,
    )

    def mock_predict_fn(input: list[dict[str, Any]], **kwargs):
        return {"content": "Response"}

    with mock.patch(
        "mlflow.genai.simulators.simulator.invoke_model_without_tracing",
        return_value='{"rationale": "Goal achieved!", "result": "yes"}',
    ):
        mlflow.genai.evaluate(data=simulator, predict_fn=mock_predict_fn, scorers=[])

    runs = mlflow.search_runs()
    assert len(runs) == 1

def test_evaluate_with_simulator_within_parent_run(tmp_path):
    mlflow.set_tracking_uri(f"sqlite:///{tmp_path}/mlflow.db")
    mlflow.set_experiment("test-experiment")

    simulator = ConversationSimulator(
        test_cases=[{"goal": "get help"}],
        max_turns=2,
        user_agent_class=MockUserAgent,
    )

    def mock_predict_fn(input: list[dict[str, Any]], **kwargs):
        return {"content": "Response"}

    with mock.patch(
        "mlflow.genai.simulators.simulator.invoke_model_without_tracing",
        return_value='{"rationale": "Goal achieved!", "result": "yes"}',
    ):
        with mlflow.start_run(run_name="parent-run") as parent_run:
            parent_run_id = parent_run.info.run_id
            mlflow.genai.evaluate(data=simulator, predict_fn=mock_predict_fn, scorers=[])
            assert mlflow.active_run().info.run_id == parent_run_id

    runs = mlflow.search_runs()
    assert len(runs) == 1
    assert runs.iloc[0]["tags.mlflow.runName"] == "parent-run"


# ===================== Rate Limiting & Pipelining Tests =====================


def test_predict_rate_limiter_is_wired_to_predict_fn(monkeypatch):
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_PREDICT_RATE_LIMIT", "100")

    with mock.patch.object(
        RPSRateLimiter, "acquire", autospec=True, side_effect=lambda self: None
    ) as mock_acquire:
        data = [{"inputs": {"q": f"Q{i}"}} for i in range(5)]
        mlflow.genai.evaluate(data=data, predict_fn=lambda q: "answer", scorers=[])
        assert mock_acquire.call_count == 5


def test_scorer_rate_limiter_is_wired_to_scorers(monkeypatch):
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_SCORER_RATE_LIMIT", "100")

    with mock.patch.object(
        RPSRateLimiter, "acquire", autospec=True, side_effect=lambda self: None
    ) as mock_acquire:
        data = [{"inputs": {"q": f"Q{i}"}, "outputs": "a"} for i in range(3)]
        mlflow.genai.evaluate(data=data, scorers=[always_pass, always_pass])
        # 3 items x 2 scorers = 6 scorer acquire calls (no predict_fn → no predict acquires)
        assert mock_acquire.call_count == 6


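# Illustrative sketch, not MLflow's implementation: the two rate-limiter tests above only
# rely on an `acquire()` call that paces callers to a target requests-per-second rate and a
# `report_throttle()` call that lowers the rate after a 429-style response. A minimal
# limiter with that interface could look like the hypothetical `_SimpleRpsLimiter` below
# (the halving backoff and 0.1 RPS floor are assumptions for illustration only), e.g.
# `limiter = _SimpleRpsLimiter(rps=5.0); limiter.acquire()` before each model call.
import time


class _SimpleRpsLimiter:
    def __init__(self, rps: float):
        self._rps = rps
        self._lock = threading.Lock()
        self._next_slot = time.monotonic()

    def acquire(self) -> None:
        # Reserve the next send slot and sleep until it arrives, spacing calls 1/rps apart.
        with self._lock:
            now = time.monotonic()
            slot = max(self._next_slot, now)
            self._next_slot = slot + 1.0 / self._rps
        time.sleep(max(0.0, slot - now))

    def report_throttle(self) -> None:
        # Halve the target rate after the caller observes a throttling error.
        with self._lock:
            self._rps = max(self._rps / 2.0, 0.1)

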
def test_pipelining_scores_while_predicts_pending(monkeypatch):
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_WORKERS", "2")

    # Gate that blocks predicts after the first batch completes
    gate = threading.Event()
    predict_call_count = 0
    predict_lock = threading.Lock()
    scoring_started_while_predicts_pending = threading.Event()

    def gated_predict_fn(q):
        nonlocal predict_call_count
        with predict_lock:
            predict_call_count += 1
            call_num = predict_call_count
        # Let the first 2 predictions through immediately, block the rest
        if call_num > 2:
            gate.wait(timeout=5)
        return "answer"

    @scorer
    def signaling_scorer(outputs):
        # If we're scoring while predicts are still pending, signal success
        with predict_lock:
            current_predicts = predict_call_count
        if (
            current_predicts > 0 and current_predicts < 6
        ):  # 6 total items, so some must still be pending
            scoring_started_while_predicts_pending.set()
        return True

    data = [{"inputs": {"q": f"Q{i}"}} for i in range(6)]

    try:
        # Run evaluate in a background thread so we can release the gate
        result_holder = []

        def run_eval():
            result = mlflow.genai.evaluate(
                data=data, predict_fn=gated_predict_fn, scorers=[signaling_scorer]
            )
            result_holder.append(result)

        eval_thread = threading.Thread(name="test-evaluation-eval", target=run_eval)
        eval_thread.start()

        # Wait for scoring to signal it started while predicts are pending
        signaled = scoring_started_while_predicts_pending.wait(timeout=10)

        # Release all blocked predicts
        gate.set()
        eval_thread.join(timeout=30)

        assert signaled
    finally:
        gate.set()  # Ensure we don't leave threads blocked


def test_backpressure_limits_in_flight_items(monkeypatch):
    workers = 2
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_WORKERS", str(workers))
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_PREDICT_RATE_LIMIT", "0")
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_SCORER_RATE_LIMIT", "0")
    # Skip pre-flight predict_fn call that runs outside the backpressure semaphore
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_SKIP_TRACE_VALIDATION", "true")
    buffer = backpressure_buffer(workers)

    max_in_flight = 0
    in_flight = 0
    lock = threading.Lock()
    predict_done = threading.Semaphore(0)
    score_gate = threading.Event()

    def tracking_predict(q):
        nonlocal in_flight, max_in_flight
        with lock:
            in_flight += 1
            max_in_flight = max(max_in_flight, in_flight)
        predict_done.release()
        return "answer"

    @scorer
    def blocking_scorer(outputs):
        score_gate.wait()
        nonlocal in_flight
        with lock:
            in_flight -= 1
        return True

    num_items = 3 * buffer
    data = [{"inputs": {"q": f"Q{i}"}} for i in range(num_items)]

    eval_thread = threading.Thread(
        name="test-evaluation-blocking",
        target=lambda: mlflow.genai.evaluate(
            data=data, predict_fn=tracking_predict, scorers=[blocking_scorer]
        ),
    )

    try:
        eval_thread.start()

        # Wait for exactly `buffer` predicts to complete. The semaphore guarantees
        # each acquire() returns once a predict runs. After this, the submit thread
        # is blocked on the backpressure semaphore and no more predicts can run
        # (scorers are blocked on score_gate).
        for _ in range(buffer):
            predict_done.acquire()

        with lock:
            observed_max = max_in_flight
    finally:
        score_gate.set()
        eval_thread.join()

    # The semaphore bounds in-flight items. Without backpressure all
    # num_items would pile up.
    assert observed_max == buffer


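# Illustrative sketch, not MLflow's harness internals: the backpressure test above checks
# that at most `backpressure_buffer(workers)` items are in flight at once. The general
# pattern is a counting semaphore that the submitter acquires before handing an item to the
# worker pool and releases only when the item is fully processed, so a slow downstream
# stage stalls submission instead of letting every item queue up. The helper below is a
# hypothetical, self-contained example of that pattern; `process` stands in for the
# predict-then-score work done per item.
def _submit_with_backpressure(items, process, max_in_flight: int):
    in_flight = threading.Semaphore(max_in_flight)

    def run(item):
        try:
            return process(item)
        finally:
            in_flight.release()  # Free a slot once this item is fully processed.

    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        futures = []
        for item in items:
            in_flight.acquire()  # Blocks the submitter whenever the buffer is full.
            futures.append(pool.submit(run, item))
        return [future.result() for future in futures]

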
def test_evaluate_logs_scorer_failure_summary():
    @scorer
    def failing_scorer(inputs, outputs):
        raise ValueError("Model endpoint not found")

    @scorer
    def working_scorer(inputs, outputs):
        return True

    data = [
        {
            "inputs": {"question": "What is MLflow?"},
            "outputs": "MLflow is a platform",
        },
        {
            "inputs": {"question": "What is Python?"},
            "outputs": "Python is a language",
        },
    ]

    with mock.patch("mlflow.genai.evaluation.harness._logger.warning") as mock_warning:
        result = mlflow.genai.evaluate(
            data=data,
            scorers=[failing_scorer, working_scorer],
        )

    # Evaluation should complete without raising an exception
    assert "working_scorer/mean" in result.metrics

    # Verify warning was logged with failure summary
    warning_calls = [call.args[0] for call in mock_warning.call_args_list]
    failure_warnings = [
        msg
        for msg in warning_calls
        if "Some scorer invocations failed during evaluation" in msg
    ]
    warning_message = failure_warnings[0]
    assert "'failing_scorer': 2/2 failed" in warning_message
    assert "Check individual trace assessments for detailed error messages" in warning_message


def test_evaluate_no_warning_when_all_scorers_succeed():
    @scorer
    def working_scorer_1(inputs, outputs):
        return True

    @scorer
    def working_scorer_2(inputs, outputs):
        return False

    data = [
        {
            "inputs": {"question": "What is MLflow?"},
            "outputs": "MLflow is a platform",
        },
    ]

    with mock.patch("mlflow.genai.evaluation.harness._logger.warning") as mock_warning:
        result = mlflow.genai.evaluate(
            data=data,
            scorers=[working_scorer_1, working_scorer_2],
        )

    assert "working_scorer_1/mean" in result.metrics
    assert "working_scorer_2/mean" in result.metrics
    warning_calls = [call.args[0] for call in mock_warning.call_args_list]
    assert not any(
        "Some scorer invocations failed during evaluation" in msg for msg in warning_calls
    )


def test_evaluate_logs_scorer_failure_summary_with_multi_turn_scorers():
    @mlflow.trace
    def model(question, session_id):
        mlflow.update_current_trace(metadata={"mlflow.trace.session": session_id})
        return f"Answer to {question}"

    model("Q1", session_id="session_1")
    trace_1 = mlflow.get_trace(mlflow.get_last_active_trace_id())

    model("Q2", session_id="session_1")
    trace_2 = mlflow.get_trace(mlflow.get_last_active_trace_id())

    model("Q3", session_id="session_2")
    trace_3 = mlflow.get_trace(mlflow.get_last_active_trace_id())

    dataset = create_dataset(name="multi_turn_failure_test")
    dataset.merge_records([trace_1, trace_2, trace_3])

    with mock.patch("mlflow.genai.evaluation.harness._logger.warning") as mock_warning:
        result = mlflow.genai.evaluate(
            data=dataset,
            scorers=[FailingSessionScorer(), WorkingSessionScorer()],
        )

    assert "working_session_scorer/mean" in result.metrics

    warning_calls = [call.args[0] for call in mock_warning.call_args_list]
    failure_warnings = [
        msg
        for msg in warning_calls
        if "Some scorer invocations failed during evaluation" in msg
    ]
    assert len(failure_warnings) > 0
    warning_message = failure_warnings[0]
    assert "'failing_session_scorer': 2/2 failed" in warning_message


def test_evaluate_logs_scorer_failure_summary_with_mixed_scorers():
    @scorer
    def failing_single_turn(inputs, outputs):
        raise ValueError("Single turn scorer error")

    @mlflow.trace
    def model(question, session_id):
        mlflow.update_current_trace(metadata={"mlflow.trace.session": session_id})
        return f"Answer to {question}"

    model("Q1", session_id="session_1")
    trace_1 = mlflow.get_trace(mlflow.get_last_active_trace_id())

    model("Q2", session_id="session_1")
    trace_2 = mlflow.get_trace(mlflow.get_last_active_trace_id())

    model("Q3", session_id="session_2")
    trace_3 = mlflow.get_trace(mlflow.get_last_active_trace_id())

    model("Q4", session_id="session_2")
    trace_4 = mlflow.get_trace(mlflow.get_last_active_trace_id())

    # Create dataset from traces
    dataset = create_dataset(name="mixed_scorer_failure_test")
    dataset.merge_records([trace_1, trace_2, trace_3, trace_4])

    with mock.patch("mlflow.genai.evaluation.harness._logger.warning") as mock_warning:
        result = mlflow.genai.evaluate(
            data=dataset,
            scorers=[
                failing_single_turn,
                always_pass,
                FailingSessionScorer(),
                WorkingSessionScorer(),
            ],
        )

    assert "always_pass/mean" in result.metrics
    assert "working_session_scorer/mean" in result.metrics

    warning_calls = [call.args[0] for call in mock_warning.call_args_list]
    failure_warnings = [
        msg
        for msg in warning_calls
        if "Some scorer invocations failed during evaluation" in msg
    ]
    assert len(failure_warnings) > 0
    warning_message = failure_warnings[0]
    assert "'failing_single_turn': 4/4 failed" in warning_message
    assert "'failing_session_scorer': 2/2 failed" in warning_message
    assert "Check individual trace assessments for detailed error messages" in warning_message


def test_no_rate_limit_backward_compat(monkeypatch):
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_PREDICT_RATE_LIMIT", "0")
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_SCORER_RATE_LIMIT", "0")
    data = [
        {
            "inputs": {"question": "What is MLflow?"},
            "outputs": "MLflow is a tool for ML",
            "expectations": {"expected_response": "MLflow is a tool for ML", "max_length": 100},
        },
    ]

    result = mlflow.genai.evaluate(data=data, scorers=[exact_match, is_concise])

    assert result.metrics["exact_match/mean"] == 1.0
    assert result.metrics["is_concise/mean"] == 1.0


# ===================== Retry & Adaptive Rate Limiting Tests =====================


def test_predict_retries_on_429(monkeypatch):
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_PREDICT_RATE_LIMIT", "0")
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_RETRIES", "3")

    attempts = []

    def flaky_predict(q):
        attempts.append(1)
        # First call is the pre-flight validation check, so let it pass.
        # Fail on calls 2 and 3, succeed on call 4.
        if 1 < len(attempts) < 4:
            raise Exception("429 Too Many Requests")
        return "answer"

    data = [{"inputs": {"q": "Q1"}}]
    result = mlflow.genai.evaluate(data=data, predict_fn=flaky_predict, scorers=[always_pass])

    # 1 validation + 1 fail + 1 fail + 1 success = 4 total
    assert len(attempts) == 4
    assert result.metrics["always_pass/mean"] == 1.0


def test_scorer_retries_on_429(monkeypatch):
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_PREDICT_RATE_LIMIT", "0")
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_SCORER_RATE_LIMIT", "0")
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_RETRIES", "3")

    attempts = []

    @scorer
    def flaky_scorer(outputs):
        attempts.append(1)
        if len(attempts) < 3:
            raise Exception("429 Too Many Requests")
        return True

    data = [{"inputs": {"q": "Q1"}, "outputs": "a"}]
    result = mlflow.genai.evaluate(data=data, scorers=[flaky_scorer])

    assert len(attempts) == 3
    assert result.metrics["flaky_scorer/mean"] == 1.0
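

# Illustrative sketch of the retry behavior the two tests above exercise, not the
# harness's actual implementation: retry a callable when the raised error message looks
# like an HTTP 429, up to `max_retries` additional attempts. Backoff is omitted so the
# sketch stays deterministic; all names below are hypothetical.
def _call_with_429_retries_sketch(fn, max_retries=3):
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception as e:
            if "429" not in str(e) or attempt == max_retries:
                raise


def test_retry_sketch_retries_only_on_429():
    calls = []

    def fails_twice_with_429():
        calls.append(1)
        if len(calls) < 3:
            raise Exception("429 Too Many Requests")
        return "ok"

    assert _call_with_429_retries_sketch(fails_twice_with_429) == "ok"
    assert len(calls) == 3

    def always_fails_with_other_error():
        raise ValueError("boom")

    # Non-429 errors are surfaced immediately instead of being retried
    with pytest.raises(ValueError, match="boom"):
        _call_with_429_retries_sketch(always_fails_with_other_error)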


def test_adaptive_rate_reduces_on_429(monkeypatch):
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_PREDICT_RATE_LIMIT", "auto")
    monkeypatch.setenv("MLFLOW_GENAI_EVAL_MAX_RETRIES", "3")

    rate_after_throttle = []

    original_report_throttle = RPSRateLimiter.report_throttle

    def spy_report_throttle(self):
        original_report_throttle(self)
        rate_after_throttle.append(self._rps)

    attempts = []

    def flaky_predict(q):
        attempts.append(1)
        # First call is the pre-flight validation check, so let it pass.
        # Fail on call 2 to trigger a throttle event.
        if len(attempts) == 2:
            raise Exception("429 Too Many Requests")
        return "answer"

    data = [{"inputs": {"q": f"Q{i}"}} for i in range(3)]

    with mock.patch.object(RPSRateLimiter, "report_throttle", spy_report_throttle):
        result = mlflow.genai.evaluate(data=data, predict_fn=flaky_predict, scorers=[always_pass])

    assert result.metrics["always_pass/mean"] == 1.0
    # At least one throttle event observed, and the rate was reduced
    assert len(rate_after_throttle) >= 1
    assert rate_after_throttle[0] < AUTO_INITIAL_RPS


@pytest.mark.parametrize(
    ("trace_or_none", "run_id"),
    [
        (None, "run-1"),
        (
            Trace(
                info=create_test_trace_info_with_uc_table(
                    trace_id="tr-uc", catalog_name="catalog", schema_name="schema"
                ),
                data=TraceData(spans=[]),
            ),
            "run-1",
        ),
    ],
    ids=["none_trace", "uc_schema_trace"],
)
def test_should_clone_trace_returns_false_early(trace_or_none, run_id):
    assert _should_clone_trace(trace_or_none, run_id=run_id) is False


@pytest.mark.parametrize(
    ("experiment_id", "expected"),
    [
        ("exp-999", True),
        ("exp-123", False),
    ],
    ids=["different_experiment", "matching_experiment"],
)
def test_should_clone_trace_with_explicit_experiment_id(
    mlflow_experiment_trace, experiment_id, expected
):
    with mock.patch(
        "mlflow.genai.evaluation.harness._does_store_support_trace_linking",
        return_value=True,
    ) as mock_store:
        result = _should_clone_trace(
            mlflow_experiment_trace, run_id="run-1", experiment_id=experiment_id
        )

    assert result is expected
    if expected is False:
        mock_store.assert_called_once()
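

# Hypothetical sketch of the decision the parametrized test above implies: a trace is
# cloned only when it lives in a different experiment than the evaluation target. The
# real _should_clone_trace also consults the tracking store (see the
# _does_store_support_trace_linking patches); this sketch covers just the experiment
# comparison and is not the harness's implementation.
def _should_clone_for_experiment_sketch(trace_experiment_id, target_experiment_id):
    return trace_experiment_id != target_experiment_id


def test_should_clone_for_experiment_sketch():
    assert _should_clone_for_experiment_sketch("exp-123", "exp-999") is True
    assert _should_clone_for_experiment_sketch("exp-123", "exp-123") is False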


def test_should_clone_trace_falls_back_to_get_experiment_id_when_none(mlflow_experiment_trace):
    with (
        mock.patch(
            "mlflow.tracking.fluent._get_experiment_id",
            return_value="exp-999",
        ) as mock_get_exp,
        mock.patch(
            "mlflow.genai.evaluation.harness._does_store_support_trace_linking",
            return_value=True,
        ),
    ):
        result = _should_clone_trace(mlflow_experiment_trace, run_id="run-1", experiment_id=None)

    mock_get_exp.assert_called_once()
    assert result is True


def test_should_clone_trace_does_not_call_get_experiment_id_when_provided(mlflow_experiment_trace):
    with (
        mock.patch("mlflow.tracking.fluent._get_experiment_id") as mock_get_exp,
        mock.patch(
            "mlflow.genai.evaluation.harness._does_store_support_trace_linking",
            return_value=True,
        ),
    ):
        _should_clone_trace(mlflow_experiment_trace, run_id="run-1", experiment_id="exp-123")

    mock_get_exp.assert_not_called()
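

# Hypothetical sketch of the adaptive behavior test_adaptive_rate_reduces_on_429 asserts:
# a limiter that starts at some initial RPS and multiplicatively reduces it whenever a
# throttle (HTTP 429) is reported. RPSRateLimiter's real internals are not reproduced
# here; only the observable property "rate after a throttle < initial rate" is mirrored,
# and the backoff factor and floor are made-up example values.
class _AdaptiveRpsSketch:
    def __init__(self, initial_rps, backoff_factor=0.5, min_rps=0.1):
        self._rps = initial_rps
        self._backoff_factor = backoff_factor
        self._min_rps = min_rps

    def report_throttle(self):
        # Multiplicative decrease, clamped to a floor so the rate never reaches zero
        self._rps = max(self._min_rps, self._rps * self._backoff_factor)


def test_adaptive_rps_sketch_reduces_rate_on_throttle():
    limiter = _AdaptiveRpsSketch(initial_rps=8.0)
    limiter.report_throttle()
    assert limiter._rps < 8.0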