/ tests / ui / test_tracing_storage.py
test_tracing_storage.py
  1  import datetime
  2  import tempfile
  3  import uuid
  4  
  5  import pytest
  6  from opentelemetry.proto.collector.trace.v1 import trace_service_pb2
  7  from opentelemetry.proto.trace.v1 import trace_pb2
  8  from sqlalchemy import create_engine
  9  from sqlalchemy.pool import NullPool
 10  
 11  from evidently.core.datasets import DataDefinition
 12  from evidently.legacy.core import new_id
 13  from evidently.ui.service.datasets.data_source import TracingDataSource
 14  from evidently.ui.service.datasets.metadata import DatasetMetadata
 15  from evidently.ui.service.datasets.metadata import DatasetOrigin
 16  from evidently.ui.service.datasets.metadata import FileDatasetMetadataStorage
 17  from evidently.ui.service.tracing.storage.base import ExportID
 18  from evidently.ui.service.tracing.storage.file import FileTracingStorage
 19  from evidently.ui.service.tracing.storage.sql import SQLTracingStorage
 20  
 21  
 22  @pytest.fixture
 23  def tmp_path():
 24      """Create a temporary directory."""
 25      with tempfile.TemporaryDirectory() as tmpdir:
 26          yield tmpdir
 27  
 28  
 29  @pytest.fixture
 30  def sqlite_engine():
 31      """Create a temporary SQLite database for testing."""
 32      import gc
 33      import os
 34      import sys
 35      import time
 36  
 37      with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
 38          db_path = f.name
 39  
 40      engine = create_engine(f"sqlite:///{db_path}", poolclass=NullPool)
 41      from evidently.ui.service.storage.sql.utils import migrate_database
 42  
 43      try:
 44          migrate_database(f"sqlite:///{db_path}")
 45      except Exception:
 46          from evidently.ui.service.tracing.storage.sql import TraceSpanModel
 47  
 48          TraceSpanModel.__table__.create(engine, checkfirst=True)
 49  
 50      yield engine
 51  
 52      engine.dispose(close=True)
 53      gc.collect()
 54  
 55      if sys.platform == "win32":
 56          time.sleep(0.2)
 57  
 58      max_retries = 10
 59      for attempt in range(max_retries):
 60          try:
 61              os.unlink(db_path)
 62              break
 63          except PermissionError:
 64              if attempt < max_retries - 1:
 65                  time.sleep(0.1 * (attempt + 1))
 66                  gc.collect()
 67              else:
 68                  if sys.platform == "win32":
 69                      import warnings
 70  
 71                      warnings.warn(f"Could not delete SQLite database file {db_path} on Windows.")
 72                  else:
 73                      raise
 74  
 75  
 76  @pytest.fixture
 77  def file_storage(tmp_path, dataset_metadata_storage):
 78      """Create file-based tracing storage."""
 79      return FileTracingStorage(base_path=tmp_path)
 80  
 81  
 82  @pytest.fixture
 83  def sql_storage(sqlite_engine):
 84      """Create SQL-based tracing storage."""
 85      return SQLTracingStorage(engine=sqlite_engine)
 86  
 87  
 88  @pytest.fixture
 89  def project_id():
 90      """Create a test project ID."""
 91      return new_id()
 92  
 93  
 94  @pytest.fixture
 95  def user_id():
 96      """Create a test user ID."""
 97      return new_id()
 98  
 99  
@pytest.fixture
def export_id():
    """Return a freshly generated export identifier (also used as the dataset id)."""
    identifier = new_id()
    return identifier
104  
105  
@pytest.fixture
def dataset_metadata_storage(tmp_path):
    """Provide a file-based dataset metadata storage in the per-test temporary directory."""
    storage = FileDatasetMetadataStorage(base_path=tmp_path)
    return storage
110  
111  
@pytest.fixture
def create_tracing_dataset(dataset_metadata_storage, project_id, user_id, export_id):
    """Register a tracing-origin dataset whose id equals ``export_id`` and return its metadata.

    The dataset is empty (no rows or columns) and points at a ``TracingDataSource``
    for the same ``export_id``. It is persisted through ``dataset_metadata_storage``
    before the test body runs.
    """
    import asyncio

    metadata = DatasetMetadata(
        id=export_id,
        project_id=project_id,
        author_id=user_id,
        name="test-tracing-dataset",
        description="",
        origin=DatasetOrigin.tracing,
        source=TracingDataSource(export_id=export_id),
        data_definition=DataDefinition(),
        size_bytes=0,
        row_count=0,
        column_count=0,
        all_columns=[],
        is_draft=False,
        draft_params=None,
        metadata={},
        tags=[],
    )
    # add_dataset_metadata returns a coroutine; drive it to completion synchronously.
    # NOTE(review): asyncio.run creates and closes its own event loop. Confirm this does
    # not interfere with pytest-asyncio's loop in the async tests that use this fixture.
    registration = dataset_metadata_storage.add_dataset_metadata(user_id, project_id, metadata)
    asyncio.run(registration)
    return metadata
