# test_session_utils.py
"""Unit tests for mlflow.genai.evaluation.session_utils.

Covers: classify_scorers, group_traces_by_session, get_first_trace_in_session,
validate_session_level_evaluation_inputs, and evaluate_session_level_scorers.
"""

from unittest.mock import Mock, patch

import pytest

import mlflow
from mlflow.entities import TraceData, TraceInfo, TraceLocation, TraceState
from mlflow.entities.assessment import Feedback
from mlflow.entities.assessment_source import AssessmentSource, AssessmentSourceType
from mlflow.entities.trace import Trace
from mlflow.exceptions import MlflowException
from mlflow.genai import scorer
from mlflow.genai.evaluation.entities import EvalItem
from mlflow.genai.evaluation.session_utils import (
    classify_scorers,
    evaluate_session_level_scorers,
    get_first_trace_in_session,
    group_traces_by_session,
    validate_session_level_evaluation_inputs,
)
from mlflow.tracing.constant import TraceMetadataKey


class _MultiTurnTestScorer:
    """Helper class for testing multi-turn scorers."""

    def __init__(self, name="test_multi_turn_scorer"):
        self.name = name
        # Marks this scorer as session-level (multi-turn) for classify_scorers.
        self.is_session_level_scorer = True
        self.aggregations = []

    def run(self, session=None, **kwargs):
        return True

    def __call__(self, traces=None, **kwargs):
        return 1.0


# ==================== Tests for classify_scorers ====================


def test_classify_scorers_all_single_turn():
    @scorer
    def custom_scorer1(outputs):
        return 1.0

    @scorer
    def custom_scorer2(outputs):
        return 2.0

    scorers_list = [custom_scorer1, custom_scorer2]
    single_turn, multi_turn = classify_scorers(scorers_list)

    assert len(single_turn) == 2
    assert len(multi_turn) == 0
    assert single_turn == scorers_list


def test_classify_scorers_all_multi_turn():
    multi_turn_scorer1 = _MultiTurnTestScorer(name="multi_turn_scorer1")
    multi_turn_scorer2 = _MultiTurnTestScorer(name="multi_turn_scorer2")

    scorers_list = [multi_turn_scorer1, multi_turn_scorer2]
    single_turn, multi_turn = classify_scorers(scorers_list)

    assert len(single_turn) == 0
    assert len(multi_turn) == 2
    assert multi_turn == scorers_list
    # Verify they are actually multi-turn
    assert multi_turn_scorer1.is_session_level_scorer is True
    assert multi_turn_scorer2.is_session_level_scorer is True


def test_classify_scorers_mixed():
    @scorer
    def single_turn_scorer(outputs):
        return 1.0

    multi_turn_scorer = _MultiTurnTestScorer(name="multi_turn_scorer")

    scorers_list = [single_turn_scorer, multi_turn_scorer]
    single_turn, multi_turn = classify_scorers(scorers_list)

    assert len(single_turn) == 1
    assert len(multi_turn) == 1
    assert single_turn[0] == single_turn_scorer
    assert multi_turn[0] == multi_turn_scorer
    # Verify properties
    assert single_turn_scorer.is_session_level_scorer is False
    assert multi_turn_scorer.is_session_level_scorer is True


def test_classify_scorers_empty_list():
    single_turn, multi_turn = classify_scorers([])

    assert len(single_turn) == 0
    assert len(multi_turn) == 0


# ==================== Tests for group_traces_by_session ====================


def _create_mock_trace(trace_id: str, session_id: str | None, request_time: int):
    """Helper to create a mock trace with session_id and request_time."""
    trace_metadata = {}
    # A session id is only recorded when present; None simulates a trace
    # that does not belong to any session.
    if session_id is not None:
        trace_metadata[TraceMetadataKey.TRACE_SESSION] = session_id

    trace_info = TraceInfo(
        trace_id=trace_id,
        trace_location=TraceLocation.from_experiment_id("0"),
        request_time=request_time,
        execution_duration=1000,
        state=TraceState.OK,
        trace_metadata=trace_metadata,
        tags={},
    )

    trace = Mock(spec=Trace)
    trace.info = trace_info
    trace.data = TraceData(spans=[])
    return trace


