/ src / evidently / legacy / ui / base.py
base.py
  1  import contextlib
  2  import datetime
  3  import json
  4  from abc import ABC
  5  from abc import abstractmethod
  6  from typing import IO
  7  from typing import TYPE_CHECKING
  8  from typing import Any
  9  from typing import ClassVar
 10  from typing import Dict
 11  from typing import Iterator
 12  from typing import List
 13  from typing import Optional
 14  from typing import Set
 15  from typing import Type
 16  from typing import Union
 17  
 18  import uuid6
 19  
 20  from evidently._pydantic_compat import BaseModel
 21  from evidently._pydantic_compat import Field
 22  from evidently._pydantic_compat import PrivateAttr
 23  from evidently._pydantic_compat import parse_obj_as
 24  from evidently.core.report import Snapshot as SnapshotV2
 25  from evidently.legacy.core import new_id
 26  from evidently.legacy.model.dashboard import DashboardInfo
 27  from evidently.legacy.suite.base_suite import MetadataValueType
 28  from evidently.legacy.suite.base_suite import ReportBase
 29  from evidently.legacy.suite.base_suite import Snapshot
 30  from evidently.legacy.suite.base_suite import SnapshotLinks
 31  from evidently.legacy.ui.dashboards.base import DashboardConfig
 32  from evidently.legacy.ui.dashboards.base import PanelValue
 33  from evidently.legacy.ui.dashboards.base import ReportFilter
 34  from evidently.legacy.ui.dashboards.test_suites import TestFilter
 35  from evidently.legacy.ui.type_aliases import BlobID
 36  from evidently.legacy.ui.type_aliases import DataPoints
 37  from evidently.legacy.ui.type_aliases import DataPointsAsType
 38  from evidently.legacy.ui.type_aliases import OrgID
 39  from evidently.legacy.ui.type_aliases import PointType
 40  from evidently.legacy.ui.type_aliases import ProjectID
 41  from evidently.legacy.ui.type_aliases import SnapshotID
 42  from evidently.legacy.ui.type_aliases import TeamID
 43  from evidently.legacy.ui.type_aliases import TestResultPoints
 44  from evidently.legacy.ui.type_aliases import UserID
 45  from evidently.legacy.utils import NumpyEncoder
 46  from evidently.legacy.utils.dashboard import TemplateParams
 47  from evidently.legacy.utils.dashboard import inline_iframe_html_template
 48  from evidently.legacy.utils.sync import sync_api
 49  from evidently.ui.service.base import Entity
 50  from evidently.ui.service.base import EntityType
 51  from evidently.ui.service.base import Org
 52  from evidently.ui.service.base import Team
 53  from evidently.ui.service.base import User
 54  from evidently.ui.service.base import _default_dashboard
 55  
 56  if TYPE_CHECKING:
 57      from evidently.legacy.ui.managers.projects import ProjectManager
 58  
# Either a legacy snapshot (evidently.legacy.suite.base_suite.Snapshot) or a
# new-API one (evidently.core.report.Snapshot, imported here as SnapshotV2).
AnySnapshot = Union[Snapshot, SnapshotV2]
 60  
 61  
