# tests/haystack/test_haystack_tracing.py
  1  from unittest.mock import patch
  2  
  3  from haystack import Document, Pipeline, component
  4  from haystack.components.rankers import LostInTheMiddleRanker
  5  from haystack.components.retrievers import InMemoryBM25Retriever
  6  from haystack.document_stores.in_memory import InMemoryDocumentStore
  7  
  8  import mlflow
  9  from mlflow.entities import SpanType
 10  from mlflow.environment_variables import MLFLOW_USE_DEFAULT_TRACER_PROVIDER
 11  from mlflow.tracing.constant import SpanAttributeKey
 12  from mlflow.version import IS_TRACING_SDK_ONLY
 13  
 14  from tests.tracing.helper import get_traces
 15  
 16  
 17  @component
 18  class Add:
 19      def run(self, a: int, b: int):
 20          return {"sum": a + b}
 21  
 22  
 23  @component
 24  class Multiply:
 25      def run(self, value: int, factor: int):
 26          return {"product": value * factor}
 27  
 28  
 29  def test_haystack_autolog_single_trace():
 30      mlflow.haystack.autolog()
 31  
 32      pipe = Pipeline()
 33      pipe.add_component("adder", Add())
 34      pipe.run({"adder": {"a": 1, "b": 2}})
 35  
 36      traces = get_traces()
 37      assert len(traces) == 1
 38      spans = traces[0].data.spans
 39      assert spans[0].span_type == SpanType.CHAIN
 40      assert spans[0].name == "haystack.pipeline.run"
 41      assert spans[0].inputs == {"adder": {"a": 1, "b": 2}}
 42      assert spans[0].outputs == {"adder": {"sum": 3}}
 43      assert spans[1].span_type == SpanType.TOOL
 44      assert spans[1].name == "Add"
 45      assert spans[1].inputs == {"a": 1, "b": 2}
 46      assert spans[1].outputs == {"sum": 3}
 47  
 48      mlflow.haystack.autolog(disable=True)
 49      pipe.run({"adder": {"a": 3, "b": 4}})
 50      assert len(get_traces()) == 1
 51  
 52  
 53  def test_pipeline_with_multiple_components_single_trace():
 54      mlflow.haystack.autolog()
 55  
 56      pipe = Pipeline()
 57      pipe.add_component("adder", Add())
 58      pipe.add_component("multiplier", Multiply())
 59  
 60      pipe.run({"adder": {"a": 1, "b": 2}, "multiplier": {"value": 3, "factor": 4}})
 61  
 62      traces = get_traces()
 63      assert len(traces) == 1
 64      spans = traces[0].data.spans
 65      assert spans[0].span_type == SpanType.CHAIN
 66      assert spans[0].name == "haystack.pipeline.run"
 67      assert spans[1].span_type == SpanType.TOOL
 68      assert spans[2].span_type == SpanType.TOOL
 69      assert spans[1].name == "Add"
 70      assert spans[2].name == "Multiply"
 71      assert spans[1].inputs == {"a": 1, "b": 2}
 72      assert spans[1].outputs == {"sum": 3}
 73      assert spans[2].inputs == {"value": 3, "factor": 4}
 74      assert spans[2].outputs == {"product": 12}
 75  
 76      mlflow.haystack.autolog(disable=True)
 77      pipe.run({"adder": {"a": 1, "b": 2}, "multiplier": {"value": 3, "factor": 4}})
 78  
 79      traces = get_traces()
 80      assert len(traces) == 1
 81  
 82  
 83  def test_token_usage_parsed_for_llm_component(mock_litellm_cost):
 84      mlflow.haystack.autolog()
 85  
 86      @component
 87      class MyLLM:
 88          def run(self, prompt: str, model: str):
 89              return {}
 90  
 91      pipe = Pipeline()
 92      pipe.add_component("my_llm", MyLLM())
 93  
 94      output = {
 95          "replies": [
 96              {
 97                  "content": [{"text": "hi"}],
 98                  "meta": {"usage": {"prompt_tokens": 1, "completion_tokens": 2, "total_tokens": 3}},
 99              }
100          ]
101      }
102  
103      with patch.object(MyLLM, "run", return_value=output):
104          pipe.run({"my_llm": {"prompt": "hello", "model": "gpt-4"}})
105  
106      traces = get_traces()
107      assert len(traces) == 1
108      span = traces[0].data.spans[1]
109      assert span.span_type == SpanType.LLM
110      assert span.name == "MyLLM"
111      assert span.attributes[SpanAttributeKey.CHAT_USAGE] == {
112          "input_tokens": 1,
113          "output_tokens": 2,
114          "total_tokens": 3,
115      }
116      assert span.model_name == "gpt-4"
117      if not IS_TRACING_SDK_ONLY:
118          # Verify cost is calculated (1 input token * 1.0 + 2 output tokens * 2.0)
119          assert span.llm_cost == {
120              "input_cost": 1.0,
121              "output_cost": 4.0,
122              "total_cost": 5.0,
123          }
124  
125      mlflow.haystack.autolog(disable=True)
126  
127      traces = get_traces()
128      with patch.object(MyLLM, "run", return_value=output):
129          pipe.run({"my_llm": {"prompt": "hello", "model": "gpt-4"}})
130  
131      assert len(traces) == 1
132  
133  
def test_autolog_disable():
    """Runs are traced while autolog is enabled and ignored after it is disabled."""
    mlflow.haystack.autolog()

    traced_pipeline = Pipeline()
    traced_pipeline.add_component("adder", Add())
    traced_pipeline.run({"adder": {"a": 1, "b": 2}})
    assert len(get_traces()) == 1

    mlflow.haystack.autolog(disable=True)

    untraced_pipeline = Pipeline()
    untraced_pipeline.add_component("adder", Add())
    untraced_pipeline.run({"adder": {"a": 2, "b": 3}})
    # Count is unchanged: the post-disable run produced no trace.
    assert len(get_traces()) == 1
