base.py
1 import contextlib 2 import datetime 3 import json 4 from abc import ABC 5 from abc import abstractmethod 6 from typing import IO 7 from typing import TYPE_CHECKING 8 from typing import Any 9 from typing import ClassVar 10 from typing import Dict 11 from typing import Iterator 12 from typing import List 13 from typing import Optional 14 from typing import Set 15 from typing import Type 16 from typing import Union 17 18 import uuid6 19 20 from evidently._pydantic_compat import BaseModel 21 from evidently._pydantic_compat import Field 22 from evidently._pydantic_compat import PrivateAttr 23 from evidently._pydantic_compat import parse_obj_as 24 from evidently.core.report import Snapshot as SnapshotV2 25 from evidently.legacy.core import new_id 26 from evidently.legacy.model.dashboard import DashboardInfo 27 from evidently.legacy.suite.base_suite import MetadataValueType 28 from evidently.legacy.suite.base_suite import ReportBase 29 from evidently.legacy.suite.base_suite import Snapshot 30 from evidently.legacy.suite.base_suite import SnapshotLinks 31 from evidently.legacy.ui.dashboards.base import DashboardConfig 32 from evidently.legacy.ui.dashboards.base import PanelValue 33 from evidently.legacy.ui.dashboards.base import ReportFilter 34 from evidently.legacy.ui.dashboards.test_suites import TestFilter 35 from evidently.legacy.ui.type_aliases import BlobID 36 from evidently.legacy.ui.type_aliases import DataPoints 37 from evidently.legacy.ui.type_aliases import DataPointsAsType 38 from evidently.legacy.ui.type_aliases import OrgID 39 from evidently.legacy.ui.type_aliases import PointType 40 from evidently.legacy.ui.type_aliases import ProjectID 41 from evidently.legacy.ui.type_aliases import SnapshotID 42 from evidently.legacy.ui.type_aliases import TeamID 43 from evidently.legacy.ui.type_aliases import TestResultPoints 44 from evidently.legacy.ui.type_aliases import UserID 45 from evidently.legacy.utils import NumpyEncoder 46 from evidently.legacy.utils.dashboard import TemplateParams 47 from evidently.legacy.utils.dashboard import inline_iframe_html_template 48 from evidently.legacy.utils.sync import sync_api 49 from evidently.ui.service.base import Entity 50 from evidently.ui.service.base import EntityType 51 from evidently.ui.service.base import Org 52 from evidently.ui.service.base import Team 53 from evidently.ui.service.base import User 54 from evidently.ui.service.base import _default_dashboard 55 56 if TYPE_CHECKING: 57 from evidently.legacy.ui.managers.projects import ProjectManager 58 59 AnySnapshot = Union[Snapshot, SnapshotV2] 60 61 62 class BlobMetadata(BaseModel): 63 id: BlobID 64 size: Optional[int] 65 66 67 class SnapshotMetadata(BaseModel): 68 id: SnapshotID 69 name: Optional[str] = None 70 timestamp: datetime.datetime 71 metadata: Dict[str, MetadataValueType] 72 tags: List[str] 73 is_report: bool 74 blob: "BlobMetadata" 75 links: SnapshotLinks = SnapshotLinks() # links to datasets and stuff 76 77 _project: "Project" = PrivateAttr(None) 78 _dashboard_info: "DashboardInfo" = PrivateAttr(None) 79 _additional_graphs: Dict[str, dict] = PrivateAttr(None) 80 81 @property 82 def project(self): 83 return self._project 84 85 async def load(self) -> Snapshot: 86 return await self.project.project_manager.load_snapshot(self.project._user_id, self.project.id, self.id) 87 88 async def as_report_base(self) -> ReportBase: 89 value = await self.load() 90 return value.as_report() if value.is_report else value.as_test_suite() 91 92 def bind(self, project: "Project"): 93 self._project = project 94 return self 95 96 @classmethod 97 def from_snapshot(cls, snapshot: Snapshot, blob: "BlobMetadata") -> "SnapshotMetadata": 98 return SnapshotMetadata( 99 id=snapshot.id, 100 name=snapshot.name, 101 timestamp=snapshot.timestamp, 102 metadata=snapshot.metadata, 103 tags=snapshot.tags, 104 is_report=snapshot.is_report, 105 blob=blob, 106 ) 107 108 async def get_dashboard_info(self): 109 if self._dashboard_info is None: 110 report = await self.as_report_base() 111 _, self._dashboard_info, self._additional_graphs = report._build_dashboard_info() 112 return self._dashboard_info 113 114 async def get_additional_graphs(self): 115 if self._additional_graphs is None: 116 report = await self.as_report_base() 117 _, self._dashboard_info, self._additional_graphs = report._build_dashboard_info() 118 return self._additional_graphs 119 120 121 class Project(Entity): 122 entity_type: ClassVar[EntityType] = EntityType.Project 123 124 class Config: 125 underscore_attrs_are_private = True 126 127 id: ProjectID = Field(default_factory=new_id) 128 name: str 129 description: Optional[str] = None 130 dashboard: "DashboardConfig" = Field(default_factory=_default_dashboard) 131 132 team_id: Optional[TeamID] = None 133 org_id: Optional[OrgID] = None 134 135 date_from: Optional[datetime.datetime] = None 136 date_to: Optional[datetime.datetime] = None 137 created_at: Optional[datetime.datetime] = Field(default=None) 138 version: str = "1" 139 # Field(default=datetime.datetime.fromisoformat("1900-01-01T00:00:00")) 140 141 _project_manager: Optional["ProjectManager"] = PrivateAttr(default=None) 142 _user_id: Optional[UserID] = PrivateAttr(default=None) 143 144 def bind(self, project_manager: Optional["ProjectManager"], user_id: Optional[UserID]): 145 self._project_manager = project_manager 146 self._user_id = user_id 147 return self 148 149 @property 150 def project_manager(self) -> "ProjectManager": 151 if self._project_manager is None: 152 raise ValueError("Project is not binded") 153 return self._project_manager 154 155 async def save_async(self): 156 await self.project_manager.update_project(self._user_id, self) # type: ignore[arg-type] 157 return self 158 159 async def load_snapshot_async(self, snapshot_id: SnapshotID) -> Snapshot: 160 return await self.project_manager.load_snapshot(self._user_id, self.id, snapshot_id) # type: ignore[arg-type] 161 162 async def add_snapshot_async(self, snapshot: AnySnapshot): 163 if not isinstance(snapshot, Snapshot): 164 from evidently.ui.backport import snapshot_v2_to_v1 165 166 snapshot = snapshot_v2_to_v1(snapshot) 167 await self.project_manager.add_snapshot(self._user_id, self.id, snapshot) # type: ignore[arg-type] 168 169 async def delete_snapshot_async(self, snapshot_id: Union[str, SnapshotID]): 170 if isinstance(snapshot_id, str): 171 snapshot_id = uuid6.UUID(snapshot_id) 172 await self.project_manager.delete_snapshot(self._user_id, self.id, snapshot_id) # type: ignore[arg-type] 173 174 async def list_snapshots_async( 175 self, include_reports: bool = True, include_test_suites: bool = True 176 ) -> List[SnapshotMetadata]: 177 return await self.project_manager.list_snapshots(self._user_id, self.id, include_reports, include_test_suites) # type: ignore[arg-type] 178 179 async def get_snapshot_metadata_async(self, id: SnapshotID) -> SnapshotMetadata: 180 return await self.project_manager.get_snapshot_metadata(self._user_id, self.id, id) # type: ignore[arg-type] 181 182 async def build_dashboard_info_async( 183 self, 184 timestamp_start: Optional[datetime.datetime], 185 timestamp_end: Optional[datetime.datetime], 186 ) -> DashboardInfo: 187 return await self.dashboard.build(self.project_manager.data_storage, self.id, timestamp_start, timestamp_end) 188 189 async def show_dashboard_async( 190 self, 191 timestamp_start: Optional[datetime.datetime] = None, 192 timestamp_end: Optional[datetime.datetime] = None, 193 ): 194 dashboard_info = await self.build_dashboard_info_async(timestamp_start, timestamp_end) 195 template_params = TemplateParams( 196 dashboard_id="pd_" + str(new_id()).replace("-", ""), 197 dashboard_info=dashboard_info, 198 additional_graphs={}, 199 ) 200 # pylint: disable=import-outside-toplevel 201 try: 202 from IPython.display import HTML 203 204 return HTML(inline_iframe_html_template(params=template_params)) 205 except ImportError as err: 206 raise Exception("Cannot import HTML from IPython.display, no way to show html") from err 207 208 async def reload_async(self, reload_snapshots: bool = False): 209 # fixme: reload snapshots 210 project = await self.project_manager.get_project(self._user_id, self.id) # type: ignore[arg-type] 211 self.__dict__.update(project.__dict__) 212 213 if reload_snapshots: 214 await self.project_manager.reload_snapshots(self._user_id, self.id) # type: ignore[arg-type] 215 216 save = sync_api(save_async) 217 load_snapshot = sync_api(load_snapshot_async) 218 delete_snapshot = sync_api(delete_snapshot_async) 219 list_snapshots = sync_api(list_snapshots_async) 220 show_dashboard = sync_api(show_dashboard_async) 221 build_dashboard_info = sync_api(build_dashboard_info_async) 222 get_snapshot_metadata = sync_api(get_snapshot_metadata_async) 223 add_snapshot = sync_api(add_snapshot_async) 224 reload = sync_api(reload_async) 225 226 227 class ProjectMetadataStorage(ABC): 228 @abstractmethod 229 async def add_project( 230 self, project: Project, user: User, team: Optional[Team], org_id: Optional[OrgID] = None 231 ) -> Project: 232 raise NotImplementedError 233 234 @abstractmethod 235 async def get_project(self, project_id: ProjectID) -> Optional[Project]: 236 raise NotImplementedError 237 238 @abstractmethod 239 async def delete_project(self, project_id: ProjectID): 240 raise NotImplementedError 241 242 @abstractmethod 243 async def list_projects(self, project_ids: Optional[Set[ProjectID]]) -> List[Project]: 244 raise NotImplementedError 245 246 @abstractmethod 247 async def add_snapshot(self, project_id: ProjectID, snapshot: Snapshot, blob: "BlobMetadata"): 248 raise NotImplementedError 249 250 @abstractmethod 251 async def delete_snapshot(self, project_id: ProjectID, snapshot_id: SnapshotID): 252 raise NotImplementedError 253 254 @abstractmethod 255 async def search_project(self, project_name: str, project_ids: Optional[Set[ProjectID]]) -> List[Project]: 256 raise NotImplementedError 257 258 @abstractmethod 259 async def list_snapshots( 260 self, 261 project_id: ProjectID, 262 include_reports: bool = True, 263 include_test_suites: bool = True, 264 ) -> List[SnapshotMetadata]: 265 raise NotImplementedError 266 267 @abstractmethod 268 async def get_snapshot_metadata(self, project_id: ProjectID, snapshot_id: SnapshotID) -> SnapshotMetadata: 269 raise NotImplementedError 270 271 @abstractmethod 272 async def update_project(self, project: Project) -> Project: 273 raise NotImplementedError 274 275 @abstractmethod 276 async def reload_snapshots(self, project_id: ProjectID): 277 raise NotImplementedError 278 279 280 class BlobStorage(ABC): 281 @abstractmethod 282 @contextlib.contextmanager 283 def open_blob(self, id: BlobID) -> Iterator[IO]: 284 raise NotImplementedError 285 286 @abstractmethod 287 async def put_blob(self, blob_id: str, obj): 288 raise NotImplementedError 289 290 def get_snapshot_blob_id(self, project_id: ProjectID, snapshot: Snapshot) -> BlobID: 291 raise NotImplementedError 292 293 async def put_snapshot(self, project_id: ProjectID, snapshot: Snapshot) -> BlobMetadata: 294 id = self.get_snapshot_blob_id(project_id, snapshot) 295 await self.put_blob(id, json.dumps(snapshot.dict(), cls=NumpyEncoder)) 296 return await self.get_blob_metadata(id) 297 298 async def get_blob_metadata(self, blob_id: BlobID) -> BlobMetadata: 299 raise NotImplementedError 300 301 302 class DataStorage(ABC): 303 @abstractmethod 304 async def extract_points(self, project_id: ProjectID, snapshot: Snapshot): 305 raise NotImplementedError 306 307 async def load_points( 308 self, 309 project_id: ProjectID, 310 filter: "ReportFilter", 311 values: List["PanelValue"], 312 timestamp_start: Optional[datetime.datetime], 313 timestamp_end: Optional[datetime.datetime], 314 ) -> DataPoints: 315 return await self.load_points_as_type(float, project_id, filter, values, timestamp_start, timestamp_end) 316 317 @staticmethod 318 def parse_value(cls: Type[PointType], value: Any) -> PointType: 319 if isinstance(value, cls): 320 return value 321 if isinstance(value, str): 322 value = json.loads(value) 323 return parse_obj_as(cls, value) 324 325 @abstractmethod 326 async def load_test_results( 327 self, 328 project_id: ProjectID, 329 filter: "ReportFilter", 330 test_filters: List["TestFilter"], 331 time_agg: Optional[str], 332 timestamp_start: Optional[datetime.datetime], 333 timestamp_end: Optional[datetime.datetime], 334 ) -> TestResultPoints: 335 raise NotImplementedError 336 337 @abstractmethod 338 async def load_points_as_type( 339 self, 340 cls: Type[PointType], 341 project_id: ProjectID, 342 filter: "ReportFilter", 343 values: List["PanelValue"], 344 timestamp_start: Optional[datetime.datetime], 345 timestamp_end: Optional[datetime.datetime], 346 ) -> DataPointsAsType[PointType]: 347 raise NotImplementedError 348 349 350 __all__ = [ 351 "EntityType", 352 "Entity", 353 "Org", 354 "Project", 355 "Team", 356 "User", 357 "DataStorage", 358 "ProjectMetadataStorage", 359 "Snapshot", 360 "BlobMetadata", 361 "AnySnapshot", 362 ]