137  
138  
@pytest.fixture
def trace_id():
    """Return a random UUID to use as a trace identifier."""
    fresh = uuid.uuid4()
    return fresh
143  
144  
def _set_string_attributes(target, attributes):
    """Append each ``key -> value`` pair of ``attributes`` to ``target.attributes`` as a string value."""
    for key, value in attributes.items():
        attribute = target.attributes.add()
        attribute.key = key
        attribute.value.string_value = value


def _unix_nano(moment: datetime.datetime, offset_seconds: float = 0.0) -> int:
    """Return ``moment`` shifted by ``offset_seconds`` as integer nanoseconds since the epoch."""
    return int((moment.timestamp() + offset_seconds) * 1000000000)


def _new_trace_request(export_id: ExportID, service_name: str):
    """Create an export request with one resource and one scope; return ``(request, scope_span)``.

    The resource carries the ``service.name`` and ``evidently.export_id`` attributes.
    The scope is named ``"test-scope"``. Spans are meant to be added to the returned
    ``scope_span``.
    """
    request = trace_service_pb2.ExportTraceServiceRequest()

    resource_span = request.resource_spans.add()
    _set_string_attributes(
        resource_span.resource,
        {"service.name": service_name, "evidently.export_id": str(export_id)},
    )

    scope_span = resource_span.scope_spans.add()
    scope_span.scope.name = "test-scope"
    return request, scope_span


def create_test_trace_request(
    export_id: ExportID, trace_id: uuid.UUID, service_name: str = "test-service"
) -> trace_service_pb2.ExportTraceServiceRequest:
    """Build an export request holding a single root span for ``trace_id``.

    The span is named ``"test-span"``, starts now and lasts 0.1 s. It carries the
    ``input``, ``output``, ``session_id`` and ``user_id`` string attributes.
    """
    request, scope_span = _new_trace_request(export_id, service_name)

    span = scope_span.spans.add()
    # UUID.bytes is already exactly 16 bytes, so no slicing is needed.
    span.trace_id = trace_id.bytes
    span.span_id = uuid.uuid4().bytes[:8]
    span.parent_span_id = b""
    span.name = "test-span"
    span.kind = trace_pb2.Span.SpanKind.SPAN_KIND_SERVER
    now = datetime.datetime.now()
    span.start_time_unix_nano = _unix_nano(now)
    span.end_time_unix_nano = _unix_nano(now, 0.1)
    _set_string_attributes(
        span,
        {
            "input": "test input",
            "output": "test output",
            "session_id": "session-123",
            "user_id": "user-456",
        },
    )

    return request


def create_test_trace_request_with_multiple_spans(
    export_id: ExportID, trace_id: uuid.UUID, service_name: str = "test-service"
) -> trace_service_pb2.ExportTraceServiceRequest:
    """Build an export request holding a parent span and one child span for ``trace_id``.

    ``"parent-span"`` (root, 0.2 s, with a ``session_id`` attribute) encloses
    ``"child-span"``. The child starts 0.05 s later, ends at 0.15 s and has
    ``input``/``output`` attributes.
    """
    request, scope_span = _new_trace_request(export_id, service_name)

    base_time = datetime.datetime.now()
    parent_span_id = uuid.uuid4().bytes[:8]

    parent = scope_span.spans.add()
    parent.trace_id = trace_id.bytes
    parent.span_id = parent_span_id
    parent.parent_span_id = b""
    parent.name = "parent-span"
    parent.kind = trace_pb2.Span.SpanKind.SPAN_KIND_SERVER
    parent.start_time_unix_nano = _unix_nano(base_time)
    parent.end_time_unix_nano = _unix_nano(base_time, 0.2)
    _set_string_attributes(parent, {"session_id": "session-123"})

    child = scope_span.spans.add()
    child.trace_id = trace_id.bytes
    child.span_id = uuid.uuid4().bytes[:8]
    child.parent_span_id = parent_span_id
    child.name = "child-span"
    child.kind = trace_pb2.Span.SpanKind.SPAN_KIND_INTERNAL
    child.start_time_unix_nano = _unix_nano(base_time, 0.05)
    child.end_time_unix_nano = _unix_nano(base_time, 0.15)
    _set_string_attributes(child, {"input": "test input", "output": "test output"})

    return request