147  
148  
def test_in_memory_retriever_component_traced():
    """An InMemoryBM25Retriever component is traced as a RETRIEVER span."""
    mlflow.set_experiment("haystack_retriever")
    mlflow.haystack.autolog()

    document_store = InMemoryDocumentStore()
    document_store.write_documents([Document(content="foo")])

    pipeline = Pipeline()
    pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
    pipeline.run({"retriever": {"query": "foo"}})

    traces = get_traces()
    assert len(traces) == 1

    retriever_span = traces[0].data.spans[1]
    assert retriever_span.span_type == SpanType.RETRIEVER
    assert retriever_span.name == "InMemoryBM25Retriever"
    # Retrieved documents are serialized into the span outputs.
    assert retriever_span.outputs["documents"][0]["content"] == "foo"
165  
166  
def test_multiple_components_in_pipeline_reranker():
    """A retriever feeding a reranker yields RETRIEVER and RERANKER spans in one trace."""
    mlflow.haystack.autolog()

    document_store = InMemoryDocumentStore()
    document_store.write_documents([Document(content="foo")])

    pipeline = Pipeline()
    pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
    pipeline.add_component("reranker", LostInTheMiddleRanker())
    # Wire retrieved documents into the reranker's input socket.
    pipeline.connect("retriever.documents", "reranker.documents")

    pipeline.run({"retriever": {"query": "foo"}})

    traces = get_traces()
    assert len(traces) == 1

    spans = traces[0].data.spans
    root, retriever_span, reranker_span = spans[0], spans[1], spans[2]

    assert root.span_type == SpanType.CHAIN
    assert root.name == "haystack.pipeline.run"

    assert retriever_span.name == "InMemoryBM25Retriever"
    assert retriever_span.span_type == SpanType.RETRIEVER
    assert retriever_span.inputs["query"] == "foo"

    assert reranker_span.name == "LostInTheMiddleRanker"
    assert reranker_span.span_type == SpanType.RERANKER
    assert reranker_span.inputs["documents"][0]["content"] == "foo"

    # After disabling, a second run must not add a trace.
    mlflow.haystack.autolog(disable=True)
    pipeline.run({"retriever": {"query": "foo"}})
    assert len(get_traces()) == 1
196  
197  
def test_haystack_autolog_shared_provider_no_recursion(monkeypatch):
    """Autologging works with the shared OTel tracer provider (no RecursionError)."""
    # Opt out of MLflow's isolated tracer provider so the shared one is exercised.
    monkeypatch.setenv(MLFLOW_USE_DEFAULT_TRACER_PROVIDER.name, "false")

    mlflow.haystack.autolog()

    pipeline = Pipeline()
    pipeline.add_component("adder", Add())
    pipeline.run({"adder": {"a": 1, "b": 2}})

    traces = get_traces()
    assert len(traces) == 1

    root = traces[0].data.spans[0]
    assert root.span_type == SpanType.CHAIN
    assert root.inputs == {"adder": {"a": 1, "b": 2}}
    assert root.outputs == {"adder": {"sum": 3}}