# test_otlp.py
import gzip
import time
import zlib
from collections.abc import Callable

import pytest
from fastapi import HTTPException

import mlflow
from mlflow.entities.span import SpanType
from mlflow.environment_variables import MLFLOW_TRACE_ENABLE_OTLP_DUAL_EXPORT
from mlflow.tracing.processor.mlflow_v3 import MlflowV3SpanProcessor
from mlflow.tracing.processor.otel import OtelSpanProcessor
from mlflow.tracing.provider import _get_trace_exporter, _get_tracer
from mlflow.tracing.provider import provider as mlflow_provider
from mlflow.tracking import MlflowClient
from mlflow.utils.os import is_windows

from tests.tracing.helper import get_traces

# OTLP exporters are not installed in some CI jobs
try:
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
        OTLPSpanExporter as GrpcExporter,
    )
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
        OTLPSpanExporter as HttpExporter,
    )
except ImportError:
    pytest.skip("OTLP exporters are not installed", allow_module_level=True)

from mlflow.exceptions import MlflowException
from mlflow.tracing.utils.otlp import (
    decompress_otlp_body,
    get_otlp_exporter,
    should_use_otlp_exporter,
)

_TEST_HTTP_OTLP_ENDPOINT = "http://127.0.0.1:4317/v1/traces"
_TEST_HTTPS_OTLP_ENDPOINT = "https://127.0.0.1:4317/v1/traces"

# Polling budget used while waiting for the collector to write spans to its log file.
_COLLECTOR_POLL_ATTEMPTS = 60
_COLLECTOR_POLL_INTERVAL_SECONDS = 1


def _assert_collector_received_spans(output_file, spans_received: Callable[[str], bool]) -> None:
    """
    Poll the collector's log file until ``spans_received`` accepts its content.

    The file is re-read once per polling interval, for at most
    ``_COLLECTOR_POLL_ATTEMPTS`` attempts.

    Args:
        output_file: Path of the collector log file (anything accepted by ``open``).
        spans_received: Predicate called with the full log text; returns True once the
            expected spans are visible in it.

    Raises:
        AssertionError: If the predicate never returns True within the polling budget.
            The message includes the first 2000 characters of the last log content read.
    """
    collector_logs = ""
    for _ in range(_COLLECTOR_POLL_ATTEMPTS):
        # Sleep before reading: spans are exported asynchronously, so the file is
        # unlikely to contain anything useful right away.
        time.sleep(_COLLECTOR_POLL_INTERVAL_SECONDS)
        with open(output_file) as f:
            collector_logs = f.read()
        if spans_received(collector_logs):
            return

    waited_seconds = _COLLECTOR_POLL_ATTEMPTS * _COLLECTOR_POLL_INTERVAL_SECONDS
    raise AssertionError(
        f"Expected spans not found in collector logs after {waited_seconds} seconds. "
        f"Logs: {collector_logs[:2000]}"
    )


@pytest.mark.parametrize(
    ("traces_endpoint", "otlp_endpoint", "mlflow_enable", "expected"),
    [
        # No endpoints configured
        (None, None, None, False),  # Default behavior - no export without endpoint
        (None, None, "true", False),  # Explicit enable but no endpoint
        (None, None, "false", False),  # Explicit disable and no endpoint
        # OTEL_EXPORTER_OTLP_TRACES_ENDPOINT configured
        (_TEST_HTTP_OTLP_ENDPOINT, None, None, True),  # Default behavior - export enabled
        (_TEST_HTTP_OTLP_ENDPOINT, None, "true", True),  # Explicit enable
        (_TEST_HTTP_OTLP_ENDPOINT, None, "false", False),  # Explicit disable
        # OTEL_EXPORTER_OTLP_ENDPOINT configured
        (None, _TEST_HTTP_OTLP_ENDPOINT, None, True),  # Default behavior - export enabled
        (None, _TEST_HTTP_OTLP_ENDPOINT, "true", True),  # Explicit enable
        (None, _TEST_HTTP_OTLP_ENDPOINT, "false", False),  # Explicit disable
        # Both endpoints configured (traces endpoint takes precedence)
        (_TEST_HTTP_OTLP_ENDPOINT, _TEST_HTTPS_OTLP_ENDPOINT, None, True),
        (_TEST_HTTP_OTLP_ENDPOINT, _TEST_HTTPS_OTLP_ENDPOINT, "false", False),
    ],
)
def test_should_use_otlp_exporter(
    traces_endpoint, otlp_endpoint, mlflow_enable, expected, monkeypatch
):
    """Check ``should_use_otlp_exporter`` for each endpoint / enable-flag combination."""
    # Clear all relevant environment variables to ensure test isolation
    monkeypatch.delenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", raising=False)
    monkeypatch.delenv("OTEL_EXPORTER_OTLP_ENDPOINT", raising=False)
    monkeypatch.delenv("MLFLOW_ENABLE_OTLP_EXPORTER", raising=False)

    # Set environment variables based on test parameters
    if traces_endpoint is not None:
        monkeypatch.setenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", traces_endpoint)
    if otlp_endpoint is not None:
        monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", otlp_endpoint)
    if mlflow_enable is not None:
        monkeypatch.setenv("MLFLOW_ENABLE_OTLP_EXPORTER", mlflow_enable)

    assert should_use_otlp_exporter() is expected