def _create_mock_eval_item(trace):
    """Helper to create a mock EvalItem with a trace."""
    eval_item = Mock(spec=EvalItem)
    eval_item.trace = trace
    eval_item.source = None  # Explicitly set to None so it doesn't return a Mock
    return eval_item


def test_group_traces_by_session_single_session():
    trace1 = _create_mock_trace("trace-1", "session-1", 1000)
    trace2 = _create_mock_trace("trace-2", "session-1", 2000)
    trace3 = _create_mock_trace("trace-3", "session-1", 3000)

    eval_item1 = _create_mock_eval_item(trace1)
    eval_item2 = _create_mock_eval_item(trace2)
    eval_item3 = _create_mock_eval_item(trace3)

    eval_items = [eval_item1, eval_item2, eval_item3]
    session_groups = group_traces_by_session(eval_items)

    assert len(session_groups) == 1
    assert "session-1" in session_groups
    assert len(session_groups["session-1"]) == 3

    # Check that all traces are included
    session_traces = [item.trace for item in session_groups["session-1"]]
    assert trace1 in session_traces
    assert trace2 in session_traces
    assert trace3 in session_traces


def test_group_traces_by_session_multiple_sessions():
    trace1 = _create_mock_trace("trace-1", "session-1", 1000)
    trace2 = _create_mock_trace("trace-2", "session-1", 2000)
    trace3 = _create_mock_trace("trace-3", "session-2", 1500)
    trace4 = _create_mock_trace("trace-4", "session-2", 2500)

    eval_items = [
        _create_mock_eval_item(trace1),
        _create_mock_eval_item(trace2),
        _create_mock_eval_item(trace3),
        _create_mock_eval_item(trace4),
    ]

    session_groups = group_traces_by_session(eval_items)

    assert len(session_groups) == 2
    assert "session-1" in session_groups
    assert "session-2" in session_groups
    assert len(session_groups["session-1"]) == 2
    assert len(session_groups["session-2"]) == 2


def test_group_traces_by_session_excludes_no_session_id():
    trace1 = _create_mock_trace("trace-1", "session-1", 1000)
    trace2 = _create_mock_trace("trace-2", None, 2000)  # No session_id
    trace3 = _create_mock_trace("trace-3", "session-1", 3000)

    eval_items = [
        _create_mock_eval_item(trace1),
        _create_mock_eval_item(trace2),
        _create_mock_eval_item(trace3),
    ]

    session_groups = group_traces_by_session(eval_items)

    assert len(session_groups) == 1
    assert "session-1" in session_groups
    assert len(session_groups["session-1"]) == 2
    # trace2 should not be included
    session_traces = [item.trace for item in session_groups["session-1"]]
    assert trace1 in session_traces
    assert trace2 not in session_traces
    assert trace3 in session_traces


def test_group_traces_by_session_excludes_none_traces():
    trace1 = _create_mock_trace("trace-1", "session-1", 1000)

    eval_item1 = _create_mock_eval_item(trace1)
    eval_item2 = Mock()
    eval_item2.trace = None  # No trace
    eval_item2.source = None  # No source

    eval_items = [eval_item1, eval_item2]
    session_groups = group_traces_by_session(eval_items)

    assert len(session_groups) == 1
    assert "session-1" in session_groups
    assert len(session_groups["session-1"]) == 1


def test_group_traces_by_session_empty_list():
    session_groups = group_traces_by_session([])

    assert len(session_groups) == 0
    assert session_groups == {}


# ==================== Tests for get_first_trace_in_session ====================


