/ tests / future / test_ui / test_sql_data_storage.py
test_sql_data_storage.py
  1  import datetime
  2  
  3  import pytest
  4  from sqlalchemy import select
  5  from sqlalchemy.orm import Session
  6  
  7  from evidently.core.metric_types import ByLabelValue
  8  from evidently.core.metric_types import CountValue
  9  from evidently.core.metric_types import MetricConfig
 10  from evidently.core.metric_types import SingleValue
 11  from evidently.core.serialization import ReportModel
 12  from evidently.core.serialization import SnapshotModel
 13  from evidently.legacy.core import new_id
 14  from evidently.ui.service.base import Project
 15  from evidently.ui.service.base import SeriesFilter
 16  from evidently.ui.service.storage.sql.data import SQLDataStorage
 17  from evidently.ui.service.storage.sql.metadata import SQLProjectMetadataStorage
 18  from evidently.ui.service.storage.sql.models import PointSQLModel
 19  
 20  
 21  @pytest.fixture
 22  def data_storage(sqlite_engine):
 23      """Create SQL data storage instance."""
 24      return SQLDataStorage(sqlite_engine)
 25  
 26  
 27  @pytest.fixture
 28  def metadata_storage(sqlite_engine):
 29      """Create SQL metadata storage instance."""
 30      return SQLProjectMetadataStorage(sqlite_engine)
 31  
 32  
 33  @pytest.fixture
 34  def test_snapshot_id():
 35      """Create a test snapshot ID."""
 36      return new_id()
 37  
 38  
 39  def create_single_value_snapshot(
 40      metric_id: str = "test-metric", value: float = 42.0, metric_type: str = "test_type"
 41  ) -> SnapshotModel:
 42      """Create a snapshot with a single value metric."""
 43      metric_result = SingleValue(value=value, display_name="Test Metric")
 44      metric_result.set_metric_location(
 45          MetricConfig(metric_id=metric_id, params={"type": metric_type, "column": "test_column"})
 46      )
 47  
 48      return SnapshotModel(
 49          report=ReportModel(items=[]),
 50          name="Test Snapshot",
 51          timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0),
 52          metadata={},
 53          tags=[],
 54          metric_results={metric_id: metric_result},
 55          top_level_metrics=[],
 56          widgets=[],
 57          tests_widgets=[],
 58      )
 59  
 60  
 61  def create_by_label_value_snapshot(metric_id: str = "test-metric") -> SnapshotModel:
 62      """Create a snapshot with a by-label value metric."""
 63      metric_result = ByLabelValue(
 64          display_name="Test By Label Metric",
 65          values={
 66              "label1": SingleValue(value=10.0, display_name="Label 1"),
 67              "label2": SingleValue(value=20.0, display_name="Label 2"),
 68          },
 69      )
 70      for label, val in metric_result.values.items():
 71          val.set_metric_location(
 72              MetricConfig(metric_id=metric_id, params={"type": "by_label", "column": "test_column", "label": label})
 73          )
 74      metric_result.set_metric_location(
 75          MetricConfig(metric_id=metric_id, params={"type": "by_label", "column": "test_column"})
 76      )
 77  
 78      return SnapshotModel(
 79          report=ReportModel(items=[]),
 80          name="Test Snapshot",
 81          timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0),
 82          metadata={"env": "test"},
 83          tags=["prod"],
 84          metric_results={metric_id: metric_result},
 85          top_level_metrics=[],
 86          widgets=[],
 87          tests_widgets=[],
 88      )
 89  
 90  
 91  def create_count_value_snapshot(metric_id: str = "test-metric") -> SnapshotModel:
 92      """Create a snapshot with a count value metric."""
 93      count = SingleValue(value=100.0, display_name="Count")
 94      share = SingleValue(value=0.5, display_name="Share")
 95      count.set_metric_location(MetricConfig(metric_id=metric_id, params={"type": "count", "column": "test_column"}))
 96      share.set_metric_location(MetricConfig(metric_id=metric_id, params={"type": "count", "column": "test_column"}))
 97  
 98      metric_result = CountValue(display_name="Test Count Metric", count=count, share=share)
 99      metric_result.set_metric_location(
100          MetricConfig(metric_id=metric_id, params={"type": "count", "column": "test_column"})
101      )
102  
103      return SnapshotModel(
104          report=ReportModel(items=[]),
105          name="Test Snapshot",
106          timestamp=datetime.datetime(2023, 1, 2, 0, 0, 0),
107          metadata={},
108          tags=[],
109          metric_results={metric_id: metric_result},
110          top_level_metrics=[],
111          widgets=[],
112          tests_widgets=[],
113      )
114  
115  
116  @pytest.mark.asyncio
117  async def test_add_snapshot_points_single_value(
118      data_storage, sqlite_engine, metadata_storage, test_user, test_project_id
119  ):
120      """Test adding snapshot points with single value metric."""
121      # Create project and snapshot first (required for foreign key constraints)
122      project = Project(id=test_project_id, name="Test Project")
123      await metadata_storage.add_project(project, test_user, org_id=None)
124  
125      snapshot = create_single_value_snapshot()
126      snapshot_id = await metadata_storage.add_snapshot(test_project_id, snapshot)
127  
128      await data_storage.add_snapshot_points(test_project_id, snapshot_id, snapshot)
129  
130      # Verify point was stored
131      with Session(sqlite_engine) as session:
132          points = (
133              session.execute(select(PointSQLModel).where(PointSQLModel.project_id == test_project_id)).scalars().all()
134          )
135          assert len(points) == 1
136          assert points[0].value == 42.0
137          assert points[0].metric_type == "test_type"
138          assert points[0].snapshot_id == snapshot_id
139  
140  
141  @pytest.mark.asyncio
142  async def test_add_snapshot_points_by_label_value(
143      data_storage, sqlite_engine, metadata_storage, test_user, test_project_id
144  ):
145      """Test adding snapshot points with by-label value metric."""
146      # Create project and snapshot first (required for foreign key constraints)
147      project = Project(id=test_project_id, name="Test Project")
148      await metadata_storage.add_project(project, test_user, org_id=None)
149  
150      snapshot = create_by_label_value_snapshot()
151      snapshot_id = await metadata_storage.add_snapshot(test_project_id, snapshot)
152  
153      await data_storage.add_snapshot_points(test_project_id, snapshot_id, snapshot)
154  
155      # Verify points were stored
156      with Session(sqlite_engine) as session:
157          points = (
158              session.execute(select(PointSQLModel).where(PointSQLModel.project_id == test_project_id)).scalars().all()
159          )
160          assert len(points) == 2
161          values = {p.value for p in points}
162          assert values == {10.0, 20.0}
163  
164  
165  @pytest.mark.asyncio
166  async def test_add_snapshot_points_count_value(
167      data_storage, sqlite_engine, metadata_storage, test_user, test_project_id
168  ):
169      """Test adding snapshot points with count value metric."""
170      # Create project and snapshot first (required for foreign key constraints)
171      project = Project(id=test_project_id, name="Test Project")
172      await metadata_storage.add_project(project, test_user, org_id=None)
173  
174      snapshot = create_count_value_snapshot()
175      snapshot_id = await metadata_storage.add_snapshot(test_project_id, snapshot)
176  
177      await data_storage.add_snapshot_points(test_project_id, snapshot_id, snapshot)
178  
179      # Verify points were stored (count and share)
180      with Session(sqlite_engine) as session:
181          points = (
182              session.execute(select(PointSQLModel).where(PointSQLModel.project_id == test_project_id)).scalars().all()
183          )
184          assert len(points) == 2
185          values = {p.value for p in points}
186          assert values == {100.0, 0.5}
187  
188  
189  @pytest.mark.asyncio
190  async def test_get_metrics(data_storage, sqlite_engine, metadata_storage, test_user, test_project):
191      """Test getting available metrics."""
192      await metadata_storage.add_project(test_project, test_user, org_id=None)
193  
194      snapshot1 = create_single_value_snapshot(metric_id="metric1", metric_type="type1")
195      snapshot2 = create_single_value_snapshot(metric_id="metric2", metric_type="type2")
196  
197      snapshot_id1 = await metadata_storage.add_snapshot(test_project.id, snapshot1)
198      snapshot_id2 = await metadata_storage.add_snapshot(test_project.id, snapshot2)
199  
200      await data_storage.add_snapshot_points(test_project.id, snapshot_id1, snapshot1)
201      await data_storage.add_snapshot_points(test_project.id, snapshot_id2, snapshot2)
202  
203      metrics = await data_storage.get_metrics(test_project.id, tags=[], metadata={})
204      assert len(metrics) == 2
205      assert "type1" in metrics
206      assert "type2" in metrics
207  
208  
209  @pytest.mark.asyncio
210  async def test_get_metrics_with_tags_filter(data_storage, sqlite_engine, metadata_storage, test_user, test_project):
211      """Test getting metrics filtered by tags."""
212      await metadata_storage.add_project(test_project, test_user, org_id=None)
213  
214      snapshot1 = create_single_value_snapshot(metric_id="metric1", metric_type="type1")
215      snapshot2 = create_by_label_value_snapshot(metric_id="metric2")
216  
217      snapshot_id1 = await metadata_storage.add_snapshot(test_project.id, snapshot1)
218      snapshot_id2 = await metadata_storage.add_snapshot(test_project.id, snapshot2)
219  
220      await data_storage.add_snapshot_points(test_project.id, snapshot_id1, snapshot1)
221      await data_storage.add_snapshot_points(test_project.id, snapshot_id2, snapshot2)
222  
223      # Filter by tags
224      metrics = await data_storage.get_metrics(test_project.id, tags=["prod"], metadata={})
225      assert len(metrics) == 1
226      assert "by_label" in metrics
227  
228  
229  @pytest.mark.asyncio
230  async def test_get_metrics_with_metadata_filter(data_storage, sqlite_engine, metadata_storage, test_user, test_project):
231      """Test getting metrics filtered by metadata."""
232      await metadata_storage.add_project(test_project, test_user, org_id=None)
233  
234      snapshot1 = create_single_value_snapshot(metric_id="metric1", metric_type="type1")
235      snapshot2 = create_by_label_value_snapshot(metric_id="metric2")
236  
237      snapshot_id1 = await metadata_storage.add_snapshot(test_project.id, snapshot1)
238      snapshot_id2 = await metadata_storage.add_snapshot(test_project.id, snapshot2)
239  
240      await data_storage.add_snapshot_points(test_project.id, snapshot_id1, snapshot1)
241      await data_storage.add_snapshot_points(test_project.id, snapshot_id2, snapshot2)
242  
243      # Filter by metadata
244      metrics = await data_storage.get_metrics(test_project.id, tags=[], metadata={"env": "test"})
245      assert len(metrics) == 1
246      assert "by_label" in metrics
247  
248  
249  @pytest.mark.asyncio
250  async def test_get_metric_labels(data_storage, sqlite_engine, metadata_storage, test_user, test_project):
251      """Test getting metric labels."""
252      await metadata_storage.add_project(test_project, test_user, org_id=None)
253  
254      snapshot = create_by_label_value_snapshot(metric_id="metric1")
255      snapshot_id = await metadata_storage.add_snapshot(test_project.id, snapshot)
256      await data_storage.add_snapshot_points(test_project.id, snapshot_id, snapshot)
257  
258      labels = await data_storage.get_metric_labels(test_project.id, tags=[], metadata={}, metric="by_label")
259      assert "label" in labels
260      assert "column" in labels
261  
262  
263  @pytest.mark.asyncio
264  async def test_get_metric_label_values(data_storage, sqlite_engine, metadata_storage, test_user, test_project):
265      """Test getting metric label values."""
266      await metadata_storage.add_project(test_project, test_user, org_id=None)
267  
268      snapshot = create_by_label_value_snapshot(metric_id="metric1")
269      snapshot_id = await metadata_storage.add_snapshot(test_project.id, snapshot)
270      await data_storage.add_snapshot_points(test_project.id, snapshot_id, snapshot)
271  
272      values = await data_storage.get_metric_label_values(
273          test_project.id, tags=[], metadata={}, metric="by_label", label="label"
274      )
275      assert "label1" in values
276      assert "label2" in values
277  
278  
279  @pytest.mark.asyncio
280  async def test_get_data_series(data_storage, sqlite_engine, metadata_storage, test_user, test_project):
281      """Test getting data series."""
282      await metadata_storage.add_project(test_project, test_user, org_id=None)
283  
284      snapshot1 = create_single_value_snapshot(metric_id="metric1", metric_type="type1", value=10.0)
285      snapshot2 = create_single_value_snapshot(metric_id="metric1", metric_type="type1", value=20.0)
286  
287      snapshot_id1 = await metadata_storage.add_snapshot(test_project.id, snapshot1)
288      snapshot_id2 = await metadata_storage.add_snapshot(test_project.id, snapshot2)
289  
290      await data_storage.add_snapshot_points(test_project.id, snapshot_id1, snapshot1)
291      await data_storage.add_snapshot_points(test_project.id, snapshot_id2, snapshot2)
292  
293      series_filter = [SeriesFilter(tags=[], metadata={}, metric="type1", metric_labels={})]
294  
295      response = await data_storage.get_data_series(test_project.id, series_filter, None, None)
296      assert len(response.series) == 1
297      assert len(response.series[0].values) == 2
298      assert response.series[0].values == [10.0, 20.0]
299      assert len(response.sources) == 2
300  
301  
302  @pytest.mark.asyncio
303  async def test_get_data_series_with_time_filter(data_storage, sqlite_engine, metadata_storage, test_user, test_project):
304      """Test getting data series with time filter."""
305      await metadata_storage.add_project(test_project, test_user, org_id=None)
306  
307      snapshot1 = create_single_value_snapshot(metric_id="metric1", metric_type="type1", value=10.0)
308      snapshot1.timestamp = datetime.datetime(2023, 1, 1, 0, 0, 0)
309      snapshot2 = create_single_value_snapshot(metric_id="metric1", metric_type="type1", value=20.0)
310      snapshot2.timestamp = datetime.datetime(2023, 1, 3, 0, 0, 0)
311  
312      snapshot_id1 = await metadata_storage.add_snapshot(test_project.id, snapshot1)
313      snapshot_id2 = await metadata_storage.add_snapshot(test_project.id, snapshot2)
314  
315      await data_storage.add_snapshot_points(test_project.id, snapshot_id1, snapshot1)
316      await data_storage.add_snapshot_points(test_project.id, snapshot_id2, snapshot2)
317  
318      series_filter = [SeriesFilter(tags=[], metadata={}, metric="type1", metric_labels={})]
319  
320      # Filter by start time
321      response = await data_storage.get_data_series(
322          test_project.id, series_filter, datetime.datetime(2023, 1, 2, 0, 0, 0), None
323      )
324      assert len(response.series) == 1
325      assert len(response.series[0].values) == 1
326      assert response.series[0].values == [20.0]