# test_tracing_storage.py
"""Tests for the file-based and SQL-based tracing storage implementations.

Most tests are parametrized over the ``file_storage`` and ``sql_storage``
fixtures and resolve the concrete storage with ``request.getfixturevalue``.
"""

import asyncio
import datetime
import gc
import os
import sys
import tempfile
import time
import uuid
import warnings

import pytest
from opentelemetry.proto.collector.trace.v1 import trace_service_pb2
from opentelemetry.proto.trace.v1 import trace_pb2
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

from evidently.core.datasets import DataDefinition
from evidently.legacy.core import new_id
from evidently.ui.service.datasets.data_source import TracingDataSource
from evidently.ui.service.datasets.metadata import DatasetMetadata
from evidently.ui.service.datasets.metadata import DatasetOrigin
from evidently.ui.service.datasets.metadata import FileDatasetMetadataStorage
from evidently.ui.service.tracing.storage.base import ExportID
from evidently.ui.service.tracing.storage.file import FileTracingStorage
from evidently.ui.service.tracing.storage.sql import SQLTracingStorage

# Multiplier used to turn a POSIX timestamp in seconds into nanoseconds.
_NANOS_PER_SECOND = 1000000000

# How many times the SQLite fixture tries to delete its database file.
_UNLINK_MAX_RETRIES = 10


def _to_unix_nano(seconds: float) -> int:
    """Convert a POSIX timestamp in seconds to integer nanoseconds."""
    return int(seconds * _NANOS_PER_SECOND)


def _add_string_attribute(attributes, key: str, value: str) -> None:
    """Append a string-valued key/value entry to a repeated attributes field."""
    attribute = attributes.add()
    attribute.key = key
    attribute.value.string_value = value


def _new_request_with_scope(export_id: ExportID, service_name: str):
    """Build an empty export request with one resource and one scope.

    The resource carries the ``service.name`` and ``evidently.export_id``
    attributes; the scope is named ``test-scope``.

    Returns:
        A ``(request, scope_span)`` pair; spans are added to ``scope_span``.
    """
    request = trace_service_pb2.ExportTraceServiceRequest()

    resource_span = request.resource_spans.add()
    resource_attributes = resource_span.resource.attributes
    _add_string_attribute(resource_attributes, "service.name", service_name)
    _add_string_attribute(resource_attributes, "evidently.export_id", str(export_id))

    scope_span = resource_span.scope_spans.add()
    scope_span.scope.name = "test-scope"
    return request, scope_span


def _remove_db_file(db_path: str) -> None:
    """Delete the SQLite file, retrying while the OS reports it as locked.

    On ``PermissionError`` the deletion is retried with a linearly growing
    pause and a garbage-collection pass in between. When the last attempt
    fails, a warning is emitted on Windows and the error is re-raised on any
    other platform.
    """
    for attempt in range(_UNLINK_MAX_RETRIES):
        try:
            os.unlink(db_path)
            return
        except PermissionError:
            if attempt < _UNLINK_MAX_RETRIES - 1:
                time.sleep(0.1 * (attempt + 1))
                gc.collect()
                continue
            if sys.platform != "win32":
                raise
            warnings.warn(f"Could not delete SQLite database file {db_path} on Windows.")


