# tests/genai/evaluate/test_session_utils.py
  1  from unittest.mock import Mock, patch
  2  
  3  import pytest
  4  
  5  import mlflow
  6  from mlflow.entities import TraceData, TraceInfo, TraceLocation, TraceState
  7  from mlflow.entities.assessment import Feedback
  8  from mlflow.entities.assessment_source import AssessmentSource, AssessmentSourceType
  9  from mlflow.entities.trace import Trace
 10  from mlflow.exceptions import MlflowException
 11  from mlflow.genai import scorer
 12  from mlflow.genai.evaluation.entities import EvalItem
 13  from mlflow.genai.evaluation.session_utils import (
 14      classify_scorers,
 15      evaluate_session_level_scorers,
 16      get_first_trace_in_session,
 17      group_traces_by_session,
 18      validate_session_level_evaluation_inputs,
 19  )
 20  from mlflow.tracing.constant import TraceMetadataKey
 21  
 22  
 23  class _MultiTurnTestScorer:
 24      """Helper class for testing multi-turn scorers."""
 25  
 26      def __init__(self, name="test_multi_turn_scorer"):
 27          self.name = name
 28          self.is_session_level_scorer = True
 29          self.aggregations = []
 30  
 31      def run(self, session=None, **kwargs):
 32          return True
 33  
 34      def __call__(self, traces=None, **kwargs):
 35          return 1.0
 36  
 37  
 38  # ==================== Tests for classify_scorers ====================
 39  
 40  
 41  def test_classify_scorers_all_single_turn():
 42      @scorer
 43      def custom_scorer1(outputs):
 44          return 1.0
 45  
 46      @scorer
 47      def custom_scorer2(outputs):
 48          return 2.0
 49  
 50      scorers_list = [custom_scorer1, custom_scorer2]
 51      single_turn, multi_turn = classify_scorers(scorers_list)
 52  
 53      assert len(single_turn) == 2
 54      assert len(multi_turn) == 0
 55      assert single_turn == scorers_list
 56  
 57  
 58  def test_classify_scorers_all_multi_turn():
 59      multi_turn_scorer1 = _MultiTurnTestScorer(name="multi_turn_scorer1")
 60      multi_turn_scorer2 = _MultiTurnTestScorer(name="multi_turn_scorer2")
 61  
 62      scorers_list = [multi_turn_scorer1, multi_turn_scorer2]
 63      single_turn, multi_turn = classify_scorers(scorers_list)
 64  
 65      assert len(single_turn) == 0
 66      assert len(multi_turn) == 2
 67      assert multi_turn == scorers_list
 68      # Verify they are actually multi-turn
 69      assert multi_turn_scorer1.is_session_level_scorer is True
 70      assert multi_turn_scorer2.is_session_level_scorer is True
 71  
 72  
 73  def test_classify_scorers_mixed():
 74      @scorer
 75      def single_turn_scorer(outputs):
 76          return 1.0
 77  
 78      multi_turn_scorer = _MultiTurnTestScorer(name="multi_turn_scorer")
 79  
 80      scorers_list = [single_turn_scorer, multi_turn_scorer]
 81      single_turn, multi_turn = classify_scorers(scorers_list)
 82  
 83      assert len(single_turn) == 1
 84      assert len(multi_turn) == 1
 85      assert single_turn[0] == single_turn_scorer
 86      assert multi_turn[0] == multi_turn_scorer
 87      # Verify properties
 88      assert single_turn_scorer.is_session_level_scorer is False
 89      assert multi_turn_scorer.is_session_level_scorer is True
 90  
 91  
 92  def test_classify_scorers_empty_list():
 93      single_turn, multi_turn = classify_scorers([])
 94  
 95      assert len(single_turn) == 0
 96      assert len(multi_turn) == 0
 97  
 98  
 99  # ==================== Tests for group_traces_by_session ====================
