test_sql_data_storage.py
1 import datetime 2 3 import pytest 4 from sqlalchemy import select 5 from sqlalchemy.orm import Session 6 7 from evidently.core.metric_types import ByLabelValue 8 from evidently.core.metric_types import CountValue 9 from evidently.core.metric_types import MetricConfig 10 from evidently.core.metric_types import SingleValue 11 from evidently.core.serialization import ReportModel 12 from evidently.core.serialization import SnapshotModel 13 from evidently.legacy.core import new_id 14 from evidently.ui.service.base import Project 15 from evidently.ui.service.base import SeriesFilter 16 from evidently.ui.service.storage.sql.data import SQLDataStorage 17 from evidently.ui.service.storage.sql.metadata import SQLProjectMetadataStorage 18 from evidently.ui.service.storage.sql.models import PointSQLModel 19 20 21 @pytest.fixture 22 def data_storage(sqlite_engine): 23 """Create SQL data storage instance.""" 24 return SQLDataStorage(sqlite_engine) 25 26 27 @pytest.fixture 28 def metadata_storage(sqlite_engine): 29 """Create SQL metadata storage instance.""" 30 return SQLProjectMetadataStorage(sqlite_engine) 31 32 33 @pytest.fixture 34 def test_snapshot_id(): 35 """Create a test snapshot ID.""" 36 return new_id() 37 38 39 def create_single_value_snapshot( 40 metric_id: str = "test-metric", value: float = 42.0, metric_type: str = "test_type" 41 ) -> SnapshotModel: 42 """Create a snapshot with a single value metric.""" 43 metric_result = SingleValue(value=value, display_name="Test Metric") 44 metric_result.set_metric_location( 45 MetricConfig(metric_id=metric_id, params={"type": metric_type, "column": "test_column"}) 46 ) 47 48 return SnapshotModel( 49 report=ReportModel(items=[]), 50 name="Test Snapshot", 51 timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0), 52 metadata={}, 53 tags=[], 54 metric_results={metric_id: metric_result}, 55 top_level_metrics=[], 56 widgets=[], 57 tests_widgets=[], 58 ) 59 60 61 def create_by_label_value_snapshot(metric_id: str = "test-metric") -> SnapshotModel: 62 """Create a snapshot with a by-label value metric.""" 63 metric_result = ByLabelValue( 64 display_name="Test By Label Metric", 65 values={ 66 "label1": SingleValue(value=10.0, display_name="Label 1"), 67 "label2": SingleValue(value=20.0, display_name="Label 2"), 68 }, 69 ) 70 for label, val in metric_result.values.items(): 71 val.set_metric_location( 72 MetricConfig(metric_id=metric_id, params={"type": "by_label", "column": "test_column", "label": label}) 73 ) 74 metric_result.set_metric_location( 75 MetricConfig(metric_id=metric_id, params={"type": "by_label", "column": "test_column"}) 76 ) 77 78 return SnapshotModel( 79 report=ReportModel(items=[]), 80 name="Test Snapshot", 81 timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0), 82 metadata={"env": "test"}, 83 tags=["prod"], 84 metric_results={metric_id: metric_result}, 85 top_level_metrics=[], 86 widgets=[], 87 tests_widgets=[], 88 ) 89 90 91 def create_count_value_snapshot(metric_id: str = "test-metric") -> SnapshotModel: 92 """Create a snapshot with a count value metric.""" 93 count = SingleValue(value=100.0, display_name="Count") 94 share = SingleValue(value=0.5, display_name="Share") 95 count.set_metric_location(MetricConfig(metric_id=metric_id, params={"type": "count", "column": "test_column"})) 96 share.set_metric_location(MetricConfig(metric_id=metric_id, params={"type": "count", "column": "test_column"})) 97 98 metric_result = CountValue(display_name="Test Count Metric", count=count, share=share) 99 metric_result.set_metric_location( 100 MetricConfig(metric_id=metric_id, params={"type": "count", "column": "test_column"}) 101 ) 102 103 return SnapshotModel( 104 report=ReportModel(items=[]), 105 name="Test Snapshot", 106 timestamp=datetime.datetime(2023, 1, 2, 0, 0, 0), 107 metadata={}, 108 tags=[], 109 metric_results={metric_id: metric_result}, 110 top_level_metrics=[], 111 widgets=[], 112 tests_widgets=[], 113 ) 114 115 116 @pytest.mark.asyncio 117 async def test_add_snapshot_points_single_value( 118 data_storage, sqlite_engine, metadata_storage, test_user, test_project_id 119 ): 120 """Test adding snapshot points with single value metric.""" 121 # Create project and snapshot first (required for foreign key constraints) 122 project = Project(id=test_project_id, name="Test Project") 123 await metadata_storage.add_project(project, test_user, org_id=None) 124 125 snapshot = create_single_value_snapshot() 126 snapshot_id = await metadata_storage.add_snapshot(test_project_id, snapshot) 127 128 await data_storage.add_snapshot_points(test_project_id, snapshot_id, snapshot) 129 130 # Verify point was stored 131 with Session(sqlite_engine) as session: 132 points = ( 133 session.execute(select(PointSQLModel).where(PointSQLModel.project_id == test_project_id)).scalars().all() 134 ) 135 assert len(points) == 1 136 assert points[0].value == 42.0 137 assert points[0].metric_type == "test_type" 138 assert points[0].snapshot_id == snapshot_id 139 140 141 @pytest.mark.asyncio 142 async def test_add_snapshot_points_by_label_value( 143 data_storage, sqlite_engine, metadata_storage, test_user, test_project_id 144 ): 145 """Test adding snapshot points with by-label value metric.""" 146 # Create project and snapshot first (required for foreign key constraints) 147 project = Project(id=test_project_id, name="Test Project") 148 await metadata_storage.add_project(project, test_user, org_id=None) 149 150 snapshot = create_by_label_value_snapshot() 151 snapshot_id = await metadata_storage.add_snapshot(test_project_id, snapshot) 152 153 await data_storage.add_snapshot_points(test_project_id, snapshot_id, snapshot) 154 155 # Verify points were stored 156 with Session(sqlite_engine) as session: 157 points = ( 158 session.execute(select(PointSQLModel).where(PointSQLModel.project_id == test_project_id)).scalars().all() 159 ) 160 assert len(points) == 2 161 values = {p.value for p in points} 162 assert values == {10.0, 20.0} 163 164 165 @pytest.mark.asyncio 166 async def test_add_snapshot_points_count_value( 167 data_storage, sqlite_engine, metadata_storage, test_user, test_project_id 168 ): 169 """Test adding snapshot points with count value metric.""" 170 # Create project and snapshot first (required for foreign key constraints) 171 project = Project(id=test_project_id, name="Test Project") 172 await metadata_storage.add_project(project, test_user, org_id=None) 173 174 snapshot = create_count_value_snapshot() 175 snapshot_id = await metadata_storage.add_snapshot(test_project_id, snapshot) 176 177 await data_storage.add_snapshot_points(test_project_id, snapshot_id, snapshot) 178 179 # Verify points were stored (count and share) 180 with Session(sqlite_engine) as session: 181 points = ( 182 session.execute(select(PointSQLModel).where(PointSQLModel.project_id == test_project_id)).scalars().all() 183 ) 184 assert len(points) == 2 185 values = {p.value for p in points} 186 assert values == {100.0, 0.5} 187 188 189 @pytest.mark.asyncio 190 async def test_get_metrics(data_storage, sqlite_engine, metadata_storage, test_user, test_project): 191 """Test getting available metrics.""" 192 await metadata_storage.add_project(test_project, test_user, org_id=None) 193 194 snapshot1 = create_single_value_snapshot(metric_id="metric1", metric_type="type1") 195 snapshot2 = create_single_value_snapshot(metric_id="metric2", metric_type="type2") 196 197 snapshot_id1 = await metadata_storage.add_snapshot(test_project.id, snapshot1) 198 snapshot_id2 = await metadata_storage.add_snapshot(test_project.id, snapshot2) 199 200 await data_storage.add_snapshot_points(test_project.id, snapshot_id1, snapshot1) 201 await data_storage.add_snapshot_points(test_project.id, snapshot_id2, snapshot2) 202 203 metrics = await data_storage.get_metrics(test_project.id, tags=[], metadata={}) 204 assert len(metrics) == 2 205 assert "type1" in metrics 206 assert "type2" in metrics 207 208 209 @pytest.mark.asyncio 210 async def test_get_metrics_with_tags_filter(data_storage, sqlite_engine, metadata_storage, test_user, test_project): 211 """Test getting metrics filtered by tags.""" 212 await metadata_storage.add_project(test_project, test_user, org_id=None) 213 214 snapshot1 = create_single_value_snapshot(metric_id="metric1", metric_type="type1") 215 snapshot2 = create_by_label_value_snapshot(metric_id="metric2") 216 217 snapshot_id1 = await metadata_storage.add_snapshot(test_project.id, snapshot1) 218 snapshot_id2 = await metadata_storage.add_snapshot(test_project.id, snapshot2) 219 220 await data_storage.add_snapshot_points(test_project.id, snapshot_id1, snapshot1) 221 await data_storage.add_snapshot_points(test_project.id, snapshot_id2, snapshot2) 222 223 # Filter by tags 224 metrics = await data_storage.get_metrics(test_project.id, tags=["prod"], metadata={}) 225 assert len(metrics) == 1 226 assert "by_label" in metrics 227 228 229 @pytest.mark.asyncio 230 async def test_get_metrics_with_metadata_filter(data_storage, sqlite_engine, metadata_storage, test_user, test_project): 231 """Test getting metrics filtered by metadata.""" 232 await metadata_storage.add_project(test_project, test_user, org_id=None) 233 234 snapshot1 = create_single_value_snapshot(metric_id="metric1", metric_type="type1") 235 snapshot2 = create_by_label_value_snapshot(metric_id="metric2") 236 237 snapshot_id1 = await metadata_storage.add_snapshot(test_project.id, snapshot1) 238 snapshot_id2 = await metadata_storage.add_snapshot(test_project.id, snapshot2) 239 240 await data_storage.add_snapshot_points(test_project.id, snapshot_id1, snapshot1) 241 await data_storage.add_snapshot_points(test_project.id, snapshot_id2, snapshot2) 242 243 # Filter by metadata 244 metrics = await data_storage.get_metrics(test_project.id, tags=[], metadata={"env": "test"}) 245 assert len(metrics) == 1 246 assert "by_label" in metrics 247 248 249 @pytest.mark.asyncio 250 async def test_get_metric_labels(data_storage, sqlite_engine, metadata_storage, test_user, test_project): 251 """Test getting metric labels.""" 252 await metadata_storage.add_project(test_project, test_user, org_id=None) 253 254 snapshot = create_by_label_value_snapshot(metric_id="metric1") 255 snapshot_id = await metadata_storage.add_snapshot(test_project.id, snapshot) 256 await data_storage.add_snapshot_points(test_project.id, snapshot_id, snapshot) 257 258 labels = await data_storage.get_metric_labels(test_project.id, tags=[], metadata={}, metric="by_label") 259 assert "label" in labels 260 assert "column" in labels 261 262 263 @pytest.mark.asyncio 264 async def test_get_metric_label_values(data_storage, sqlite_engine, metadata_storage, test_user, test_project): 265 """Test getting metric label values.""" 266 await metadata_storage.add_project(test_project, test_user, org_id=None) 267 268 snapshot = create_by_label_value_snapshot(metric_id="metric1") 269 snapshot_id = await metadata_storage.add_snapshot(test_project.id, snapshot) 270 await data_storage.add_snapshot_points(test_project.id, snapshot_id, snapshot) 271 272 values = await data_storage.get_metric_label_values( 273 test_project.id, tags=[], metadata={}, metric="by_label", label="label" 274 ) 275 assert "label1" in values 276 assert "label2" in values 277 278 279 @pytest.mark.asyncio 280 async def test_get_data_series(data_storage, sqlite_engine, metadata_storage, test_user, test_project): 281 """Test getting data series.""" 282 await metadata_storage.add_project(test_project, test_user, org_id=None) 283 284 snapshot1 = create_single_value_snapshot(metric_id="metric1", metric_type="type1", value=10.0) 285 snapshot2 = create_single_value_snapshot(metric_id="metric1", metric_type="type1", value=20.0) 286 287 snapshot_id1 = await metadata_storage.add_snapshot(test_project.id, snapshot1) 288 snapshot_id2 = await metadata_storage.add_snapshot(test_project.id, snapshot2) 289 290 await data_storage.add_snapshot_points(test_project.id, snapshot_id1, snapshot1) 291 await data_storage.add_snapshot_points(test_project.id, snapshot_id2, snapshot2) 292 293 series_filter = [SeriesFilter(tags=[], metadata={}, metric="type1", metric_labels={})] 294 295 response = await data_storage.get_data_series(test_project.id, series_filter, None, None) 296 assert len(response.series) == 1 297 assert len(response.series[0].values) == 2 298 assert response.series[0].values == [10.0, 20.0] 299 assert len(response.sources) == 2 300 301 302 @pytest.mark.asyncio 303 async def test_get_data_series_with_time_filter(data_storage, sqlite_engine, metadata_storage, test_user, test_project): 304 """Test getting data series with time filter.""" 305 await metadata_storage.add_project(test_project, test_user, org_id=None) 306 307 snapshot1 = create_single_value_snapshot(metric_id="metric1", metric_type="type1", value=10.0) 308 snapshot1.timestamp = datetime.datetime(2023, 1, 1, 0, 0, 0) 309 snapshot2 = create_single_value_snapshot(metric_id="metric1", metric_type="type1", value=20.0) 310 snapshot2.timestamp = datetime.datetime(2023, 1, 3, 0, 0, 0) 311 312 snapshot_id1 = await metadata_storage.add_snapshot(test_project.id, snapshot1) 313 snapshot_id2 = await metadata_storage.add_snapshot(test_project.id, snapshot2) 314 315 await data_storage.add_snapshot_points(test_project.id, snapshot_id1, snapshot1) 316 await data_storage.add_snapshot_points(test_project.id, snapshot_id2, snapshot2) 317 318 series_filter = [SeriesFilter(tags=[], metadata={}, metric="type1", metric_labels={})] 319 320 # Filter by start time 321 response = await data_storage.get_data_series( 322 test_project.id, series_filter, datetime.datetime(2023, 1, 2, 0, 0, 0), None 323 ) 324 assert len(response.series) == 1 325 assert len(response.series[0].values) == 1 326 assert response.series[0].values == [20.0]