def test_get_first_trace_in_session_chronological_order():
    trace1 = _create_mock_trace("trace-1", "session-1", 3000)
    trace2 = _create_mock_trace("trace-2", "session-1", 1000)  # Earliest
    trace3 = _create_mock_trace("trace-3", "session-1", 2000)

    eval_item1 = _create_mock_eval_item(trace1)
    eval_item2 = _create_mock_eval_item(trace2)
    eval_item3 = _create_mock_eval_item(trace3)

    session_items = [eval_item1, eval_item2, eval_item3]

    first_item = get_first_trace_in_session(session_items)

    assert first_item.trace == trace2
    assert first_item == eval_item2


def test_get_first_trace_in_session_single_trace():
    trace1 = _create_mock_trace("trace-1", "session-1", 1000)
    eval_item1 = _create_mock_eval_item(trace1)

    session_items = [eval_item1]

    first_item = get_first_trace_in_session(session_items)

    assert first_item.trace == trace1
    assert first_item == eval_item1


def test_get_first_trace_in_session_same_timestamp():
    # When timestamps are equal, min() will return the first one in the list
    trace1 = _create_mock_trace("trace-1", "session-1", 1000)
    trace2 = _create_mock_trace("trace-2", "session-1", 1000)
    trace3 = _create_mock_trace("trace-3", "session-1", 1000)

    eval_item1 = _create_mock_eval_item(trace1)
    eval_item2 = _create_mock_eval_item(trace2)
    eval_item3 = _create_mock_eval_item(trace3)

    session_items = [eval_item1, eval_item2, eval_item3]

    first_item = get_first_trace_in_session(session_items)

    # Should return one of the traces with timestamp 1000 (likely the first one)
    assert first_item.trace.info.request_time == 1000


# ==================== Tests for validate_session_level_evaluation_inputs ====================


def test_validate_session_level_evaluation_inputs_no_session_level_scorers():
    @scorer
    def single_turn_scorer(outputs):
        return 1.0

    scorers_list = [single_turn_scorer]

    # Should not raise any exceptions
    validate_session_level_evaluation_inputs(
        scorers=scorers_list,
        predict_fn=None,
    )


def test_validate_session_level_evaluation_inputs_with_predict_fn():
    multi_turn_scorer = _MultiTurnTestScorer()
    scorers_list = [multi_turn_scorer]

    def dummy_predict_fn():
        return "output"

    # Passing a plain predict_fn together with session-level scorers is
    # rejected: session-level evaluation needs traces with session IDs.
    with pytest.raises(
        MlflowException,
        match=r"Session-level scorers require traces with session IDs.*"
        r"Either pass a ConversationSimulator to `data` with `predict_fn`",
    ):
        validate_session_level_evaluation_inputs(
            scorers=scorers_list,
            predict_fn=dummy_predict_fn,
        )


def test_validate_session_level_evaluation_inputs_mixed_scorers():
    @scorer
    def single_turn_scorer(outputs):
        return 1.0

    multi_turn_scorer = _MultiTurnTestScorer()
    scorers_list = [single_turn_scorer, multi_turn_scorer]

    # Should not raise any exceptions
    validate_session_level_evaluation_inputs(
        scorers=scorers_list,
        predict_fn=None,
    )


# ==================== Tests for evaluate_session_level_scorers ====================


def _create_test_trace(trace_id: str, request_time: int = 0) -> Trace:
    """Helper to create a minimal test trace"""
    return Trace(
        info=TraceInfo(
            trace_id=trace_id,
            trace_location=TraceLocation.from_experiment_id("0"),
            request_time=request_time,
            execution_duration=100,
            state=TraceState.OK,
            trace_metadata={},
            tags={},
        ),
        data=TraceData(spans=[]),
    )


def _create_eval_item(trace_id: str, request_time: int = 0) -> EvalItem:
    """Helper to create a minimal EvalItem with a trace"""
    trace = _create_test_trace(trace_id, request_time)
    return EvalItem(
        request_id=trace_id,
        trace=trace,
        inputs={},
        outputs={},
        expectations={},
    )