@pytest.mark.parametrize(
    ("endpoint", "protocol", "expected_type"),
    [
        (_TEST_HTTP_OTLP_ENDPOINT, None, GrpcExporter),
        (_TEST_HTTP_OTLP_ENDPOINT, "grpc", GrpcExporter),
        (_TEST_HTTPS_OTLP_ENDPOINT, "grpc", GrpcExporter),
        (_TEST_HTTP_OTLP_ENDPOINT, "http/protobuf", HttpExporter),
        (_TEST_HTTPS_OTLP_ENDPOINT, "http/protobuf", HttpExporter),
    ],
)
def test_get_otlp_exporter_success(endpoint, protocol, expected_type, monkeypatch):
    """Check that ``get_otlp_exporter`` builds the exporter type matching the protocol."""
    # Start from a clean slate so the `protocol=None` case really exercises the default
    # instead of a protocol leaked from the ambient environment.
    # NOTE(review): the generic OTEL_EXPORTER_OTLP_PROTOCOL is cleared defensively, on the
    # assumption that the exporter factory may fall back to it - confirm against the
    # implementation.
    monkeypatch.delenv("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", raising=False)
    monkeypatch.delenv("OTEL_EXPORTER_OTLP_PROTOCOL", raising=False)

    monkeypatch.setenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", endpoint)
    if protocol:
        monkeypatch.setenv("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", protocol)

    exporter = get_otlp_exporter()
    assert isinstance(exporter, expected_type)

    if isinstance(exporter, GrpcExporter):
        # The gRPC exporter is expected to expose only host:port, without scheme or path.
        assert exporter._endpoint == "127.0.0.1:4317"
    else:
        assert exporter._endpoint == endpoint


def test_get_otlp_exporter_invalid_protocol(monkeypatch):
    """Check that an unsupported protocol value is rejected with ``MlflowException``."""
    # A URL is deliberately used as the protocol value: it is not a supported protocol name.
    monkeypatch.setenv("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", _TEST_HTTP_OTLP_ENDPOINT)

    with pytest.raises(MlflowException, match="Unsupported OTLP protocol"):
        get_otlp_exporter()


@pytest.mark.skipif(is_windows(), reason="Otel collector docker image does not support Windows")
@pytest.mark.parametrize("dual_export", [True, False, None], ids=["enable", "disable", "default"])
def test_export_to_otel_collector(otel_collector, monkeypatch, dual_export):
    """
    Test that traces reach the OTLP collector, with dual export enabled, disabled, or unset.
    """
    dual_export_env = MLFLOW_TRACE_ENABLE_OTLP_DUAL_EXPORT.name
    if dual_export is None:
        # The "default" case must not inherit a value from the ambient environment.
        monkeypatch.delenv(dual_export_env, raising=False)
    else:
        monkeypatch.setenv(dual_export_env, "true" if dual_export else "false")

    _, output_file, port = otel_collector
    monkeypatch.setenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", f"http://127.0.0.1:{port}/v1/traces")

    class TestModel:
        @mlflow.trace()
        def predict(self, x, y):
            z = x + y
            z = self.add_one(z)
            z = mlflow.trace(self.square)(z)
            return z  # noqa: RET504

        @mlflow.trace(
            span_type=SpanType.LLM, name="add_one_with_custom_name", attributes={"delta": 1}
        )
        def add_one(self, z):
            return z + 1

        def square(self, t):
            res = t**2
            time.sleep(0.1)
            return res

    model = TestModel()
    model.predict(2, 5)

    # Tracer should be configured to export to OTLP.
    if dual_export:
        # In dual-export mode, _get_trace_exporter() returns the MLflow exporter
        # (since _get_span_processor prefers BaseMlflowSpanProcessor). Find the
        # OTLP exporter by looking up the OtelSpanProcessor from the tracer provider.
        tp = mlflow_provider.get()
        processors = tp._active_span_processor._span_processors
        otel_processor = next(p for p in processors if isinstance(p, OtelSpanProcessor))
        exporter = otel_processor.span_exporter
    else:
        exporter = _get_trace_exporter()
    assert isinstance(exporter, GrpcExporter)
    assert exporter._endpoint == f"127.0.0.1:{port}"

    mlflow_traces = get_traces()
    if dual_export:
        assert len(mlflow_traces) == 1
        assert mlflow_traces[0].info.state == "OK"
        assert len(mlflow_traces[0].data.spans) == 3
    else:
        assert len(mlflow_traces) == 0

    # The BatchSpanProcessor may send spans in multiple batches, so keep polling the
    # collector's debug output until all three span names have shown up.
    expected_span_names = ("predict", "add_one_with_custom_name", "square")
    _assert_collector_received_spans(
        output_file,
        lambda logs: all(name in logs for name in expected_span_names),
    )