100  
101  
def _create_mock_trace(trace_id: str, session_id: str | None, request_time: int):
    """Build a Mock trace whose TraceInfo carries the given session id and time.

    When ``session_id`` is None the session key is omitted entirely from the
    trace metadata (not set to None).
    """
    metadata = {} if session_id is None else {TraceMetadataKey.TRACE_SESSION: session_id}

    info = TraceInfo(
        trace_id=trace_id,
        trace_location=TraceLocation.from_experiment_id("0"),
        request_time=request_time,
        execution_duration=1000,
        state=TraceState.OK,
        trace_metadata=metadata,
        tags={},
    )

    mock_trace = Mock(spec=Trace)
    mock_trace.info = info
    mock_trace.data = TraceData(spans=[])
    return mock_trace
122  
123  
def _create_mock_eval_item(trace):
    """Wrap *trace* in a Mock EvalItem whose ``source`` is a real None."""
    item = Mock(spec=EvalItem)
    item.trace = trace
    # Explicit None so attribute access doesn't return an auto-created Mock.
    item.source = None
    return item
130  
131  
def test_group_traces_by_session_single_session():
    """All traces sharing one session id end up in a single group."""
    traces = [
        _create_mock_trace(f"trace-{i}", "session-1", i * 1000) for i in (1, 2, 3)
    ]
    items = [_create_mock_eval_item(t) for t in traces]

    groups = group_traces_by_session(items)

    assert set(groups) == {"session-1"}
    grouped_traces = [item.trace for item in groups["session-1"]]
    assert len(grouped_traces) == 3
    # Every input trace made it into the session's group.
    for trace in traces:
        assert trace in grouped_traces
153  
154  
def test_group_traces_by_session_multiple_sessions():
    """Traces are partitioned by their session id."""
    spec = [
        ("trace-1", "session-1", 1000),
        ("trace-2", "session-1", 2000),
        ("trace-3", "session-2", 1500),
        ("trace-4", "session-2", 2500),
    ]
    items = [_create_mock_eval_item(_create_mock_trace(*args)) for args in spec]

    groups = group_traces_by_session(items)

    assert set(groups) == {"session-1", "session-2"}
    assert len(groups["session-1"]) == 2
    assert len(groups["session-2"]) == 2
175  
176  
def test_group_traces_by_session_excludes_no_session_id():
    """Traces without a session id are dropped from the grouping."""
    kept_a = _create_mock_trace("trace-1", "session-1", 1000)
    orphan = _create_mock_trace("trace-2", None, 2000)  # no session id
    kept_b = _create_mock_trace("trace-3", "session-1", 3000)

    groups = group_traces_by_session(
        [_create_mock_eval_item(t) for t in (kept_a, orphan, kept_b)]
    )

    assert set(groups) == {"session-1"}
    grouped = [item.trace for item in groups["session-1"]]
    assert len(grouped) == 2
    assert kept_a in grouped
    assert kept_b in grouped
    # The session-less trace must not appear anywhere.
    assert orphan not in grouped
198  
199  
def test_group_traces_by_session_excludes_none_traces():
    """Eval items with no trace at all are skipped during grouping."""
    real_item = _create_mock_eval_item(_create_mock_trace("trace-1", "session-1", 1000))

    traceless_item = Mock()
    traceless_item.trace = None  # item never produced a trace
    traceless_item.source = None

    groups = group_traces_by_session([real_item, traceless_item])

    assert set(groups) == {"session-1"}
    assert len(groups["session-1"]) == 1
214  
215  
def test_group_traces_by_session_empty_list():
    """Grouping an empty input yields an empty mapping."""
    groups = group_traces_by_session([])

    assert groups == {}
    assert len(groups) == 0
221  
222  
223  # ==================== Tests for get_first_trace_in_session ====================
224  
225  
def test_get_first_trace_in_session_chronological_order():
    """The item with the smallest request_time wins, regardless of list order."""
    late = _create_mock_eval_item(_create_mock_trace("trace-1", "session-1", 3000))
    earliest = _create_mock_eval_item(_create_mock_trace("trace-2", "session-1", 1000))
    middle = _create_mock_eval_item(_create_mock_trace("trace-3", "session-1", 2000))

    first_item = get_first_trace_in_session([late, earliest, middle])

    assert first_item == earliest
    assert first_item.trace == earliest.trace