237  
238  
@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_save_trace(request, storage, export_id, trace_id, create_tracing_dataset):
    """A saved single-span trace reads back with its id and span name intact."""
    backend = request.getfixturevalue(storage)
    backend.save(export_id, "test-service", create_test_trace_request(export_id, trace_id))

    traces = backend.read_traces_with_filter(export_id, None, None)
    assert len(traces) == 1
    saved = traces[0]
    assert saved.trace_id == str(trace_id)
    assert len(saved.spans) == 1
    assert saved.spans[0].span_name == "test-span"
252  
253  
@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_save_multiple_traces(request, storage, export_id, create_tracing_dataset):
    """Two separately saved traces under one export are both returned."""
    backend = request.getfixturevalue(storage)
    expected_ids = [uuid.uuid4(), uuid.uuid4()]

    payloads = [create_test_trace_request(export_id, tid) for tid in expected_ids]
    for payload in payloads:
        backend.save(export_id, "test-service", payload)

    traces = backend.read_traces_with_filter(export_id, None, None)
    assert len(traces) == 2
    stored_ids = {t.trace_id for t in traces}
    assert {str(tid) for tid in expected_ids} <= stored_ids
272  
273  
@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_read_as_dataframe(request, storage, export_id, trace_id, create_tracing_dataset):
    """One saved trace becomes exactly one DataFrame row whose ``id`` is the trace id."""
    backend = request.getfixturevalue(storage)
    backend.save(export_id, "test-service", create_test_trace_request(export_id, trace_id))

    frame = backend.read_as_dataframe(export_id)
    assert not frame.empty
    for column in ("id", "timestamp"):
        assert column in frame.columns
    assert len(frame) == 1
    first_row = frame.iloc[0]
    assert str(first_row["id"]) == str(trace_id)
288  
289  
@pytest.mark.asyncio
@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
async def test_get_data_definition(request, storage, export_id, trace_id, create_tracing_dataset):
    """After one trace is saved, the export exposes a data definition with ``id``/``timestamp`` columns."""
    backend = request.getfixturevalue(storage)
    backend.save(export_id, "test-service", create_test_trace_request(export_id, trace_id))

    data_definition, columns = await backend.get_data_definition(export_id)
    assert data_definition is not None
    assert len(columns) > 0
    for required in ("id", "timestamp"):
        assert required in columns
304  
305  
@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_read_with_filter_timestamp(request, storage, export_id, create_tracing_dataset):
    """Every trace whose span starts inside the requested window is returned.

    Both traces are built right after ``base_time`` (their spans start at
    ``datetime.now()``), so a ±1 s window around ``base_time`` must contain both.
    """
    storage_instance = request.getfixturevalue(storage)
    trace_id1 = uuid.uuid4()
    trace_id2 = uuid.uuid4()

    base_time = datetime.datetime.now()
    trace_request1 = create_test_trace_request(export_id, trace_id1)
    trace_request2 = create_test_trace_request(export_id, trace_id2)

    storage_instance.save(export_id, "test-service", trace_request1)
    storage_instance.save(export_id, "test-service", trace_request2)

    timestamp_from = base_time - datetime.timedelta(seconds=1)
    timestamp_to = base_time + datetime.timedelta(seconds=1)

    df = storage_instance.read_with_filter(export_id, timestamp_from, timestamp_to, None, None)
    # Each trace has exactly one span, so an exact count catches a filter that
    # drops one of the two in-range traces (``>= 1`` would not).
    assert len(df) == 2
325  
326  
@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_read_traces_with_filter(request, storage, export_id, create_tracing_dataset):
    """``read_traces_with_filter`` returns exactly the traces inside the time window.

    Both traces are built right after ``base_time``, so a ±1 s window around it
    must return both of them and nothing else.
    """
    storage_instance = request.getfixturevalue(storage)
    trace_id1 = uuid.uuid4()
    trace_id2 = uuid.uuid4()

    base_time = datetime.datetime.now()
    trace_request1 = create_test_trace_request(export_id, trace_id1)
    trace_request2 = create_test_trace_request(export_id, trace_id2)

    storage_instance.save(export_id, "test-service", trace_request1)
    storage_instance.save(export_id, "test-service", trace_request2)

    timestamp_from = base_time - datetime.timedelta(seconds=1)
    timestamp_to = base_time + datetime.timedelta(seconds=1)

    traces = storage_instance.read_traces_with_filter(export_id, timestamp_from, timestamp_to)
    assert len(traces) == 2
    assert {t.trace_id for t in traces} == {str(trace_id1), str(trace_id2)}
