/ tests / tracing / utils / test_otlp.py
test_otlp.py
  1  import gzip
  2  import time
  3  import zlib
  4  from collections.abc import Callable
  5  
  6  import pytest
  7  from fastapi import HTTPException
  8  
  9  import mlflow
 10  from mlflow.entities.span import SpanType
 11  from mlflow.environment_variables import MLFLOW_TRACE_ENABLE_OTLP_DUAL_EXPORT
 12  from mlflow.tracing.processor.mlflow_v3 import MlflowV3SpanProcessor
 13  from mlflow.tracing.processor.otel import OtelSpanProcessor
 14  from mlflow.tracing.provider import _get_trace_exporter, _get_tracer
 15  from mlflow.tracing.provider import provider as mlflow_provider
 16  from mlflow.tracking import MlflowClient
 17  from mlflow.utils.os import is_windows
 18  
 19  from tests.tracing.helper import get_traces
 20  
 21  # OTLP exporters are not installed in some CI jobs
 22  try:
 23      from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
 24          OTLPSpanExporter as GrpcExporter,
 25      )
 26      from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
 27          OTLPSpanExporter as HttpExporter,
 28      )
 29  except ImportError:
 30      pytest.skip("OTLP exporters are not installed", allow_module_level=True)
 31  
 32  from mlflow.exceptions import MlflowException
 33  from mlflow.tracing.utils.otlp import (
 34      decompress_otlp_body,
 35      get_otlp_exporter,
 36      should_use_otlp_exporter,
 37  )
 38  
 39  _TEST_HTTP_OTLP_ENDPOINT = "http://127.0.0.1:4317/v1/traces"
 40  _TEST_HTTPS_OTLP_ENDPOINT = "https://127.0.0.1:4317/v1/traces"
 41  
 42  
 43  @pytest.mark.parametrize(
 44      ("traces_endpoint", "otlp_endpoint", "mlflow_enable", "expected"),
 45      [
 46          # No endpoints configured
 47          (None, None, None, False),  # Default behavior - no export without endpoint
 48          (None, None, "true", False),  # Explicit enable but no endpoint
 49          (None, None, "false", False),  # Explicit disable and no endpoint
 50          # OTEL_EXPORTER_OTLP_TRACES_ENDPOINT configured
 51          (_TEST_HTTP_OTLP_ENDPOINT, None, None, True),  # Default behavior - export enabled
 52          (_TEST_HTTP_OTLP_ENDPOINT, None, "true", True),  # Explicit enable
 53          (_TEST_HTTP_OTLP_ENDPOINT, None, "false", False),  # Explicit disable
 54          # OTEL_EXPORTER_OTLP_ENDPOINT configured
 55          (None, _TEST_HTTP_OTLP_ENDPOINT, None, True),  # Default behavior - export enabled
 56          (None, _TEST_HTTP_OTLP_ENDPOINT, "true", True),  # Explicit enable
 57          (None, _TEST_HTTP_OTLP_ENDPOINT, "false", False),  # Explicit disable
 58          # Both endpoints configured (traces endpoint takes precedence)
 59          (_TEST_HTTP_OTLP_ENDPOINT, _TEST_HTTPS_OTLP_ENDPOINT, None, True),
 60          (_TEST_HTTP_OTLP_ENDPOINT, _TEST_HTTPS_OTLP_ENDPOINT, "false", False),
 61      ],
 62  )
 63  def test_should_use_otlp_exporter(
 64      traces_endpoint, otlp_endpoint, mlflow_enable, expected, monkeypatch
 65  ):
 66      # Clear all relevant environment variables to ensure test isolation
 67      monkeypatch.delenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", raising=False)
 68      monkeypatch.delenv("OTEL_EXPORTER_OTLP_ENDPOINT", raising=False)
 69      monkeypatch.delenv("MLFLOW_ENABLE_OTLP_EXPORTER", raising=False)
 70  
 71      # Set environment variables based on test parameters
 72      if traces_endpoint is not None:
 73          monkeypatch.setenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", traces_endpoint)
 74      if otlp_endpoint is not None:
 75          monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", otlp_endpoint)
 76      if mlflow_enable is not None:
 77          monkeypatch.setenv("MLFLOW_ENABLE_OTLP_EXPORTER", mlflow_enable)
 78  
 79      assert should_use_otlp_exporter() is expected
 80  
 81  
 82  @pytest.mark.parametrize(
 83      ("endpoint", "protocol", "expected_type"),
 84      [
 85          (_TEST_HTTP_OTLP_ENDPOINT, None, GrpcExporter),
 86          (_TEST_HTTP_OTLP_ENDPOINT, "grpc", GrpcExporter),
 87          (_TEST_HTTPS_OTLP_ENDPOINT, "grpc", GrpcExporter),
 88          (_TEST_HTTP_OTLP_ENDPOINT, "http/protobuf", HttpExporter),
 89          (_TEST_HTTPS_OTLP_ENDPOINT, "http/protobuf", HttpExporter),
 90      ],
 91  )
 92  def test_get_otlp_exporter_success(endpoint, protocol, expected_type, monkeypatch):
 93      monkeypatch.setenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", endpoint)
 94      if protocol:
 95          monkeypatch.setenv("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", protocol)
 96  
 97      exporter = get_otlp_exporter()
 98      assert isinstance(exporter, expected_type)
 99  