241  
242  
def test_get_first_trace_in_session_single_trace():
    """A one-item session trivially returns that item."""
    only_trace = _create_mock_trace("trace-1", "session-1", 1000)
    only_item = _create_mock_eval_item(only_trace)

    first_item = get_first_trace_in_session([only_item])

    assert first_item == only_item
    assert first_item.trace == only_trace
253  
254  
def test_get_first_trace_in_session_same_timestamp():
    """With tied timestamps, any tied item may win; only the time is checked."""
    items = [
        _create_mock_eval_item(_create_mock_trace(f"trace-{i}", "session-1", 1000))
        for i in (1, 2, 3)
    ]

    winner = get_first_trace_in_session(items)

    # All candidates share request_time 1000, so the winner must too.
    assert winner.trace.info.request_time == 1000
271  
272  
273  # ==================== Tests for validate_session_level_evaluation_inputs ====================
274  
275  
def test_validate_session_level_evaluation_inputs_no_session_level_scorers():
    """Validation is a no-op when no session-level scorers are present."""

    @scorer
    def single_turn_scorer(outputs):
        return 1.0

    # Must not raise: single-turn-only scorer lists need no session data.
    validate_session_level_evaluation_inputs(
        scorers=[single_turn_scorer],
        predict_fn=None,
    )
288  
289  
def test_validate_session_level_evaluation_inputs_with_predict_fn():
    """A bare predict_fn plus session-level scorers is rejected with guidance."""

    def dummy_predict_fn():
        return "output"

    expected_message = (
        r"Session-level scorers require traces with session IDs.*"
        r"Either pass a ConversationSimulator to `data` with `predict_fn`"
    )
    with pytest.raises(MlflowException, match=expected_message):
        validate_session_level_evaluation_inputs(
            scorers=[_MultiTurnTestScorer()],
            predict_fn=dummy_predict_fn,
        )
306  
307  
def test_validate_session_level_evaluation_inputs_mixed_scorers():
    """Mixing single-turn and session-level scorers passes validation."""

    @scorer
    def single_turn_scorer(outputs):
        return 1.0

    # Must not raise.
    validate_session_level_evaluation_inputs(
        scorers=[single_turn_scorer, _MultiTurnTestScorer()],
        predict_fn=None,
    )
321  
322  
323  # ==================== Tests for evaluate_session_level_scorers ====================
324  
325  
def _create_test_trace(trace_id: str, request_time: int = 0) -> Trace:
    """Return a real (non-mock) Trace with no spans for the given id/time."""
    info = TraceInfo(
        trace_id=trace_id,
        trace_location=TraceLocation.from_experiment_id("0"),
        request_time=request_time,
        execution_duration=100,
        state=TraceState.OK,
        trace_metadata={},
        tags={},
    )
    return Trace(info=info, data=TraceData(spans=[]))
340  
341  
def _create_eval_item(trace_id: str, request_time: int = 0) -> EvalItem:
    """Return a minimal EvalItem whose request_id matches its trace id."""
    return EvalItem(
        request_id=trace_id,
        trace=_create_test_trace(trace_id, request_time),
        inputs={},
        outputs={},
        expectations={},
    )