@pytest.fixture
def tmp_path():
    """Create a temporary directory.

    Yields the directory path as a string; the directory is removed when the
    fixture is torn down.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        yield tmpdir


@pytest.fixture
def sqlite_engine():
    """Create a temporary SQLite database for testing.

    Yields a SQLAlchemy engine bound to a freshly created ``.db`` file. On
    teardown the engine is disposed and the file is deleted.
    """
    # Only the file name is needed; the handle is closed right away.
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
        db_path = f.name

    db_url = f"sqlite:///{db_path}"
    # NOTE(review): NullPool presumably avoids lingering pooled connections
    # that would keep the file open at teardown - confirm.
    engine = create_engine(db_url, poolclass=NullPool)
    from evidently.ui.service.storage.sql.utils import migrate_database

    try:
        migrate_database(db_url)
    except Exception:
        # Best-effort fallback kept from the original: if the migration fails
        # for any reason, create just the span table directly.
        from evidently.ui.service.tracing.storage.sql import TraceSpanModel

        TraceSpanModel.__table__.create(engine, checkfirst=True)

    yield engine

    engine.dispose(close=True)
    gc.collect()

    if sys.platform == "win32":
        # Short pause on Windows before the first deletion attempt.
        time.sleep(0.2)

    _remove_db_file(db_path)


@pytest.fixture
def file_storage(tmp_path, dataset_metadata_storage):
    """Create file-based tracing storage."""
    # NOTE(review): dataset_metadata_storage is not used directly here; it is
    # presumably requested so it is set up on the same base path - confirm.
    return FileTracingStorage(base_path=tmp_path)


@pytest.fixture
def sql_storage(sqlite_engine):
    """Create SQL-based tracing storage."""
    return SQLTracingStorage(engine=sqlite_engine)


@pytest.fixture
def project_id():
    """Create a test project ID."""
    return new_id()


@pytest.fixture
def user_id():
    """Create a test user ID."""
    return new_id()


@pytest.fixture
def export_id():
    """Create a test export ID."""
    return new_id()


@pytest.fixture
def dataset_metadata_storage(tmp_path):
    """Create dataset metadata storage."""
    return FileDatasetMetadataStorage(base_path=tmp_path)


@pytest.fixture
def create_tracing_dataset(dataset_metadata_storage, project_id, user_id, export_id):
    """Create a tracing dataset metadata.

    Registers a dataset whose id equals ``export_id`` in the metadata storage
    and returns the metadata object.
    """
    dataset = DatasetMetadata(
        id=export_id,
        project_id=project_id,
        author_id=user_id,
        name="test-tracing-dataset",
        source=TracingDataSource(export_id=export_id),
        data_definition=DataDefinition(),
        description="",
        size_bytes=0,
        row_count=0,
        column_count=0,
        all_columns=[],
        is_draft=False,
        draft_params=None,
        origin=DatasetOrigin.tracing,
        metadata={},
        tags=[],
    )
    # The storage API is async; run it to completion from this sync fixture.
    asyncio.run(dataset_metadata_storage.add_dataset_metadata(user_id, project_id, dataset))
    return dataset


@pytest.fixture
def trace_id():
    """Create a test trace ID."""
    return uuid.uuid4()


def create_test_trace_request(
    export_id: ExportID, trace_id: uuid.UUID, service_name: str = "test-service"
) -> trace_service_pb2.ExportTraceServiceRequest:
    """Create a test trace request.

    The request holds a single root server span named ``test-span`` that
    lasts 0.1 seconds starting now and carries the ``input``, ``output``,
    ``session_id`` and ``user_id`` string attributes.
    """
    request, scope_span = _new_request_with_scope(export_id, service_name)

    span = scope_span.spans.add()
    span.trace_id = trace_id.bytes[:16]
    span.span_id = uuid.uuid4().bytes[:8]
    span.parent_span_id = b""
    span.name = "test-span"
    span.kind = trace_pb2.Span.SpanKind.SPAN_KIND_SERVER
    started_at = datetime.datetime.now().timestamp()
    span.start_time_unix_nano = _to_unix_nano(started_at)
    span.end_time_unix_nano = _to_unix_nano(started_at + 0.1)

    _add_string_attribute(span.attributes, "input", "test input")
    _add_string_attribute(span.attributes, "output", "test output")
    _add_string_attribute(span.attributes, "session_id", "session-123")
    _add_string_attribute(span.attributes, "user_id", "user-456")

    return request


def create_test_trace_request_with_multiple_spans(
    export_id: ExportID, trace_id: uuid.UUID, service_name: str = "test-service"
) -> trace_service_pb2.ExportTraceServiceRequest:
    """Create a test trace request with multiple spans.

    The request holds a root server span ``parent-span`` and an internal span
    ``child-span`` that references the parent and is timed inside it.
    """
    request, scope_span = _new_request_with_scope(export_id, service_name)

    started_at = datetime.datetime.now().timestamp()
    parent_span_id = uuid.uuid4().bytes[:8]

    parent = scope_span.spans.add()
    parent.trace_id = trace_id.bytes[:16]
    parent.span_id = parent_span_id
    parent.parent_span_id = b""
    parent.name = "parent-span"
    parent.kind = trace_pb2.Span.SpanKind.SPAN_KIND_SERVER
    parent.start_time_unix_nano = _to_unix_nano(started_at)
    parent.end_time_unix_nano = _to_unix_nano(started_at + 0.2)
    _add_string_attribute(parent.attributes, "session_id", "session-123")

    child = scope_span.spans.add()
    child.trace_id = trace_id.bytes[:16]
    child.span_id = uuid.uuid4().bytes[:8]
    child.parent_span_id = parent_span_id
    child.name = "child-span"
    child.kind = trace_pb2.Span.SpanKind.SPAN_KIND_INTERNAL
    child.start_time_unix_nano = _to_unix_nano(started_at + 0.05)
    child.end_time_unix_nano = _to_unix_nano(started_at + 0.15)
    _add_string_attribute(child.attributes, "input", "test input")
    _add_string_attribute(child.attributes, "output", "test output")

    return request


@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_save_trace(request, storage, export_id, trace_id, create_tracing_dataset):
    """Test saving a trace."""
    storage_instance = request.getfixturevalue(storage)
    trace_request = create_test_trace_request(export_id, trace_id)

    storage_instance.save(export_id, "test-service", trace_request)

    traces = storage_instance.read_traces_with_filter(export_id, None, None)
    assert len(traces) == 1
    assert traces[0].trace_id == str(trace_id)
    assert len(traces[0].spans) == 1
    assert traces[0].spans[0].span_name == "test-span"


@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_save_multiple_traces(request, storage, export_id, create_tracing_dataset):
    """Test saving multiple traces."""
    storage_instance = request.getfixturevalue(storage)
    trace_id1 = uuid.uuid4()
    trace_id2 = uuid.uuid4()

    trace_request1 = create_test_trace_request(export_id, trace_id1)
    trace_request2 = create_test_trace_request(export_id, trace_id2)

    storage_instance.save(export_id, "test-service", trace_request1)
    storage_instance.save(export_id, "test-service", trace_request2)

    traces = storage_instance.read_traces_with_filter(export_id, None, None)
    assert len(traces) == 2
    trace_ids = {t.trace_id for t in traces}
    assert str(trace_id1) in trace_ids
    assert str(trace_id2) in trace_ids


@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_read_as_dataframe(request, storage, export_id, trace_id, create_tracing_dataset):
    """Test reading traces as DataFrame."""
    storage_instance = request.getfixturevalue(storage)
    trace_request = create_test_trace_request(export_id, trace_id)

    storage_instance.save(export_id, "test-service", trace_request)

    df = storage_instance.read_as_dataframe(export_id)
    assert not df.empty
    assert "id" in df.columns
    assert "timestamp" in df.columns
    assert len(df) == 1
    assert str(df.iloc[0]["id"]) == str(trace_id)


@pytest.mark.asyncio
@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
async def test_get_data_definition(request, storage, export_id, trace_id, create_tracing_dataset):
    """Test getting data definition."""
    storage_instance = request.getfixturevalue(storage)
    trace_request = create_test_trace_request(export_id, trace_id)

    storage_instance.save(export_id, "test-service", trace_request)

    data_definition, columns = await storage_instance.get_data_definition(export_id)
    assert data_definition is not None
    assert len(columns) > 0
    assert "id" in columns
    assert "timestamp" in columns


@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_read_with_filter_timestamp(request, storage, export_id, create_tracing_dataset):
    """Test reading traces with timestamp filter."""
    storage_instance = request.getfixturevalue(storage)
    trace_id1 = uuid.uuid4()
    trace_id2 = uuid.uuid4()

    # Captured before the requests are built, so the one-second window on
    # either side brackets the span start times taken inside the builder.
    base_time = datetime.datetime.now()
    trace_request1 = create_test_trace_request(export_id, trace_id1)
    trace_request2 = create_test_trace_request(export_id, trace_id2)

    storage_instance.save(export_id, "test-service", trace_request1)
    storage_instance.save(export_id, "test-service", trace_request2)

    timestamp_from = base_time - datetime.timedelta(seconds=1)
    timestamp_to = base_time + datetime.timedelta(seconds=1)

    df = storage_instance.read_with_filter(export_id, timestamp_from, timestamp_to, None, None)
    assert len(df) >= 1


@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_read_traces_with_filter(request, storage, export_id, create_tracing_dataset):
    """Test reading traces with filter."""
    storage_instance = request.getfixturevalue(storage)
    trace_id1 = uuid.uuid4()
    trace_id2 = uuid.uuid4()

    base_time = datetime.datetime.now()
    trace_request1 = create_test_trace_request(export_id, trace_id1)
    trace_request2 = create_test_trace_request(export_id, trace_id2)

    storage_instance.save(export_id, "test-service", trace_request1)
    storage_instance.save(export_id, "test-service", trace_request2)

    timestamp_from = base_time - datetime.timedelta(seconds=1)
    timestamp_to = base_time + datetime.timedelta(seconds=1)

    traces = storage_instance.read_traces_with_filter(export_id, timestamp_from, timestamp_to)
    assert len(traces) >= 1


@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_read_traces_with_multiple_spans(request, storage, export_id, trace_id, create_tracing_dataset):
    """Test reading traces with multiple spans."""
    storage_instance = request.getfixturevalue(storage)
    trace_request = create_test_trace_request_with_multiple_spans(export_id, trace_id)

    storage_instance.save(export_id, "test-service", trace_request)

    traces = storage_instance.read_traces_with_filter(export_id, None, None)
    assert len(traces) == 1
    assert len(traces[0].spans) == 2

    span_names = {span.span_name for span in traces[0].spans}
    assert "parent-span" in span_names
    assert "child-span" in span_names


@pytest.mark.asyncio
@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
async def test_delete_trace(request, storage, export_id, trace_id, create_tracing_dataset):
    """Test deleting a trace."""
    storage_instance = request.getfixturevalue(storage)
    trace_request = create_test_trace_request(export_id, trace_id)

    storage_instance.save(export_id, "test-service", trace_request)

    traces_before = storage_instance.read_traces_with_filter(export_id, None, None)
    assert len(traces_before) == 1

    await storage_instance.delete_trace(export_id, str(trace_id))

    traces_after = storage_instance.read_traces_with_filter(export_id, None, None)
    assert len(traces_after) == 0


@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_get_trace_range_for_run(request, storage, export_id, create_tracing_dataset):
    """Test getting trace range for a run."""
    storage_instance = request.getfixturevalue(storage)
    trace_id1 = uuid.uuid4()
    trace_id2 = uuid.uuid4()

    base_time = datetime.datetime.now()
    trace_request1 = create_test_trace_request(export_id, trace_id1)
    trace_request2 = create_test_trace_request(export_id, trace_id2)

    storage_instance.save(export_id, "test-service", trace_request1)
    storage_instance.save(export_id, "test-service", trace_request2)

    start_time = base_time - datetime.timedelta(seconds=1)
    end_time = base_time + datetime.timedelta(seconds=1)

    max_trace_id = storage_instance.get_trace_range_for_run(0, start_time, end_time)
    assert max_trace_id is not None


@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_read_empty_export_id(request, storage, export_id):
    """Test reading from non-existent export ID."""
    storage_instance = request.getfixturevalue(storage)

    df = storage_instance.read_as_dataframe(export_id)
    assert df.empty

    traces = storage_instance.read_traces_with_filter(export_id, None, None)
    assert len(traces) == 0


@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_append_mode(request, storage, export_id, create_tracing_dataset):
    """Test that a second save to the same export ID keeps the first trace."""
    storage_instance = request.getfixturevalue(storage)
    trace_id1 = uuid.uuid4()
    trace_id2 = uuid.uuid4()

    trace_request1 = create_test_trace_request(export_id, trace_id1)
    trace_request2 = create_test_trace_request(export_id, trace_id2)

    storage_instance.save(export_id, "test-service", trace_request1)
    storage_instance.save(export_id, "test-service", trace_request2)

    traces = storage_instance.read_traces_with_filter(export_id, None, None)
    assert len(traces) == 2


@pytest.mark.parametrize("storage", ["file_storage", "sql_storage"])
def test_trace_attributes_preserved(request, storage, export_id, trace_id, create_tracing_dataset):
    """Test that trace attributes are preserved."""
    storage_instance = request.getfixturevalue(storage)
    trace_request = create_test_trace_request(export_id, trace_id)

    storage_instance.save(export_id, "test-service", trace_request)

    traces = storage_instance.read_traces_with_filter(export_id, None, None)
    assert len(traces) == 1
    trace = traces[0]

    assert trace.trace_id == str(trace_id)
    assert len(trace.spans) == 1
    span = trace.spans[0]

    assert span.span_name == "test-span"
    assert "input" in span.attributes
    assert "output" in span.attributes
    assert span.attributes["input"] == "test input"
    assert span.attributes["output"] == "test output"
    assert span.attributes["session_id"] == "session-123"
    assert span.attributes["user_id"] == "user-456"