def test_evaluate_session_level_scorers_success():
    mock_scorer = Mock(spec=mlflow.genai.Scorer)
    mock_scorer.name = "test_scorer"
    mock_scorer.run.return_value = 0.8

    # Test with a single session containing multiple traces
    session_items = [
        _create_eval_item("trace1", request_time=100),
        _create_eval_item("trace2", request_time=200),
    ]

    with patch(
        "mlflow.genai.evaluation.session_utils.standardize_scorer_value"
    ) as mock_standardize:
        # Return a new Feedback object each time to avoid metadata overwriting
        def create_feedback(*args, **kwargs):
            return [
                Feedback(
                    name="test_scorer",
                    source=AssessmentSource(
                        source_type=AssessmentSourceType.CODE, source_id="test"
                    ),
                    value=0.8,
                )
            ]

        mock_standardize.side_effect = create_feedback

        result = evaluate_session_level_scorers("session1", session_items, [mock_scorer])

        # Verify scorer was called once (for the single session)
        assert mock_scorer.run.call_count == 1

        # Verify scorer received session traces
        call_args = mock_scorer.run.call_args
        assert "session" in call_args.kwargs
        assert len(call_args.kwargs["session"]) == 2  # session has 2 traces

        # Verify result is for first item
        assert result.eval_item.trace.info.trace_id == "trace1"
        assert len(result.assessments) == 1
        assert result.assessments[0].name == "test_scorer"
        assert result.assessments[0].value == 0.8

        # Verify session_id was added to metadata
        assert result.assessments[0].metadata is not None
        assert result.assessments[0].metadata[TraceMetadataKey.TRACE_SESSION] == "session1"


def test_evaluate_session_level_scorers_handles_scorer_error():
    mock_scorer = Mock(spec=mlflow.genai.Scorer)
    mock_scorer.name = "failing_scorer"
    mock_scorer.run.side_effect = ValueError("Scorer failed!")

    session_items = [_create_eval_item("trace1", 100)]

    result = evaluate_session_level_scorers("session1", session_items, [mock_scorer])

    # Verify error feedback was created
    assert result.eval_item.trace.info.trace_id == "trace1"
    assert len(result.assessments) == 1
    feedback = result.assessments[0]
    assert feedback.name == "failing_scorer"
    assert feedback.error is not None
    assert feedback.error.error_code == "SCORER_ERROR"
    assert feedback.error.stack_trace is not None

    # Error message should survive both proto round-trip and direct access.
    assert feedback.error.to_proto().error_message == "Scorer failed!"
    assert isinstance(feedback.error.error_message, str)
    assert feedback.error.error_message == "Scorer failed!"

    # Verify session_id metadata is present even on error feedbacks
    assert feedback.metadata is not None
    assert feedback.metadata[TraceMetadataKey.TRACE_SESSION] == "session1"


def test_evaluate_session_level_scorers_multiple_feedbacks_per_scorer():
    mock_scorer = Mock(spec=mlflow.genai.Scorer)
    mock_scorer.name = "multi_feedback_scorer"
    mock_scorer.run.return_value = {"metric1": 0.7, "metric2": 0.9}

    session_items = [_create_eval_item("trace1", 100)]

    with patch(
        "mlflow.genai.evaluation.session_utils.standardize_scorer_value"
    ) as mock_standardize:
        feedbacks = [
            Feedback(
                name="multi_feedback_scorer/metric1",
                source=AssessmentSource(source_type=AssessmentSourceType.CODE, source_id="test"),
                value=0.7,
            ),
            Feedback(
                name="multi_feedback_scorer/metric2",
                source=AssessmentSource(source_type=AssessmentSourceType.CODE, source_id="test"),
                value=0.9,
            ),
        ]
        mock_standardize.return_value = feedbacks

        result = evaluate_session_level_scorers("session1", session_items, [mock_scorer])

        # Verify both feedbacks are stored
        assert result.eval_item.trace.info.trace_id == "trace1"
        assert len(result.assessments) == 2
        # Find feedbacks by name
        feedback_by_name = {f.name: f for f in result.assessments}
        assert "multi_feedback_scorer/metric1" in feedback_by_name
        assert "multi_feedback_scorer/metric2" in feedback_by_name
        assert feedback_by_name["multi_feedback_scorer/metric1"].value == 0.7
        assert feedback_by_name["multi_feedback_scorer/metric2"].value == 0.9