352  
353  
def test_evaluate_session_level_scorers_success():
    """Happy path: one session, one scorer, result pinned to the first trace."""
    session_scorer = Mock(spec=mlflow.genai.Scorer)
    session_scorer.name = "test_scorer"
    session_scorer.run.return_value = 0.8

    # One session containing two chronologically ordered traces.
    items = [
        _create_eval_item("trace1", request_time=100),
        _create_eval_item("trace2", request_time=200),
    ]

    def fresh_feedback(*_args, **_kwargs):
        # A new Feedback per call so metadata is never shared/overwritten.
        return [
            Feedback(
                name="test_scorer",
                source=AssessmentSource(
                    source_type=AssessmentSourceType.CODE, source_id="test"
                ),
                value=0.8,
            )
        ]

    with patch(
        "mlflow.genai.evaluation.session_utils.standardize_scorer_value",
        side_effect=fresh_feedback,
    ):
        result = evaluate_session_level_scorers("session1", items, [session_scorer])

    # A single session means exactly one scorer invocation, fed both traces.
    assert session_scorer.run.call_count == 1
    run_kwargs = session_scorer.run.call_args.kwargs
    assert "session" in run_kwargs
    assert len(run_kwargs["session"]) == 2

    # The result is attached to the earliest trace in the session.
    assert result.eval_item.trace.info.trace_id == "trace1"
    assert len(result.assessments) == 1
    assessment = result.assessments[0]
    assert assessment.name == "test_scorer"
    assert assessment.value == 0.8

    # The session id is recorded in the assessment metadata.
    assert assessment.metadata is not None
    assert assessment.metadata[TraceMetadataKey.TRACE_SESSION] == "session1"
401  
402  
def test_evaluate_session_level_scorers_handles_scorer_error():
    """A scorer exception becomes an error Feedback instead of propagating."""
    broken_scorer = Mock(spec=mlflow.genai.Scorer)
    broken_scorer.name = "failing_scorer"
    broken_scorer.run.side_effect = ValueError("Scorer failed!")

    result = evaluate_session_level_scorers(
        "session1", [_create_eval_item("trace1", 100)], [broken_scorer]
    )

    assert result.eval_item.trace.info.trace_id == "trace1"
    assert len(result.assessments) == 1

    error_feedback = result.assessments[0]
    assert error_feedback.name == "failing_scorer"
    assert error_feedback.error is not None
    assert error_feedback.error.error_code == "SCORER_ERROR"
    assert error_feedback.error.stack_trace is not None

    # The original exception message survives both in-memory and via proto.
    assert isinstance(error_feedback.error.error_message, str)
    assert error_feedback.error.error_message == "Scorer failed!"
    assert error_feedback.error.to_proto().error_message == "Scorer failed!"

    # Session id metadata is attached even to error feedbacks.
    assert error_feedback.metadata is not None
    assert error_feedback.metadata[TraceMetadataKey.TRACE_SESSION] == "session1"
428  
429  
def test_evaluate_session_level_scorers_multiple_feedbacks_per_scorer():
    """A scorer returning a dict yields one assessment per standardized feedback."""
    dict_scorer = Mock(spec=mlflow.genai.Scorer)
    dict_scorer.name = "multi_feedback_scorer"
    dict_scorer.run.return_value = {"metric1": 0.7, "metric2": 0.9}

    standardized = [
        Feedback(
            name="multi_feedback_scorer/metric1",
            source=AssessmentSource(source_type=AssessmentSourceType.CODE, source_id="test"),
            value=0.7,
        ),
        Feedback(
            name="multi_feedback_scorer/metric2",
            source=AssessmentSource(source_type=AssessmentSourceType.CODE, source_id="test"),
            value=0.9,
        ),
    ]

    with patch(
        "mlflow.genai.evaluation.session_utils.standardize_scorer_value",
        return_value=standardized,
    ):
        result = evaluate_session_level_scorers(
            "session1", [_create_eval_item("trace1", 100)], [dict_scorer]
        )

    # Both feedbacks are stored against the first (only) trace.
    assert result.eval_item.trace.info.trace_id == "trace1"
    assert len(result.assessments) == 2
    values_by_name = {f.name: f.value for f in result.assessments}
    assert values_by_name == {
        "multi_feedback_scorer/metric1": 0.7,
        "multi_feedback_scorer/metric2": 0.9,
    }
