"""base.py: core models and abstract storage interfaces for the evidently UI service.

Contains:

* query/response models for metric series (``SeriesFilter``, ``Series``, ...);
* entity models (``Org``, ``Team``, ``User``, ``Project``);
* abstract storage back-ends for project metadata, blobs, project dashboards
  and metric data points.
"""

import contextlib
import datetime
import json
from abc import ABC
from abc import abstractmethod
from enum import Enum
from typing import IO
from typing import TYPE_CHECKING
from typing import ClassVar
from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from typing import Set
from typing import TypeVar
from typing import Union

import uuid6

from evidently._pydantic_compat import BaseModel
from evidently._pydantic_compat import Field
from evidently._pydantic_compat import PrivateAttr
from evidently.core.report import Snapshot as SnapshotV2
from evidently.core.serialization import SnapshotModel
from evidently.legacy.core import new_id
from evidently.legacy.suite.base_suite import Snapshot
from evidently.legacy.ui.type_aliases import BlobID
from evidently.legacy.ui.type_aliases import EntityID
from evidently.legacy.ui.type_aliases import OrgID
from evidently.legacy.ui.type_aliases import ProjectID
from evidently.legacy.ui.type_aliases import SnapshotID
from evidently.legacy.ui.type_aliases import TeamID
from evidently.legacy.ui.type_aliases import UserID
from evidently.legacy.utils import NumpyEncoder
from evidently.legacy.utils.sync import sync_api
from evidently.sdk.models import DashboardModel
from evidently.sdk.models import SnapshotMetadataModel

if TYPE_CHECKING:
    # Imported only for annotations; a runtime import is avoided (likely an import cycle).
    from evidently.ui.service.managers.projects import ProjectManager

# A snapshot in either the legacy format or the current (v2) report format.
AnySnapshot = Union[Snapshot, SnapshotV2]


class SeriesFilter(BaseModel):
    """Criteria selecting metric series: tags, metadata, metric name and metric labels."""

    tags: List[str]
    metadata: Dict[str, str]
    metric: str
    metric_labels: Dict[str, str]


class BatchMetricData(BaseModel):
    """Optional time window plus optional series filters for a batch metric-data query."""

    timestamp_start: Optional[datetime.datetime] = None
    timestamp_end: Optional[datetime.datetime] = None
    series_filter: Optional[List[SeriesFilter]] = None


class Series(BaseModel):
    """One series of metric values returned by a data-series query."""

    metric_type: str
    # NOTE(review): presumably the position of the matching SeriesFilter in the request; confirm.
    filter_index: int
    params: Dict[str, str]
    # None entries presumably mark sources that have no value for this series; confirm.
    values: List[Optional[float]]


class SeriesSource(BaseModel):
    """Snapshot (with its timestamp, tags and metadata) that a series point came from."""

    snapshot_id: SnapshotID
    timestamp: datetime.datetime
    tags: List[str]
    metadata: Dict[str, str]


class SeriesResponse(BaseModel):
    """Result of ``DataStorage.get_data_series``: source snapshots and the series built from them."""

    sources: List[SeriesSource]
    series: List[Series]


class BlobMetadata(BaseModel):
    """Identifier and (optional) size of a stored blob."""

    id: BlobID
    size: Optional[int]


class EntityType(Enum):
    """Kinds of entities; each Entity subclass pins its kind via ``entity_type``."""

    Config = "config"
    ConfigVersion = "config_version"
    Prompt = "prompt"
    PromptVersion = "prompt_version"
    Dataset = "dataset"
    Project = "project"
    Team = "team"
    Org = "org"


class Entity(BaseModel):
    """Base model for identifiable entities; subclasses set ``entity_type`` and ``id``."""

    entity_type: ClassVar[EntityType]
    id: EntityID


class Org(Entity):
    """An organization; the id defaults to a freshly generated one."""

    entity_type: ClassVar[EntityType] = EntityType.Org
    id: OrgID = Field(default_factory=new_id)
    name: str