class BlobMetadata(BaseModel):
    """Descriptor of a stored blob: its identifier and, optionally, its size."""

    # Identifier of the blob.
    id: BlobID
    # Size of the blob, if known (presumably in bytes — TODO confirm the unit).
    size: Optional[int]
 65  
 66  
 67  class SnapshotMetadata(BaseModel):
 68      id: SnapshotID
 69      name: Optional[str] = None
 70      timestamp: datetime.datetime
 71      metadata: Dict[str, MetadataValueType]
 72      tags: List[str]
 73      is_report: bool
 74      blob: "BlobMetadata"
 75      links: SnapshotLinks = SnapshotLinks()  # links to datasets and stuff
 76  
 77      _project: "Project" = PrivateAttr(None)
 78      _dashboard_info: "DashboardInfo" = PrivateAttr(None)
 79      _additional_graphs: Dict[str, dict] = PrivateAttr(None)
 80  
 81      @property
 82      def project(self):
 83          return self._project
 84  
 85      async def load(self) -> Snapshot:
 86          return await self.project.project_manager.load_snapshot(self.project._user_id, self.project.id, self.id)
 87  
 88      async def as_report_base(self) -> ReportBase:
 89          value = await self.load()
 90          return value.as_report() if value.is_report else value.as_test_suite()
 91  
 92      def bind(self, project: "Project"):
 93          self._project = project
 94          return self
 95  
 96      @classmethod
 97      def from_snapshot(cls, snapshot: Snapshot, blob: "BlobMetadata") -> "SnapshotMetadata":
 98          return SnapshotMetadata(
 99              id=snapshot.id,
100              name=snapshot.name,
101              timestamp=snapshot.timestamp,
102              metadata=snapshot.metadata,
103              tags=snapshot.tags,
104              is_report=snapshot.is_report,
105              blob=blob,
106          )
107  
108      async def get_dashboard_info(self):
109          if self._dashboard_info is None:
110              report = await self.as_report_base()
111              _, self._dashboard_info, self._additional_graphs = report._build_dashboard_info()
112          return self._dashboard_info
113  
114      async def get_additional_graphs(self):
115          if self._additional_graphs is None:
116              report = await self.as_report_base()
117              _, self._dashboard_info, self._additional_graphs = report._build_dashboard_info()
118          return self._additional_graphs
119  
120  
class Project(Entity):
    """A project entity that delegates persistence to a bound ``ProjectManager``.

    The model fields describe the project itself. The project manager and the
    acting user id are kept in private attributes set by :meth:`bind`; every
    ``*_async`` method forwards to the manager with that user id. The names
    without the ``_async`` suffix at the bottom of the class are the same
    methods wrapped with ``sync_api``.
    """

    entity_type: ClassVar[EntityType] = EntityType.Project

    class Config:
        underscore_attrs_are_private = True

    id: ProjectID = Field(default_factory=new_id)
    name: str
    description: Optional[str] = None
    dashboard: "DashboardConfig" = Field(default_factory=_default_dashboard)

    team_id: Optional[TeamID] = None
    org_id: Optional[OrgID] = None

    date_from: Optional[datetime.datetime] = None
    date_to: Optional[datetime.datetime] = None
    created_at: Optional[datetime.datetime] = Field(default=None)
    version: str = "1"
    # Field(default=datetime.datetime.fromisoformat("1900-01-01T00:00:00"))

    _project_manager: Optional["ProjectManager"] = PrivateAttr(default=None)
    _user_id: Optional[UserID] = PrivateAttr(default=None)

    def bind(self, project_manager: Optional["ProjectManager"], user_id: Optional[UserID]):
        """Store the manager and acting user id; return ``self`` for chaining."""
        self._project_manager = project_manager
        self._user_id = user_id
        return self

    @property
    def project_manager(self) -> "ProjectManager":
        """The bound project manager.

        Raises:
            ValueError: if no project manager has been bound.
        """
        if self._project_manager is None:
            raise ValueError("Project is not binded")
        return self._project_manager

    async def save_async(self):
        """Persist this project via ``update_project`` and return ``self``."""
        await self.project_manager.update_project(self._user_id, self)  # type: ignore[arg-type]
        return self

    async def load_snapshot_async(self, snapshot_id: SnapshotID) -> Snapshot:
        """Load the snapshot ``snapshot_id`` of this project."""
        return await self.project_manager.load_snapshot(self._user_id, self.id, snapshot_id)  # type: ignore[arg-type]

    async def add_snapshot_async(self, snapshot: AnySnapshot):
        """Add ``snapshot`` to this project.

        Anything that is not a legacy ``Snapshot`` is first converted with
        ``snapshot_v2_to_v1``.
        """
        if not isinstance(snapshot, Snapshot):
            from evidently.ui.backport import snapshot_v2_to_v1

            snapshot = snapshot_v2_to_v1(snapshot)
        await self.project_manager.add_snapshot(self._user_id, self.id, snapshot)  # type: ignore[arg-type]

    async def delete_snapshot_async(self, snapshot_id: Union[str, SnapshotID]):
        """Delete a snapshot; a string id is parsed into a ``uuid6.UUID`` first."""
        if isinstance(snapshot_id, str):
            snapshot_id = uuid6.UUID(snapshot_id)
        await self.project_manager.delete_snapshot(self._user_id, self.id, snapshot_id)  # type: ignore[arg-type]

    async def list_snapshots_async(
        self, include_reports: bool = True, include_test_suites: bool = True
    ) -> List[SnapshotMetadata]:
        """List snapshot metadata, forwarding both include flags to the manager."""
        return await self.project_manager.list_snapshots(self._user_id, self.id, include_reports, include_test_suites)  # type: ignore[arg-type]

    async def get_snapshot_metadata_async(self, id: SnapshotID) -> SnapshotMetadata:
        """Return the metadata of the snapshot ``id`` of this project."""
        return await self.project_manager.get_snapshot_metadata(self._user_id, self.id, id)  # type: ignore[arg-type]

    async def build_dashboard_info_async(
        self,
        timestamp_start: Optional[datetime.datetime],
        timestamp_end: Optional[datetime.datetime],
    ) -> DashboardInfo:
        """Build dashboard info from the manager's data storage for the given time bounds."""
        return await self.dashboard.build(self.project_manager.data_storage, self.id, timestamp_start, timestamp_end)

    async def show_dashboard_async(
        self,
        timestamp_start: Optional[datetime.datetime] = None,
        timestamp_end: Optional[datetime.datetime] = None,
    ):
        """Render the dashboard as an ``IPython.display.HTML`` object.

        Raises:
            Exception: if ``IPython.display.HTML`` cannot be imported.
        """
        dashboard_info = await self.build_dashboard_info_async(timestamp_start, timestamp_end)
        template_params = TemplateParams(
            dashboard_id="pd_" + str(new_id()).replace("-", ""),
            dashboard_info=dashboard_info,
            additional_graphs={},
        )
        # Only the import is guarded, so an ImportError raised while rendering
        # the template is not misreported as a missing IPython.
        # pylint: disable=import-outside-toplevel
        try:
            from IPython.display import HTML
        except ImportError as err:
            raise Exception("Cannot import HTML from IPython.display, no way to show html") from err
        return HTML(inline_iframe_html_template(params=template_params))

    async def reload_async(self, reload_snapshots: bool = False):
        """Refresh this instance from the manager, optionally reloading snapshots."""
        # fixme: reload snapshots
        project = await self.project_manager.get_project(self._user_id, self.id)  # type: ignore[arg-type]
        # Copy the fetched instance's __dict__ entries onto this instance in place.
        # NOTE(review): presumably the bound manager and user id survive because
        # private attributes are not stored in __dict__ — confirm.
        self.__dict__.update(project.__dict__)

        if reload_snapshots:
            await self.project_manager.reload_snapshots(self._user_id, self.id)  # type: ignore[arg-type]

    # sync_api-wrapped variants of the coroutine methods above.
    save = sync_api(save_async)
    load_snapshot = sync_api(load_snapshot_async)
    delete_snapshot = sync_api(delete_snapshot_async)
    list_snapshots = sync_api(list_snapshots_async)
    show_dashboard = sync_api(show_dashboard_async)
    build_dashboard_info = sync_api(build_dashboard_info_async)
    get_snapshot_metadata = sync_api(get_snapshot_metadata_async)
    add_snapshot = sync_api(add_snapshot_async)
    reload = sync_api(reload_async)