def test_evaluate_session_level_scorers_first_trace_selection():
    mock_scorer = Mock(spec=mlflow.genai.Scorer)
    mock_scorer.name = "first_trace_scorer"
    mock_scorer.run.return_value = 1.0

    # Create session with traces in non-chronological order
    session_items = [
        _create_eval_item("trace2", request_time=200),  # Second chronologically
        _create_eval_item("trace1", request_time=100),  # First chronologically
        _create_eval_item("trace3", request_time=300),  # Third chronologically
    ]

    with patch(
        "mlflow.genai.evaluation.session_utils.standardize_scorer_value"
    ) as mock_standardize:
        feedback = Feedback(
            name="first_trace_scorer",
            source=AssessmentSource(source_type=AssessmentSourceType.CODE, source_id="test"),
            value=1.0,
        )
        mock_standardize.return_value = [feedback]

        result = evaluate_session_level_scorers("session1", session_items, [mock_scorer])

        # Verify assessment is for trace1 (earliest request_time)
        assert result.eval_item.trace.info.trace_id == "trace1"
        assert len(result.assessments) == 1
        assert result.assessments[0].name == "first_trace_scorer"
        assert result.assessments[0].value == 1.0


def test_evaluate_session_level_scorers_multiple_scorers():
    mock_scorer1 = Mock(spec=mlflow.genai.Scorer)
    mock_scorer1.name = "scorer1"
    mock_scorer1.run.return_value = 0.6

    mock_scorer2 = Mock(spec=mlflow.genai.Scorer)
    mock_scorer2.name = "scorer2"
    mock_scorer2.run.return_value = 0.8

    session_items = [_create_eval_item("trace1", 100)]

    with patch(
        "mlflow.genai.evaluation.session_utils.standardize_scorer_value"
    ) as mock_standardize:

        def create_feedback(name, value):
            return [
                Feedback(
                    name=name,
                    source=AssessmentSource(
                        source_type=AssessmentSourceType.CODE, source_id="test"
                    ),
                    value=value,
                )
            ]

        # One standardized feedback list per scorer, consumed in call order.
        mock_standardize.side_effect = [
            create_feedback("scorer1", 0.6),
            create_feedback("scorer2", 0.8),
        ]

        result = evaluate_session_level_scorers(
            "session1", session_items, [mock_scorer1, mock_scorer2]
        )

        # Verify both scorers were evaluated (runs in parallel)
        assert mock_scorer1.run.call_count == 1
        assert mock_scorer2.run.call_count == 1

        # Verify result contains assessments from both scorers
        assert result.eval_item.trace.info.trace_id == "trace1"
        assert len(result.assessments) == 2
        # Find feedbacks by name
        feedback_by_name = {f.name: f for f in result.assessments}
        assert "scorer1" in feedback_by_name
        assert "scorer2" in feedback_by_name
        assert feedback_by_name["scorer1"].value == 0.6
        assert feedback_by_name["scorer2"].value == 0.8


def test_evaluate_session_level_scorers_error_multiple_traces():
    mock_scorer = Mock(spec=mlflow.genai.Scorer)
    mock_scorer.name = "failing_scorer"
    mock_scorer.run.side_effect = RuntimeError("boom")

    session_items = [
        _create_eval_item("trace1", request_time=100),
        _create_eval_item("trace2", request_time=200),
    ]

    result = evaluate_session_level_scorers("session-abc", session_items, [mock_scorer])

    # Error feedback attaches to the chronologically-first trace.
    assert result.eval_item.trace.info.trace_id == "trace1"
    feedback = result.assessments[0]
    assert feedback.error is not None
    assert feedback.metadata[TraceMetadataKey.TRACE_SESSION] == "session-abc"