class Team(Entity):
    """A team, optionally attached to an organization."""

    entity_type: ClassVar[EntityType] = EntityType.Team
    id: TeamID = Field(default_factory=new_id)
    name: str
    org_id: Optional[OrgID]


UT = TypeVar("UT", bound="User")


class User(BaseModel):
    """A user; the id defaults to a freshly generated one."""

    id: UserID = Field(default_factory=new_id)
    name: str
    email: str = ""

    def merge(self: UT, other: "User") -> UT:
        """Return a new instance of ``self``'s class that combines both users.

        For every field of ``self``, the value from ``other`` wins when it is
        truthy. Otherwise ``self``'s value is kept, so an empty string or None
        in ``other`` never overwrites data.
        """
        kwargs = {f: getattr(other, f, None) or getattr(self, f) for f in self.__fields__}
        return self.__class__(**kwargs)


def _default_dashboard():
    """Return an empty legacy ``DashboardConfig`` (no name, no panels)."""
    # Local import: the legacy dashboards module is only needed when a default is built.
    from evidently.legacy.ui.dashboards import DashboardConfig

    return DashboardConfig(name="", panels=[])


class Project(Entity):
    """A project record that can be bound to a ProjectManager to act on its snapshots.

    Call ``bind`` before using any ``*_async`` method or its synchronous
    counterpart. Those methods forward to the bound manager together with the
    bound user id and this project's id.
    """

    entity_type: ClassVar[EntityType] = EntityType.Project

    class Config:
        # Keeps `_project_manager` / `_user_id` out of the model's serialized fields.
        underscore_attrs_are_private = True

    id: ProjectID = Field(default_factory=new_id)
    name: str
    description: Optional[str] = None

    team_id: Optional[TeamID] = None
    org_id: Optional[OrgID] = None

    date_from: Optional[datetime.datetime] = None
    date_to: Optional[datetime.datetime] = None
    created_at: Optional[datetime.datetime] = Field(default=None)
    version: str = "1"

    _project_manager: Optional["ProjectManager"] = PrivateAttr(default=None)
    _user_id: Optional[UserID] = PrivateAttr(default=None)

    def bind(self, project_manager: Optional["ProjectManager"], user_id: Optional[UserID]):
        """Attach a project manager and acting user id; returns ``self`` for chaining."""
        self._project_manager = project_manager
        self._user_id = user_id
        return self

    @property
    def project_manager(self) -> "ProjectManager":
        """The bound project manager.

        Raises:
            ValueError: if no manager has been bound via ``bind``.
        """
        if self._project_manager is None:
            raise ValueError("Project is not binded")
        return self._project_manager

    async def save_async(self):
        """Persist this project through the manager's ``update_project``; returns ``self``."""
        await self.project_manager.update_project(self._user_id, self)  # type: ignore[arg-type]
        return self

    async def load_snapshot_async(self, snapshot_id: SnapshotID) -> SnapshotModel:
        """Load snapshot ``snapshot_id`` of this project through the manager."""
        return await self.project_manager.load_snapshot(self._user_id, self.id, snapshot_id)  # type: ignore[arg-type]

    async def add_snapshot_async(self, snapshot: SnapshotModel):
        """Add ``snapshot`` to this project through the manager."""
        await self.project_manager.add_snapshot(self._user_id, self.id, snapshot)  # type: ignore[arg-type]

    async def delete_snapshot_async(self, snapshot_id: Union[str, SnapshotID]):
        """Delete a snapshot of this project; a string id is parsed with ``uuid6.UUID`` first."""
        if isinstance(snapshot_id, str):
            snapshot_id = uuid6.UUID(snapshot_id)
        await self.project_manager.delete_snapshot(self._user_id, self.id, snapshot_id)  # type: ignore[arg-type]

    async def list_snapshots_async(self) -> List[SnapshotMetadataModel]:
        """Return the snapshot metadata listed by the manager for this project."""
        return await self.project_manager.list_snapshots(self._user_id, self.id)  # type: ignore[arg-type]

    async def get_snapshot_metadata_async(self, id: SnapshotID) -> SnapshotMetadataModel:
        """Return the metadata of snapshot ``id`` in this project."""
        return await self.project_manager.get_snapshot_metadata(self._user_id, self.id, id)  # type: ignore[arg-type]

    async def reload_async(self, reload_snapshots: bool = False):
        """Refresh this instance in place from the manager's copy of the project.

        When ``reload_snapshots`` is true, the manager's ``reload_snapshots``
        is also called for this project.
        """
        # fixme: reload snapshots
        project = await self.project_manager.get_project(self._user_id, self.id)  # type: ignore[arg-type]
        # Update in place (rather than returning a new object) so existing references see fresh data.
        self.__dict__.update(project.__dict__)

        if reload_snapshots:
            await self.project_manager.reload_snapshots(self._user_id, self.id)  # type: ignore[arg-type]

    # Synchronous counterparts of the async methods above, generated by sync_api.
    save = sync_api(save_async)
    load_snapshot = sync_api(load_snapshot_async)
    delete_snapshot = sync_api(delete_snapshot_async)
    list_snapshots = sync_api(list_snapshots_async)
    get_snapshot_metadata = sync_api(get_snapshot_metadata_async)
    add_snapshot = sync_api(add_snapshot_async)
    reload = sync_api(reload_async)