225  
226  
class ProjectMetadataStorage(ABC):
    """Abstract interface for storing project and snapshot metadata.

    Every method is abstract; concrete storages must implement all of them.
    """

    @abstractmethod
    async def add_project(
        self,
        project: Project,
        user: User,
        team: Optional[Team],
        org_id: Optional[OrgID] = None,
    ) -> Project:
        """Add ``project`` for ``user`` (optionally within a team/org) and return a project."""
        raise NotImplementedError

    @abstractmethod
    async def get_project(self, project_id: ProjectID) -> Optional[Project]:
        """Return the project with ``project_id``; the return type allows ``None``."""
        raise NotImplementedError

    @abstractmethod
    async def delete_project(self, project_id: ProjectID):
        """Delete the project with ``project_id``."""
        raise NotImplementedError

    @abstractmethod
    async def list_projects(self, project_ids: Optional[Set[ProjectID]]) -> List[Project]:
        """List projects; ``project_ids`` may be ``None`` or a set of ids."""
        raise NotImplementedError

    @abstractmethod
    async def add_snapshot(
        self,
        project_id: ProjectID,
        snapshot: Snapshot,
        blob: "BlobMetadata",
    ):
        """Record ``snapshot`` for the project together with its blob descriptor."""
        raise NotImplementedError

    @abstractmethod
    async def delete_snapshot(self, project_id: ProjectID, snapshot_id: SnapshotID):
        """Delete the snapshot ``snapshot_id`` of project ``project_id``."""
        raise NotImplementedError

    @abstractmethod
    async def search_project(
        self,
        project_name: str,
        project_ids: Optional[Set[ProjectID]],
    ) -> List[Project]:
        """Search projects by ``project_name``; ``project_ids`` may be ``None`` or a set of ids."""
        raise NotImplementedError

    @abstractmethod
    async def list_snapshots(
        self, project_id: ProjectID, include_reports: bool = True, include_test_suites: bool = True
    ) -> List[SnapshotMetadata]:
        """List snapshot metadata of a project; both include flags default to ``True``."""
        raise NotImplementedError

    @abstractmethod
    async def get_snapshot_metadata(
        self,
        project_id: ProjectID,
        snapshot_id: SnapshotID,
    ) -> SnapshotMetadata:
        """Return the metadata of snapshot ``snapshot_id`` in project ``project_id``."""
        raise NotImplementedError

    @abstractmethod
    async def update_project(self, project: Project) -> Project:
        """Update ``project`` and return a project."""
        raise NotImplementedError

    @abstractmethod
    async def reload_snapshots(self, project_id: ProjectID):
        """Reload the snapshots of project ``project_id``."""
        raise NotImplementedError