463          assert feedback_by_name["multi_feedback_scorer/metric1"].value == 0.7
464          assert feedback_by_name["multi_feedback_scorer/metric2"].value == 0.9
465  
466  
def test_evaluate_session_level_scorers_first_trace_selection():
    """Assessments attach to the chronologically first trace, not the first listed."""
    session_scorer = Mock(spec=mlflow.genai.Scorer)
    session_scorer.name = "first_trace_scorer"
    session_scorer.run.return_value = 1.0

    # Deliberately shuffled: trace1 (t=100) is the chronological first.
    shuffled_items = [
        _create_eval_item("trace2", request_time=200),
        _create_eval_item("trace1", request_time=100),
        _create_eval_item("trace3", request_time=300),
    ]

    standardized = [
        Feedback(
            name="first_trace_scorer",
            source=AssessmentSource(source_type=AssessmentSourceType.CODE, source_id="test"),
            value=1.0,
        )
    ]

    with patch(
        "mlflow.genai.evaluation.session_utils.standardize_scorer_value",
        return_value=standardized,
    ):
        result = evaluate_session_level_scorers(
            "session1", shuffled_items, [session_scorer]
        )

    # The assessment lands on trace1 (earliest request_time).
    assert result.eval_item.trace.info.trace_id == "trace1"
    assert len(result.assessments) == 1
    assert result.assessments[0].name == "first_trace_scorer"
    assert result.assessments[0].value == 1.0
496  
497  
def test_evaluate_session_level_scorers_multiple_scorers():
    """Two session-level scorers over one session each contribute one assessment."""
    mock_scorer1 = Mock(spec=mlflow.genai.Scorer)
    mock_scorer1.name = "scorer1"
    mock_scorer1.run.return_value = 0.6

    mock_scorer2 = Mock(spec=mlflow.genai.Scorer)
    mock_scorer2.name = "scorer2"
    mock_scorer2.run.return_value = 0.8

    session_items = [_create_eval_item("trace1", 100)]

    with patch(
        "mlflow.genai.evaluation.session_utils.standardize_scorer_value"
    ) as mock_standardize:

        def create_feedback(name, value):
            # Build a single-element feedback list, mirroring the shape
            # standardize_scorer_value returns for a scalar scorer value.
            return [
                Feedback(
                    name=name,
                    source=AssessmentSource(
                        source_type=AssessmentSourceType.CODE, source_id="test"
                    ),
                    value=value,
                )
            ]

        # NOTE(review): side_effect entries are consumed in call order, so this
        # assumes scorer1's value is standardized before scorer2's even though
        # the scorers run in parallel — confirm the ordering is deterministic.
        mock_standardize.side_effect = [
            create_feedback("scorer1", 0.6),
            create_feedback("scorer2", 0.8),
        ]

        result = evaluate_session_level_scorers(
            "session1", session_items, [mock_scorer1, mock_scorer2]
        )

        # Verify both scorers were evaluated (runs in parallel)
        assert mock_scorer1.run.call_count == 1
        assert mock_scorer2.run.call_count == 1

        # Verify result contains assessments from both scorers
        assert result.eval_item.trace.info.trace_id == "trace1"
        assert len(result.assessments) == 2
        # Find feedbacks by name
        feedback_by_name = {f.name: f for f in result.assessments}
        assert "scorer1" in feedback_by_name
        assert "scorer2" in feedback_by_name
        assert feedback_by_name["scorer1"].value == 0.6
        assert feedback_by_name["scorer2"].value == 0.8
546  
547  
def test_evaluate_session_level_scorers_error_multiple_traces():
    """Error feedback lands on the earliest trace and carries the session id."""
    crashing_scorer = Mock(spec=mlflow.genai.Scorer)
    crashing_scorer.name = "failing_scorer"
    crashing_scorer.run.side_effect = RuntimeError("boom")

    items = [
        _create_eval_item("trace1", request_time=100),
        _create_eval_item("trace2", request_time=200),
    ]

    result = evaluate_session_level_scorers("session-abc", items, [crashing_scorer])

    assert result.eval_item.trace.info.trace_id == "trace1"
    error_feedback = result.assessments[0]
    assert error_feedback.error is not None
    assert error_feedback.metadata[TraceMetadataKey.TRACE_SESSION] == "session-abc"