class ProjectMetadataStorage(ABC):
    """Abstract storage for project records and their snapshot metadata."""

    @abstractmethod
    async def add_project(self, project: Project, user: User, org_id: Optional[OrgID] = None) -> Project:
        """Store a new ``project`` for ``user`` (optionally under ``org_id``) and return it."""
        raise NotImplementedError

    @abstractmethod
    async def get_project(self, project_id: ProjectID) -> Optional[Project]:
        """Return the project with ``project_id``, or None (presumably when it does not exist)."""
        raise NotImplementedError

    @abstractmethod
    async def delete_project(self, project_id: ProjectID):
        """Delete the project with ``project_id``."""
        raise NotImplementedError

    @abstractmethod
    async def list_projects(self, project_ids: Optional[Set[ProjectID]]) -> List[Project]:
        """List projects, limited to ``project_ids`` (presumably None means no limit)."""
        raise NotImplementedError

    @abstractmethod
    async def add_snapshot(self, project_id: ProjectID, snapshot: SnapshotModel) -> SnapshotID:
        """Record ``snapshot`` under ``project_id`` and return its snapshot id."""
        raise NotImplementedError

    @abstractmethod
    async def delete_snapshot(self, project_id: ProjectID, snapshot_id: SnapshotID):
        """Remove snapshot ``snapshot_id`` from project ``project_id``."""
        raise NotImplementedError

    @abstractmethod
    async def search_project(self, project_name: str, project_ids: Optional[Set[ProjectID]]) -> List[Project]:
        """Find projects matching ``project_name``, limited to ``project_ids`` when given."""
        raise NotImplementedError

    @abstractmethod
    async def list_snapshots(self, project_id: ProjectID) -> List[SnapshotMetadataModel]:
        """Return metadata for the snapshots of ``project_id``."""
        raise NotImplementedError

    @abstractmethod
    async def get_snapshot_metadata(self, project_id: ProjectID, snapshot_id: SnapshotID) -> SnapshotMetadataModel:
        """Return metadata for snapshot ``snapshot_id`` of ``project_id``."""
        raise NotImplementedError

    @abstractmethod
    async def update_project(self, project: Project) -> Project:
        """Persist changes to ``project`` and return the stored project."""
        raise NotImplementedError

    @abstractmethod
    async def reload_snapshots(self, project_id: ProjectID):
        """Reload the snapshots of ``project_id``."""
        raise NotImplementedError