278  
279  
class BlobStorage(ABC):
    """Base class for blob storages, with a helper that stores snapshots as JSON.

    ``open_blob`` and ``put_blob`` are abstract. ``get_snapshot_blob_id`` and
    ``get_blob_metadata`` are not marked abstract but raise
    ``NotImplementedError`` unless overridden; ``put_snapshot`` relies on both.
    """

    @abstractmethod
    @contextlib.contextmanager
    def open_blob(self, id: BlobID) -> Iterator[IO]:
        """Context manager for accessing the blob ``id`` as an ``IO`` object."""
        raise NotImplementedError

    @abstractmethod
    async def put_blob(self, blob_id: str, obj):
        """Store ``obj`` under ``blob_id``."""
        raise NotImplementedError

    def get_snapshot_blob_id(self, project_id: ProjectID, snapshot: Snapshot) -> BlobID:
        """Return the blob id to use for ``snapshot`` in ``project_id``."""
        raise NotImplementedError

    async def put_snapshot(self, project_id: ProjectID, snapshot: Snapshot) -> BlobMetadata:
        """Serialize ``snapshot`` to JSON, store it, and return the blob's metadata."""
        target_blob_id = self.get_snapshot_blob_id(project_id, snapshot)
        # NumpyEncoder is passed as the JSON encoder class for the snapshot dict.
        serialized = json.dumps(snapshot.dict(), cls=NumpyEncoder)
        await self.put_blob(target_blob_id, serialized)
        metadata = await self.get_blob_metadata(target_blob_id)
        return metadata

    async def get_blob_metadata(self, blob_id: BlobID) -> BlobMetadata:
        """Return the metadata of the blob ``blob_id``."""
        raise NotImplementedError
300  
301  
class DataStorage(ABC):
    """Abstract storage for data points and test results extracted from snapshots."""

    @abstractmethod
    async def extract_points(self, project_id: ProjectID, snapshot: Snapshot):
        """Extract points from ``snapshot`` for project ``project_id``."""
        raise NotImplementedError

    async def load_points(
        self,
        project_id: ProjectID,
        filter: "ReportFilter",
        values: List["PanelValue"],
        timestamp_start: Optional[datetime.datetime],
        timestamp_end: Optional[datetime.datetime],
    ) -> DataPoints:
        """Load points as ``float`` by delegating to :meth:`load_points_as_type`."""
        points = await self.load_points_as_type(float, project_id, filter, values, timestamp_start, timestamp_end)
        return points

    @staticmethod
    def parse_value(cls: Type[PointType], value: Any) -> PointType:
        """Coerce ``value`` to ``cls``.

        Instances of ``cls`` are returned unchanged. Strings are decoded as
        JSON first; the result (or the original non-string value) is then
        parsed with ``parse_obj_as``.
        """
        if isinstance(value, cls):
            return value
        raw = json.loads(value) if isinstance(value, str) else value
        return parse_obj_as(cls, raw)

    @abstractmethod
    async def load_test_results(
        self,
        project_id: ProjectID,
        filter: "ReportFilter",
        test_filters: List["TestFilter"],
        time_agg: Optional[str],
        timestamp_start: Optional[datetime.datetime],
        timestamp_end: Optional[datetime.datetime],
    ) -> TestResultPoints:
        """Load test results for the project matching the given filters and time bounds."""
        raise NotImplementedError

    @abstractmethod
    async def load_points_as_type(
        self,
        cls: Type[PointType],
        project_id: ProjectID,
        filter: "ReportFilter",
        values: List["PanelValue"],
        timestamp_start: Optional[datetime.datetime],
        timestamp_end: Optional[datetime.datetime],
    ) -> DataPointsAsType[PointType]:
        """Load points for the project with values typed as ``cls``."""
        raise NotImplementedError
348  
349  
# Names exported by ``from ... import *``. Several of them (EntityType, Entity,
# Org, Team, User) are re-exports imported from evidently.ui.service.base.
# NOTE(review): BlobStorage and SnapshotMetadata are defined in this module but
# are not listed here — confirm whether that omission is intentional.
__all__ = [
    "EntityType",
    "Entity",
    "Org",
    "Project",
    "Team",
    "User",
    "DataStorage",
    "ProjectMetadataStorage",
    "Snapshot",
    "BlobMetadata",
    "AnySnapshot",
]