/ src / evidently / ui / service / base.py
base.py
  1  import contextlib
  2  import datetime
  3  import json
  4  from abc import ABC
  5  from abc import abstractmethod
  6  from enum import Enum
  7  from typing import IO
  8  from typing import TYPE_CHECKING
  9  from typing import ClassVar
 10  from typing import Dict
 11  from typing import Iterator
 12  from typing import List
 13  from typing import Optional
 14  from typing import Set
 15  from typing import TypeVar
 16  from typing import Union
 17  
 18  import uuid6
 19  
 20  from evidently._pydantic_compat import BaseModel
 21  from evidently._pydantic_compat import Field
 22  from evidently._pydantic_compat import PrivateAttr
 23  from evidently.core.report import Snapshot as SnapshotV2
 24  from evidently.core.serialization import SnapshotModel
 25  from evidently.legacy.core import new_id
 26  from evidently.legacy.suite.base_suite import Snapshot
 27  from evidently.legacy.ui.type_aliases import BlobID
 28  from evidently.legacy.ui.type_aliases import EntityID
 29  from evidently.legacy.ui.type_aliases import OrgID
 30  from evidently.legacy.ui.type_aliases import ProjectID
 31  from evidently.legacy.ui.type_aliases import SnapshotID
 32  from evidently.legacy.ui.type_aliases import TeamID
 33  from evidently.legacy.ui.type_aliases import UserID
 34  from evidently.legacy.utils import NumpyEncoder
 35  from evidently.legacy.utils.sync import sync_api
 36  from evidently.sdk.models import DashboardModel
 37  from evidently.sdk.models import SnapshotMetadataModel
 38  
 39  if TYPE_CHECKING:
 40      from evidently.ui.service.managers.projects import ProjectManager
 41  
 42  AnySnapshot = Union[Snapshot, SnapshotV2]
 43  
 44  
 45  class SeriesFilter(BaseModel):
 46      tags: List[str]
 47      metadata: Dict[str, str]
 48      metric: str
 49      metric_labels: Dict[str, str]
 50  
 51  
 52  class BatchMetricData(BaseModel):
 53      timestamp_start: Optional[datetime.datetime] = None
 54      timestamp_end: Optional[datetime.datetime] = None
 55      series_filter: Optional[List[SeriesFilter]] = None
 56  
 57  
 58  class Series(BaseModel):
 59      metric_type: str
 60      filter_index: int
 61      params: Dict[str, str]
 62      values: List[Optional[float]]
 63  
 64  
 65  class SeriesSource(BaseModel):
 66      snapshot_id: SnapshotID
 67      timestamp: datetime.datetime
 68      tags: List[str]
 69      metadata: Dict[str, str]
 70  
 71  
 72  class SeriesResponse(BaseModel):
 73      sources: List[SeriesSource]
 74      series: List[Series]
 75  
 76  
 77  class BlobMetadata(BaseModel):
 78      id: BlobID
 79      size: Optional[int]
 80  
 81  
 82  class EntityType(Enum):
 83      Config = "config"
 84      ConfigVersion = "config_version"
 85      Prompt = "prompt"
 86      PromptVersion = "prompt_version"
 87      Dataset = "dataset"
 88      Project = "project"
 89      Team = "team"
 90      Org = "org"
 91  
 92  
 93  class Entity(BaseModel):
 94      entity_type: ClassVar[EntityType]
 95      id: EntityID
 96  
 97  
 98  class Org(Entity):
 99      entity_type: ClassVar[EntityType] = EntityType.Org
