test_haystack_tracing.py
# Tests for MLflow's Haystack autologging: each test enables
# ``mlflow.haystack.autolog()``, runs a Haystack ``Pipeline`` and inspects the
# traces returned by ``get_traces()``.
from unittest.mock import patch

from haystack import Document, Pipeline, component
from haystack.components.rankers import LostInTheMiddleRanker
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

import mlflow
from mlflow.entities import SpanType
from mlflow.environment_variables import MLFLOW_USE_DEFAULT_TRACER_PROVIDER
from mlflow.tracing.constant import SpanAttributeKey
from mlflow.version import IS_TRACING_SDK_ONLY

from tests.tracing.helper import get_traces


@component
class Add:
    """Minimal Haystack component returning ``a + b`` under the ``"sum"`` key."""

    def run(self, a: int, b: int):
        return {"sum": a + b}


@component
class Multiply:
    """Minimal Haystack component returning ``value * factor`` under ``"product"``."""

    def run(self, value: int, factor: int):
        return {"product": value * factor}


def test_haystack_autolog_single_trace():
    """A one-component pipeline run yields one trace: a CHAIN span plus a TOOL span."""
    mlflow.haystack.autolog()

    pipe = Pipeline()
    pipe.add_component("adder", Add())
    pipe.run({"adder": {"a": 1, "b": 2}})

    traces = get_traces()
    assert len(traces) == 1
    spans = traces[0].data.spans
    # spans[0] is the pipeline-level span; spans[1] is the component span.
    assert spans[0].span_type == SpanType.CHAIN
    assert spans[0].name == "haystack.pipeline.run"
    assert spans[0].inputs == {"adder": {"a": 1, "b": 2}}
    assert spans[0].outputs == {"adder": {"sum": 3}}
    assert spans[1].span_type == SpanType.TOOL
    assert spans[1].name == "Add"
    assert spans[1].inputs == {"a": 1, "b": 2}
    assert spans[1].outputs == {"sum": 3}

    # With autolog disabled, another run must not add a trace.
    mlflow.haystack.autolog(disable=True)
    pipe.run({"adder": {"a": 3, "b": 4}})
    assert len(get_traces()) == 1


def test_pipeline_with_multiple_components_single_trace():
    """Two unconnected components run in one pipeline share a single trace."""
    mlflow.haystack.autolog()

    pipe = Pipeline()
    pipe.add_component("adder", Add())
    pipe.add_component("multiplier", Multiply())

    pipe.run({"adder": {"a": 1, "b": 2}, "multiplier": {"value": 3, "factor": 4}})

    traces = get_traces()
    assert len(traces) == 1
    spans = traces[0].data.spans
    assert spans[0].span_type == SpanType.CHAIN
    assert spans[0].name == "haystack.pipeline.run"
    assert spans[1].span_type == SpanType.TOOL
    assert spans[2].span_type == SpanType.TOOL
    assert spans[1].name == "Add"
    assert spans[2].name == "Multiply"
    assert spans[1].inputs == {"a": 1, "b": 2}
    assert spans[1].outputs == {"sum": 3}
    assert spans[2].inputs == {"value": 3, "factor": 4}
    assert spans[2].outputs == {"product": 12}

    # With autolog disabled, another run must not add a trace.
    mlflow.haystack.autolog(disable=True)
    pipe.run({"adder": {"a": 1, "b": 2}, "multiplier": {"value": 3, "factor": 4}})

    traces = get_traces()
    assert len(traces) == 1


def test_token_usage_parsed_for_llm_component(mock_litellm_cost):
    """Token usage in a component's ``replies[*].meta.usage`` lands on an LLM span.

    ``mock_litellm_cost`` is a fixture defined outside this file; the expected
    cost values below presumably come from it (TODO confirm against conftest).
    """
    mlflow.haystack.autolog()

    @component
    class MyLLM:
        def run(self, prompt: str, model: str):
            return {}

    pipe = Pipeline()
    pipe.add_component("my_llm", MyLLM())

    output = {
        "replies": [
            {
                "content": [{"text": "hi"}],
                "meta": {"usage": {"prompt_tokens": 1, "completion_tokens": 2, "total_tokens": 3}},
            }
        ]
    }

    # Replace ``run`` so the component returns the canned reply above.
    with patch.object(MyLLM, "run", return_value=output):
        pipe.run({"my_llm": {"prompt": "hello", "model": "gpt-4"}})

    traces = get_traces()
    assert len(traces) == 1
    span = traces[0].data.spans[1]
    assert span.span_type == SpanType.LLM
    assert span.name == "MyLLM"
    assert span.attributes[SpanAttributeKey.CHAT_USAGE] == {
        "input_tokens": 1,
        "output_tokens": 2,
        "total_tokens": 3,
    }
    assert span.model_name == "gpt-4"
    if not IS_TRACING_SDK_ONLY:
        # Verify cost is calculated (1 input token * 1.0 + 2 output tokens * 2.0)
        assert span.llm_cost == {
            "input_cost": 1.0,
            "output_cost": 4.0,
            "total_cost": 5.0,
        }

    mlflow.haystack.autolog(disable=True)

    with patch.object(MyLLM, "run", return_value=output):
        pipe.run({"my_llm": {"prompt": "hello", "model": "gpt-4"}})

    # Fetch the traces only AFTER the run: a list captured beforehand could
    # never reveal a trace emitted while autolog is disabled.
    assert len(get_traces()) == 1