100      if isinstance(exporter, GrpcExporter):
101          assert exporter._endpoint == "127.0.0.1:4317"
102      else:
103          assert exporter._endpoint == endpoint
104  
105  
106  def test_get_otlp_exporter_invalid_protocol(monkeypatch):
107      monkeypatch.setenv("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", _TEST_HTTP_OTLP_ENDPOINT)
108  
109      with pytest.raises(MlflowException, match="Unsupported OTLP protocol"):
110          get_otlp_exporter()
111  
112  
113  @pytest.mark.skipif(is_windows(), reason="Otel collector docker image does not support Windows")
114  @pytest.mark.parametrize("dual_export", [True, False, None], ids=["enable", "disable", "default"])
115  def test_export_to_otel_collector(otel_collector, monkeypatch, dual_export):
116      if dual_export:
117          monkeypatch.setenv("MLFLOW_TRACE_ENABLE_OTLP_DUAL_EXPORT", "true")
118      elif dual_export is False:
119          monkeypatch.setenv("MLFLOW_TRACE_ENABLE_OTLP_DUAL_EXPORT", "false")
120  
121      from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
122  
123      _, _, port = otel_collector
124      monkeypatch.setenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", f"http://127.0.0.1:{port}/v1/traces")
125  
126      class TestModel:
127          @mlflow.trace()
128          def predict(self, x, y):
129              z = x + y
130              z = self.add_one(z)
131              z = mlflow.trace(self.square)(z)
132              return z  # noqa: RET504
133  
134          @mlflow.trace(
135              span_type=SpanType.LLM, name="add_one_with_custom_name", attributes={"delta": 1}
136          )
137          def add_one(self, z):
138              return z + 1
139  
140          def square(self, t):
141              res = t**2
142              time.sleep(0.1)
143              return res
144  
145      model = TestModel()
146      model.predict(2, 5)
147  
148      # Tracer should be configured to export to OTLP.
149      # In dual-export mode, _get_trace_exporter() returns the MLflow exporter
150      # (since _get_span_processor prefers BaseMlflowSpanProcessor), so we find
151      # the OTLP exporter by looking for the OtelSpanProcessor directly.
152      if dual_export:
153          # In dual-export mode, _get_trace_exporter() returns the MLflow exporter
154          # (since _get_span_processor prefers BaseMlflowSpanProcessor). Find the
155          # OTLP exporter by looking up the OtelSpanProcessor from the tracer provider.
156          tp = mlflow_provider.get()
157          processors = tp._active_span_processor._span_processors
158          otel_processor = next(p for p in processors if isinstance(p, OtelSpanProcessor))
159          exporter = otel_processor.span_exporter
160      else:
161          exporter = _get_trace_exporter()
162      assert isinstance(exporter, OTLPSpanExporter)
163      assert exporter._endpoint == f"127.0.0.1:{port}"
164  
165      mlflow_traces = get_traces()
166      if dual_export:
167          assert len(mlflow_traces) == 1
168          assert mlflow_traces[0].info.state == "OK"
169          assert len(mlflow_traces[0].data.spans) == 3
170      else:
171          assert len(mlflow_traces) == 0
172  
173      # Wait for collector to receive spans, checking every second for up to 60 seconds
174      _, output_file, _ = otel_collector
175      spans_found = False
176      for _ in range(60):
177          time.sleep(1)
178          with open(output_file) as f:
179              collector_logs = f.read()
180          # Check if spans are in the logs - the debug exporter outputs span details
181          # The BatchSpanProcessor may send spans in multiple batches, so we check for any evidence
182          # that the collector is receiving spans from our test
183          if (
184              "predict" in collector_logs
185              and "add_one_with_custom_name" in collector_logs
186              and "square" in collector_logs
187          ):
188              # We found evidence that spans are being exported to the collector
189              # The child spans may come in separate batches, but OTLP export works
190              spans_found = True
191              break
192  
193      # Assert that spans were found in collector logs
194      assert spans_found, (
195          f"Expected spans not found in collector logs after 60 seconds. "
196          f"Logs: {collector_logs[:2000]}"
197      )
198  
199  
200  @pytest.mark.skipif(is_windows(), reason="Otel collector docker image does not support Windows")
201  def test_dual_export_to_mlflow_and_otel(otel_collector, monkeypatch):
202      """
203      Test that dual export mode sends traces to both MLflow and OTLP collector.
204      """
205      _, _, port = otel_collector
206      monkeypatch.setenv(MLFLOW_TRACE_ENABLE_OTLP_DUAL_EXPORT.name, "true")
207      monkeypatch.setenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", f"http://127.0.0.1:{port}/v1/traces")
208  
209      experiment = mlflow.set_experiment("dual_export_test")
210  
211      processors = _get_tracer("test").span_processor._span_processors
212      assert len(processors) == 2
213      assert isinstance(processors[0], OtelSpanProcessor)
214      assert isinstance(processors[1], MlflowV3SpanProcessor)
215  
216      @mlflow.trace(name="parent_span")
217      def parent_function():
218          result = child_function("Hello", "World")
219          return f"Parent: {result}"
220  
221      @mlflow.trace(name="child_span")
222      def child_function(arg1, arg2):
223          # Test that update_current_trace works in dual export mode
224          mlflow.update_current_trace({"env": "production", "version": "1.0"})
225          return f"{arg1} {arg2}"
226  
227      result = parent_function()
228      assert result == "Parent: Hello World"
229  
230      mlflow.flush_trace_async_logging()
231  
232      client = MlflowClient()
233      traces = client.search_traces(locations=[experiment.experiment_id])
234      assert len(traces) == 1
235      trace = traces[0]
236      assert len(trace.data.spans) == 2
237  
238      # Verify trace tags were set correctly
239      assert "env" in trace.info.tags
240      assert trace.info.tags["env"] == "production"
241      assert "version" in trace.info.tags
242      assert trace.info.tags["version"] == "1.0"
243  
244      # Wait for collector to receive spans, checking every second for up to 60 seconds
245      _, output_file, _ = otel_collector
246      spans_found = False
247      for _ in range(60):
248          time.sleep(1)
249          with open(output_file) as f:
250              collector_logs = f.read()
251          # Check if spans are in the logs - the debug exporter outputs span details
252          # Look for evidence that spans were received
253          if "parent_span" in collector_logs or "child_span" in collector_logs:
254              # Evidence of traces being exported to OTLP
255              spans_found = True
256              break
257  
258      # Assert that spans were found in collector logs
259      assert spans_found, (
260          f"Expected spans not found in collector logs after 60 seconds. "
261          f"Logs: {collector_logs[:2000]}"
262      )
263  
264  
265  @pytest.mark.parametrize(
266      ("encoding", "compress_fn", "data"),
267      [
268          ("gzip", gzip.compress, b"otlp-data-test"),
269          ("deflate", zlib.compress, b"otlp-deflate-data"),
270          ("deflate", lambda d: zlib.compress(d)[2:-4], b"raw-deflate-data"),  # Raw deflate
271      ],
272      ids=["gzip", "deflate-rfc", "deflate-raw"],
273  )
274  def test_decompress_otlp_body_valid(
275      encoding: str, compress_fn: Callable[[bytes], bytes], data: bytes
276  ):
277      compressed = compress_fn(data)
278      output = decompress_otlp_body(compressed, encoding)
279      assert output == data
280  
281  
282  @pytest.mark.parametrize(
283      ("encoding", "invalid_data", "expected_error"),
284      [
285          ("gzip", b"not-gzip-data", r"Failed to decompress gzip payload"),
286          ("deflate", b"not-deflate-data", r"Failed to decompress deflate payload"),
287          ("unknown-encoding", b"xxx", r"Unsupported Content-Encoding"),
288      ],
289      ids=["gzip-invalid", "deflate-invalid", "unknown-encoding"],
290  )
291  def test_decompress_otlp_body_invalid(encoding: str, invalid_data: bytes, expected_error: str):
292      with pytest.raises(HTTPException, match=expected_error, check=lambda e: e.status_code == 400):
293          decompress_otlp_body(invalid_data, encoding)