100      id: OrgID = Field(default_factory=new_id)
101      name: str
102  
103  
104  class Team(Entity):
105      entity_type: ClassVar[EntityType] = EntityType.Team
106      id: TeamID = Field(default_factory=new_id)
107      name: str
108      org_id: Optional[OrgID]
109  
110  
111  UT = TypeVar("UT", bound="User")
112  
113  
114  class User(BaseModel):
115      id: UserID = Field(default_factory=new_id)
116      name: str
117      email: str = ""
118  
119      def merge(self: UT, other: "User") -> UT:
120          kwargs = {f: getattr(other, f, None) or getattr(self, f) for f in self.__fields__}
121          return self.__class__(**kwargs)
122  
123  
124  def _default_dashboard():
125      from evidently.legacy.ui.dashboards import DashboardConfig
126  
127      return DashboardConfig(name="", panels=[])
128  
129  
130  class Project(Entity):
131      entity_type: ClassVar[EntityType] = EntityType.Project
132  
133      class Config:
134          underscore_attrs_are_private = True
135  
136      id: ProjectID = Field(default_factory=new_id)
137      name: str
138      description: Optional[str] = None
139  
140      team_id: Optional[TeamID] = None
141      org_id: Optional[OrgID] = None
142  
143      date_from: Optional[datetime.datetime] = None
144      date_to: Optional[datetime.datetime] = None
145      created_at: Optional[datetime.datetime] = Field(default=None)
146      version: str = "1"
147      # Field(default=datetime.datetime.fromisoformat("1900-01-01T00:00:00"))
148  
149      _project_manager: Optional["ProjectManager"] = PrivateAttr(default=None)
150      _user_id: Optional[UserID] = PrivateAttr(default=None)
151  
152      def bind(self, project_manager: Optional["ProjectManager"], user_id: Optional[UserID]):
153          self._project_manager = project_manager
154          self._user_id = user_id
155          return self
156  
157      @property
158      def project_manager(self) -> "ProjectManager":
159          if self._project_manager is None:
160              raise ValueError("Project is not binded")
161          return self._project_manager
162  
163      async def save_async(self):
164          await self.project_manager.update_project(self._user_id, self)  # type: ignore[arg-type]
165          return self
166  
167      async def load_snapshot_async(self, snapshot_id: SnapshotID) -> SnapshotModel:
168          return await self.project_manager.load_snapshot(self._user_id, self.id, snapshot_id)  # type: ignore[arg-type]
169  
170      async def add_snapshot_async(self, snapshot: SnapshotModel):
171          await self.project_manager.add_snapshot(self._user_id, self.id, snapshot)  # type: ignore[arg-type]
172  
173      async def delete_snapshot_async(self, snapshot_id: Union[str, SnapshotID]):
174          if isinstance(snapshot_id, str):
175              snapshot_id = uuid6.UUID(snapshot_id)
176          await self.project_manager.delete_snapshot(self._user_id, self.id, snapshot_id)  # type: ignore[arg-type]
177  
178      async def list_snapshots_async(self) -> List[SnapshotMetadataModel]:
179          return await self.project_manager.list_snapshots(self._user_id, self.id)  # type: ignore[arg-type]
180  
181      async def get_snapshot_metadata_async(self, id: SnapshotID) -> SnapshotMetadataModel:
182          return await self.project_manager.get_snapshot_metadata(self._user_id, self.id, id)  # type: ignore[arg-type]
183  
184      async def reload_async(self, reload_snapshots: bool = False):
185          # fixme: reload snapshots
186          project = await self.project_manager.get_project(self._user_id, self.id)  # type: ignore[arg-type]
187          self.__dict__.update(project.__dict__)
188  
189          if reload_snapshots:
190              await self.project_manager.reload_snapshots(self._user_id, self.id)  # type: ignore[arg-type]
191  
192      save = sync_api(save_async)
193      load_snapshot = sync_api(load_snapshot_async)
194      delete_snapshot = sync_api(delete_snapshot_async)
195      list_snapshots = sync_api(list_snapshots_async)
196      get_snapshot_metadata = sync_api(get_snapshot_metadata_async)
197      add_snapshot = sync_api(add_snapshot_async)
198      reload = sync_api(reload_async)
199  
200  
201  class ProjectMetadataStorage(ABC):
202      @abstractmethod
203      async def add_project(self, project: Project, user: User, org_id: Optional[OrgID] = None) -> Project:
204          raise NotImplementedError
205  
206      @abstractmethod
207      async def get_project(self, project_id: ProjectID) -> Optional[Project]:
208          raise NotImplementedError
209  
210      @abstractmethod
211      async def delete_project(self, project_id: ProjectID):
212          raise NotImplementedError
213  
214      @abstractmethod
215      async def list_projects(self, project_ids: Optional[Set[ProjectID]]) -> List[Project]:
216          raise NotImplementedError
217  
218      @abstractmethod
219      async def add_snapshot(self, project_id: ProjectID, snapshot: SnapshotModel) -> SnapshotID:
220          raise NotImplementedError
221  
222      @abstractmethod
223      async def delete_snapshot(self, project_id: ProjectID, snapshot_id: SnapshotID):
224          raise NotImplementedError
225  
226      @abstractmethod
227      async def search_project(self, project_name: str, project_ids: Optional[Set[ProjectID]]) -> List[Project]:
228          raise NotImplementedError
229  
230      @abstractmethod
231      async def list_snapshots(self, project_id: ProjectID) -> List[SnapshotMetadataModel]:
232          raise NotImplementedError
233  
234      @abstractmethod
235      async def get_snapshot_metadata(self, project_id: ProjectID, snapshot_id: SnapshotID) -> SnapshotMetadataModel:
236          raise NotImplementedError
237  
238      @abstractmethod
239      async def update_project(self, project: Project) -> Project:
240          raise NotImplementedError
241  
242      @abstractmethod
243      async def reload_snapshots(self, project_id: ProjectID):
244          raise NotImplementedError
245  
246  
247  class BlobStorage(ABC):
248      @abstractmethod
249      @contextlib.contextmanager
250      def open_blob(self, id: BlobID) -> Iterator[IO]:
251          raise NotImplementedError
252  
253      @abstractmethod
254      async def put_blob(self, blob_id: str, obj):
255          raise NotImplementedError
256  
257      def get_snapshot_blob_id(self, project_id: ProjectID, snapshot: Snapshot) -> BlobID:
258          raise NotImplementedError
259  
260      async def put_snapshot(self, project_id: ProjectID, snapshot: Snapshot) -> BlobMetadata:
261          id = self.get_snapshot_blob_id(project_id, snapshot)
262          await self.put_blob(id, json.dumps(snapshot.dict(), cls=NumpyEncoder))
263          return await self.get_blob_metadata(id)
264  
265      async def get_blob_metadata(self, blob_id: BlobID) -> BlobMetadata:
266          raise NotImplementedError
267  
268      def get_blob_data(self, id: BlobID) -> bytes:
269          with self.open_blob(id) as blob:
270              return blob.read()
271  
272      @abstractmethod
273      def blob_exists(self, id: BlobID):
274          raise NotImplementedError
275  
276      @abstractmethod
277      async def delete_blob(self, id: BlobID):
278          raise NotImplementedError
279  
280  
281  class ProjectDashboardStorage:
282      @abstractmethod
283      async def get_project_dashboard(self, project_id: ProjectID) -> DashboardModel:
284          raise NotImplementedError
285  
286  
287  class DataStorage(ABC):
288      @abstractmethod
289      async def add_snapshot_points(self, project_id: ProjectID, snapshot_id: SnapshotID, snapshot: SnapshotModel):
290          raise NotImplementedError
291  
292      @abstractmethod
293      async def get_data_series(
294          self,
295          project_id: ProjectID,
296          series_filter: List[SeriesFilter],
297          start_time: Optional[datetime.datetime],
298          end_time: Optional[datetime.datetime],
299      ) -> SeriesResponse:
300          raise NotImplementedError
301  
302      @abstractmethod
303      async def get_metrics(self, project_id: ProjectID, tags: List[str], metadata: Dict[str, str]) -> List[str]:
304          raise NotImplementedError
305  
306      @abstractmethod
307      async def get_metric_labels(
308          self,
309          project_id: ProjectID,
310          tags: List[str],
311          metadata: Dict[str, str],
312          metric: str,
313      ) -> List[str]:
314          raise NotImplementedError
315  
316      @abstractmethod
317      async def get_metric_label_values(
318          self,
319          project_id: ProjectID,
320          tags: List[str],
321          metadata: Dict[str, str],
322          metric: str,
323          label: str,
324      ) -> List[str]:
325          raise NotImplementedError