346  
347  
@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_read_traces_with_multiple_spans(request, storage, export_id, trace_id, create_tracing_dataset):
    """A trace with a parent and a child span reads back as one trace holding both spans."""
    backend = request.getfixturevalue(storage)
    payload = create_test_trace_request_with_multiple_spans(export_id, trace_id)
    backend.save(export_id, "test-service", payload)

    traces = backend.read_traces_with_filter(export_id, None, None)
    assert len(traces) == 1
    spans = traces[0].spans
    assert len(spans) == 2

    names = {s.span_name for s in spans}
    assert {"parent-span", "child-span"} <= names
363  
364  
@pytest.mark.asyncio
@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
async def test_delete_trace(request, storage, export_id, trace_id, create_tracing_dataset):
    """Deleting the only stored trace leaves the export with no traces."""
    backend = request.getfixturevalue(storage)
    backend.save(export_id, "test-service", create_test_trace_request(export_id, trace_id))

    assert len(backend.read_traces_with_filter(export_id, None, None)) == 1

    await backend.delete_trace(export_id, str(trace_id))

    assert len(backend.read_traces_with_filter(export_id, None, None)) == 0
381  
382  
@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_get_trace_range_for_run(request, storage, export_id, create_tracing_dataset):
    """A run window covering freshly saved traces yields a non-None result."""
    backend = request.getfixturevalue(storage)
    first_id, second_id = uuid.uuid4(), uuid.uuid4()

    base_time = datetime.datetime.now()
    payloads = [create_test_trace_request(export_id, tid) for tid in (first_id, second_id)]
    for payload in payloads:
        backend.save(export_id, "test-service", payload)

    window = datetime.timedelta(seconds=1)
    # NOTE(review): the meaning of the leading ``0`` argument is not visible here;
    # confirm it against get_trace_range_for_run's signature.
    max_trace_id = backend.get_trace_range_for_run(0, base_time - window, base_time + window)
    assert max_trace_id is not None
402  
403  
@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_read_empty_export_id(request, storage, export_id):
    """An export id with no dataset and no saved traces reads back as empty.

    ``create_tracing_dataset`` is intentionally not requested here.
    """
    backend = request.getfixturevalue(storage)

    assert backend.read_as_dataframe(export_id).empty
    assert len(backend.read_traces_with_filter(export_id, None, None)) == 0
414  
415  
@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_append_mode(request, storage, export_id, create_tracing_dataset):
    """Saving a second request for the same export appends to the stored traces instead of replacing them.

    This runs against both the file and the SQL storage backends.
    """
    storage_instance = request.getfixturevalue(storage)
    trace_id1 = uuid.uuid4()
    trace_id2 = uuid.uuid4()

    trace_request1 = create_test_trace_request(export_id, trace_id1)
    trace_request2 = create_test_trace_request(export_id, trace_id2)

    storage_instance.save(export_id, "test-service", trace_request1)
    storage_instance.save(export_id, "test-service", trace_request2)

    traces = storage_instance.read_traces_with_filter(export_id, None, None)
    assert len(traces) == 2
    # Checking ids, not just the count, proves the first trace survived the second save.
    assert {t.trace_id for t in traces} == {str(trace_id1), str(trace_id2)}
431  
432  
@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_trace_attributes_preserved(request, storage, export_id, trace_id, create_tracing_dataset):
    """The span name and all string attributes of a saved span survive a round trip."""
    backend = request.getfixturevalue(storage)
    backend.save(export_id, "test-service", create_test_trace_request(export_id, trace_id))

    traces = backend.read_traces_with_filter(export_id, None, None)
    assert len(traces) == 1
    trace = traces[0]

    assert trace.trace_id == str(trace_id)
    assert len(trace.spans) == 1
    span = trace.spans[0]

    assert span.span_name == "test-span"
    for key in ("input", "output"):
        assert key in span.attributes
    expected = {
        "input": "test input",
        "output": "test output",
        "session_id": "session-123",
        "user_id": "user-456",
    }
    for key, value in expected.items():
        assert span.attributes[key] == value