@pytest.mark.skipif(is_windows(), reason="Otel collector docker image does not support Windows")
def test_dual_export_to_mlflow_and_otel(otel_collector, monkeypatch):
    """
    Test that dual export mode sends traces to both MLflow and OTLP collector.
    """
    _, output_file, port = otel_collector
    monkeypatch.setenv(MLFLOW_TRACE_ENABLE_OTLP_DUAL_EXPORT.name, "true")
    monkeypatch.setenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", f"http://127.0.0.1:{port}/v1/traces")

    experiment = mlflow.set_experiment("dual_export_test")

    processors = _get_tracer("test").span_processor._span_processors
    assert len(processors) == 2
    assert isinstance(processors[0], OtelSpanProcessor)
    assert isinstance(processors[1], MlflowV3SpanProcessor)

    @mlflow.trace(name="parent_span")
    def parent_function():
        result = child_function("Hello", "World")
        return f"Parent: {result}"

    @mlflow.trace(name="child_span")
    def child_function(arg1, arg2):
        # Test that update_current_trace works in dual export mode
        mlflow.update_current_trace({"env": "production", "version": "1.0"})
        return f"{arg1} {arg2}"

    result = parent_function()
    assert result == "Parent: Hello World"

    mlflow.flush_trace_async_logging()

    client = MlflowClient()
    traces = client.search_traces(locations=[experiment.experiment_id])
    assert len(traces) == 1
    trace = traces[0]
    assert len(trace.data.spans) == 2

    # Verify trace tags were set correctly
    assert "env" in trace.info.tags
    assert trace.info.tags["env"] == "production"
    assert "version" in trace.info.tags
    assert trace.info.tags["version"] == "1.0"

    # Either span name appearing in the collector's debug output is sufficient evidence
    # that traces were exported over OTLP.
    _assert_collector_received_spans(
        output_file,
        lambda logs: "parent_span" in logs or "child_span" in logs,
    )


@pytest.mark.parametrize(
    ("encoding", "compress_fn", "data"),
    [
        ("gzip", gzip.compress, b"otlp-data-test"),
        ("deflate", zlib.compress, b"otlp-deflate-data"),
        # Raw deflate: strip the 2-byte zlib header and the 4-byte checksum trailer
        ("deflate", lambda d: zlib.compress(d)[2:-4], b"raw-deflate-data"),
    ],
    ids=["gzip", "deflate-rfc", "deflate-raw"],
)
def test_decompress_otlp_body_valid(
    encoding: str, compress_fn: Callable[[bytes], bytes], data: bytes
):
    """Check that a payload compressed with each supported encoding round-trips."""
    compressed = compress_fn(data)
    output = decompress_otlp_body(compressed, encoding)
    assert output == data


@pytest.mark.parametrize(
    ("encoding", "invalid_data", "expected_error"),
    [
        ("gzip", b"not-gzip-data", r"Failed to decompress gzip payload"),
        ("deflate", b"not-deflate-data", r"Failed to decompress deflate payload"),
        ("unknown-encoding", b"xxx", r"Unsupported Content-Encoding"),
    ],
    ids=["gzip-invalid", "deflate-invalid", "unknown-encoding"],
)
def test_decompress_otlp_body_invalid(encoding: str, invalid_data: bytes, expected_error: str):
    """Check that corrupt payloads and unknown encodings raise an HTTP 400 error."""
    with pytest.raises(HTTPException, match=expected_error, check=lambda e: e.status_code == 400):
        decompress_otlp_body(invalid_data, encoding)