def test_autolog_disable():
    """A pipeline created after ``autolog(disable=True)`` produces no trace."""
    mlflow.haystack.autolog()

    pipe1 = Pipeline()
    pipe1.add_component("adder", Add())
    pipe1.run({"adder": {"a": 1, "b": 2}})
    assert len(get_traces()) == 1

    mlflow.haystack.autolog(disable=True)
    pipe2 = Pipeline()
    pipe2.add_component("adder", Add())
    pipe2.run({"adder": {"a": 2, "b": 3}})
    assert len(get_traces()) == 1


def test_in_memory_retriever_component_traced():
    """``InMemoryBM25Retriever`` is traced as a RETRIEVER span exposing its documents."""
    mlflow.set_experiment("haystack_retriever")
    mlflow.haystack.autolog()

    store = InMemoryDocumentStore()
    store.write_documents([Document(content="foo")])
    pipe = Pipeline()
    pipe.add_component("retriever", InMemoryBM25Retriever(document_store=store))
    pipe.run({"retriever": {"query": "foo"}})

    traces = get_traces()
    assert len(traces) == 1
    span = traces[0].data.spans[1]
    assert span.span_type == SpanType.RETRIEVER
    assert span.name == "InMemoryBM25Retriever"
    assert span.outputs["documents"][0]["content"] == "foo"


def test_multiple_components_in_pipeline_reranker():
    """A retriever connected to a ranker yields RETRIEVER and RERANKER spans in one trace."""
    mlflow.haystack.autolog()

    pipe = Pipeline()
    store = InMemoryDocumentStore()
    store.write_documents([Document(content="foo")])

    pipe.add_component("retriever", InMemoryBM25Retriever(document_store=store))
    pipe.add_component("reranker", LostInTheMiddleRanker())

    # Feed the retriever's documents into the ranker.
    pipe.connect("retriever.documents", "reranker.documents")

    pipe.run({"retriever": {"query": "foo"}})

    traces = get_traces()
    assert len(traces) == 1
    spans = traces[0].data.spans
    assert spans[0].span_type == SpanType.CHAIN
    assert spans[0].name == "haystack.pipeline.run"
    assert spans[1].name == "InMemoryBM25Retriever"
    assert spans[2].name == "LostInTheMiddleRanker"
    assert spans[1].span_type == SpanType.RETRIEVER
    assert spans[2].span_type == SpanType.RERANKER
    assert spans[1].inputs["query"] == "foo"
    assert spans[2].inputs["documents"][0]["content"] == "foo"

    # With autolog disabled, another run must not add a trace.
    mlflow.haystack.autolog(disable=True)
    pipe.run({"retriever": {"query": "foo"}})
    assert len(get_traces()) == 1


def test_haystack_autolog_shared_provider_no_recursion(monkeypatch):
    """Autolog still records the pipeline span when the default-provider flag is "false"."""
    # Verify haystack.autolog() works with shared tracer provider (no RecursionError)
    monkeypatch.setenv(MLFLOW_USE_DEFAULT_TRACER_PROVIDER.name, "false")

    mlflow.haystack.autolog()

    pipe = Pipeline()
    pipe.add_component("adder", Add())
    pipe.run({"adder": {"a": 1, "b": 2}})

    traces = get_traces()
    assert len(traces) == 1
    spans = traces[0].data.spans
    assert spans[0].span_type == SpanType.CHAIN
    assert spans[0].inputs == {"adder": {"a": 1, "b": 2}}
    assert spans[0].outputs == {"adder": {"sum": 3}}