class BlobStorage(ABC):
    """Abstract storage for opaque blobs (e.g. serialized snapshots) addressed by BlobID."""

    @abstractmethod
    @contextlib.contextmanager
    def open_blob(self, id: BlobID) -> Iterator[IO]:
        """Context manager yielding a readable file-like object for blob ``id``."""
        raise NotImplementedError

    @abstractmethod
    async def put_blob(self, blob_id: str, obj):
        """Write ``obj`` as the contents of blob ``blob_id``."""
        raise NotImplementedError

    def get_snapshot_blob_id(self, project_id: ProjectID, snapshot: Snapshot) -> BlobID:
        """Return the blob id under which ``snapshot`` of ``project_id`` is stored.

        Not abstract, but ``put_snapshot`` calls it, so subclasses that use
        ``put_snapshot`` must override it.
        """
        raise NotImplementedError

    async def put_snapshot(self, project_id: ProjectID, snapshot: Snapshot) -> BlobMetadata:
        """Serialize ``snapshot`` to JSON, store it as a blob and return the blob's metadata.

        The blob id comes from ``get_snapshot_blob_id``. JSON encoding uses
        ``NumpyEncoder`` for values the default encoder cannot handle.
        """
        id = self.get_snapshot_blob_id(project_id, snapshot)
        await self.put_blob(id, json.dumps(snapshot.dict(), cls=NumpyEncoder))
        return await self.get_blob_metadata(id)

    async def get_blob_metadata(self, blob_id: BlobID) -> BlobMetadata:
        """Return metadata for ``blob_id``; subclasses must override to use ``put_snapshot``."""
        raise NotImplementedError

    def get_blob_data(self, id: BlobID) -> bytes:
        """Read and return the full contents of blob ``id`` via ``open_blob``."""
        with self.open_blob(id) as blob:
            return blob.read()

    @abstractmethod
    def blob_exists(self, id: BlobID):
        """Report whether blob ``id`` exists."""
        raise NotImplementedError

    @abstractmethod
    async def delete_blob(self, id: BlobID):
        """Delete blob ``id``."""
        raise NotImplementedError


class ProjectDashboardStorage(ABC):
    """Abstract storage for per-project dashboards.

    Inherits ``ABC`` so that ``@abstractmethod`` is actually enforced. Without
    ``ABCMeta`` the decorator is a no-op. This also matches the other storage
    interfaces in this module.
    """

    @abstractmethod
    async def get_project_dashboard(self, project_id: ProjectID) -> DashboardModel:
        """Return the dashboard of ``project_id``."""
        raise NotImplementedError


class DataStorage(ABC):
    """Abstract storage for metric data points extracted from snapshots."""

    @abstractmethod
    async def add_snapshot_points(self, project_id: ProjectID, snapshot_id: SnapshotID, snapshot: SnapshotModel):
        """Store the data points of ``snapshot`` (id ``snapshot_id``) for ``project_id``."""
        raise NotImplementedError

    @abstractmethod
    async def get_data_series(
        self,
        project_id: ProjectID,
        series_filter: List[SeriesFilter],
        start_time: Optional[datetime.datetime],
        end_time: Optional[datetime.datetime],
    ) -> SeriesResponse:
        """Return the series selected by ``series_filter`` within the optional time window."""
        raise NotImplementedError

    @abstractmethod
    async def get_metrics(self, project_id: ProjectID, tags: List[str], metadata: Dict[str, str]) -> List[str]:
        """Return metric names available for ``project_id`` under the given tags and metadata."""
        raise NotImplementedError

    @abstractmethod
    async def get_metric_labels(
        self,
        project_id: ProjectID,
        tags: List[str],
        metadata: Dict[str, str],
        metric: str,
    ) -> List[str]:
        """Return the label names of ``metric`` under the given tags and metadata."""
        raise NotImplementedError

    @abstractmethod
    async def get_metric_label_values(
        self,
        project_id: ProjectID,
        tags: List[str],
        metadata: Dict[str, str],
        metric: str,
        label: str,
    ) -> List[str]:
        """Return the values of ``label`` on ``metric`` under the given tags and metadata."""
        raise NotImplementedError