# src/evidently/legacy/ui/workspace/cloud.py
  1  import dataclasses
  2  import json
  3  from io import BytesIO
  4  from typing import BinaryIO
  5  from typing import Dict
  6  from typing import List
  7  from typing import Literal
  8  from typing import NamedTuple
  9  from typing import Optional
 10  from typing import Tuple
 11  from typing import Type
 12  from typing import Union
 13  from typing import overload
 14  
 15  import pandas as pd
 16  from requests import HTTPError
 17  from requests import Response
 18  
 19  from evidently._pydantic_compat import parse_obj_as
 20  from evidently.core.datasets import DataDefinition
 21  from evidently.core.datasets import Dataset
 22  from evidently.legacy.pipeline.column_mapping import ColumnMapping
 23  from evidently.legacy.report import Report
 24  from evidently.legacy.test_suite import TestSuite
 25  from evidently.legacy.ui.api.models import OrgModel
 26  from evidently.legacy.ui.api.models import TeamModel
 27  from evidently.legacy.ui.base import Org
 28  from evidently.legacy.ui.base import Team
 29  from evidently.legacy.ui.datasets import DatasetSourceType
 30  from evidently.legacy.ui.managers.projects import ProjectManager
 31  from evidently.legacy.ui.storage.common import NoopAuthManager
 32  from evidently.legacy.ui.type_aliases import STR_UUID
 33  from evidently.legacy.ui.type_aliases import ZERO_UUID
 34  from evidently.legacy.ui.type_aliases import DatasetID
 35  from evidently.legacy.ui.type_aliases import OrgID
 36  from evidently.legacy.ui.type_aliases import ProjectID
 37  from evidently.legacy.ui.type_aliases import TeamID
 38  from evidently.legacy.ui.workspace.remote import NoopBlobStorage
 39  from evidently.legacy.ui.workspace.remote import NoopDataStorage
 40  from evidently.legacy.ui.workspace.remote import RemoteProjectMetadataStorage
 41  from evidently.legacy.ui.workspace.remote import T
 42  from evidently.legacy.ui.workspace.view import WorkspaceView
 43  from evidently.pydantic_utils import get_classpath
 44  
# HTTP header carrying the raw API token when exchanging it for a JWT.
TOKEN_HEADER_NAME = "X-Evidently-Token"
 46  
 47  
class Cookie(NamedTuple):
    """Static description of a browser cookie used for authentication."""

    key: str  # cookie name, e.g. "app.at"
    description: str  # human-readable description; left empty in this module
    httponly: bool  # whether the cookie should be marked HttpOnly
 52  
 53  
# Cookie under which the short-lived JWT access token is sent to the backend.
ACCESS_TOKEN_COOKIE = Cookie(
    key="app.at",
    description="",
    httponly=True,
)
 59  
 60  
 61  class CloudMetadataStorage(RemoteProjectMetadataStorage):
 62      def __init__(self, base_url: str, token: str, token_cookie_name: str):
 63          self.token = token
 64          self.token_cookie_name = token_cookie_name
 65          self._api_key = None
 66          if token.startswith("sk_") and len(token.split(".")) >= 3:
 67              self._api_key = token
 68          self._jwt_token: Optional[str] = None
 69          self._logged_in: bool = False
 70          super().__init__(base_url=base_url)
 71  
 72      def _get_jwt_token(self):
 73          return super()._request("/api/users/login", "GET", headers={TOKEN_HEADER_NAME: self.token}).text
 74  
 75      @property
 76      def jwt_token(self):
 77          if self._jwt_token is None:
 78              self._jwt_token = self._get_jwt_token()
 79  
 80          return self._jwt_token
 81  
 82      def _prepare_request(
 83          self,
 84          path: str,
 85          method: str,
 86          query_params: Optional[dict] = None,
 87          body: Optional[dict] = None,
 88          cookies=None,
 89          headers: Dict[str, str] = None,
 90          form_data: bool = False,
 91      ):
 92          r = super()._prepare_request(
 93              path=path,
 94              method=method,
 95              query_params=query_params,
 96              body=body,
 97              cookies=cookies,
 98              headers=headers,
 99              form_data=form_data,
100          )
101          if path == "/api/users/login":
102              return r
103          if self._api_key is not None:
104              r.headers["Authorization"] = f"Bearer {self._api_key}"
105          else:
106              r.cookies[self.token_cookie_name] = self.jwt_token
107          return r
108  
109      @overload
110      def _request(
111          self,
112          path: str,
113          method: str,
114          query_params: Optional[dict] = None,
115          body: Optional[dict] = None,
116          response_model: Type[T] = ...,
117          cookies=None,
118          headers: Dict[str, str] = None,
119          form_data: bool = False,
120      ) -> T:
121          pass
122  
123      @overload
124      def _request(
125          self,
126          path: str,
127          method: str,
128          query_params: Optional[dict] = None,
129          body: Optional[dict] = None,
130          response_model: Literal[None] = None,
131          cookies=None,
132          headers: Dict[str, str] = None,
133          form_data: bool = False,
134      ) -> Response:
135          pass
136  
137      def _request(
138          self,
139          path: str,
140          method: str,
141          query_params: Optional[dict] = None,
142          body: Optional[dict] = None,
143          response_model: Optional[Type[T]] = None,
144          cookies=None,
145          headers: Dict[str, str] = None,
146          form_data: bool = False,
147      ) -> Union[Response, T]:
148          try:
149              res = super()._request(
150                  path=path,
151                  method=method,
152                  query_params=query_params,
153                  body=body,
154                  response_model=response_model,
155                  cookies=cookies,
156                  headers=headers,
157                  form_data=form_data,
158              )
159              self._logged_in = True
160              return res
161          except HTTPError as e:
162              if self._logged_in and e.response.status_code == 401:
163                  # renew token and retry
164                  self._jwt_token = self._get_jwt_token()
165                  cookies[self.token_cookie_name] = self.jwt_token
166                  return super()._request(
167                      path,
168                      method,
169                      query_params,
170                      body,
171                      response_model,
172                      cookies=cookies,
173                      headers=headers,
174                      form_data=form_data,
175                  )
176              raise
177  
178      def create_org(self, org: Org) -> OrgModel:
179          return self._request("/api/orgs", "POST", body=org.dict(), response_model=OrgModel)
180  
181      def list_orgs(self) -> List[OrgModel]:
182          return self._request("/api/orgs", "GET", response_model=List[OrgModel])
183  
184      def create_team(self, team: Team, org_id: OrgID = None) -> TeamModel:
185          return self._request(
186              "/api/teams",
187              "POST",
188              query_params={"name": team.name, "org_id": org_id},
189              response_model=TeamModel,
190          )
191  
192      def add_dataset(
193          self,
194          file: BinaryIO,
195          name: str,
196          project_id: ProjectID,
197          description: Optional[str],
198          column_mapping: Optional[ColumnMapping],
199          dataset_source: DatasetSourceType = DatasetSourceType.file,
200      ) -> DatasetID:
201          cm_payload = json.dumps(dataclasses.asdict(column_mapping)) if column_mapping is not None else None
202          response: Response = self._request(
203              "/api/datasets/",
204              "POST",
205              body={
206                  "name": name,
207                  "description": description,
208                  "file": file,
209                  "column_mapping": cm_payload,
210                  "source_type": dataset_source.value,
211              },
212              query_params={"project_id": project_id},
213              form_data=True,
214          )
215          return DatasetID(response.json()["dataset_id"])
216  
217      def load_dataset(self, dataset_id: DatasetID) -> pd.DataFrame:
218          response: Response = self._request(f"/api/datasets/{dataset_id}/download", "GET")
219          return pd.read_parquet(BytesIO(response.content))
220  
221      def add_dataset_v2(
222          self, project_id: ProjectID, dataset: Dataset, name: str, description: Optional[str]
223      ) -> DatasetID:
224          data_definition = json.dumps(dataset.data_definition.dict())
225          file = NamedBytesIO(b"", "data.parquet")
226          dataset.as_dataframe().to_parquet(file)
227          file.seek(0)
228          response: Response = self._request(
229              "/api/v2/datasets/upload",
230              "POST",
231              body={
232                  "name": name,
233                  "description": description,
234                  "file": file,
235                  "data_definition": data_definition,
236              },
237              query_params={"project_id": project_id},
238              form_data=True,
239          )
240          return DatasetID(response.json()["dataset"]["id"])
241  
242      def load_dataset_v2(self, dataset_id: DatasetID) -> Dataset:
243          response: Response = self._request(f"/api/v2/datasets/{dataset_id}/download", "GET")
244  
245          metadata, file_content = read_multipart_response(response)
246  
247          df = pd.read_parquet(BytesIO(file_content))
248          data_def = parse_obj_as(DataDefinition, metadata["data_definition"])
249          return Dataset.from_pandas(df, data_definition=data_def)
250  
251  
def read_multipart_response(response: "Response") -> Tuple[Dict, bytes]:
    """Split a two-part multipart response into (json metadata, file bytes).

    Expects one ``application/json`` part and one
    ``application/octet-stream`` part, delimited by the boundary declared
    in the ``Content-Type`` header.

    Args:
        response: object with ``headers`` (mapping) and ``content`` (bytes).

    Returns:
        Tuple of the parsed JSON metadata dict and the raw file bytes.

    Raises:
        ValueError: if the boundary is missing or either part is absent.
    """
    content_type = response.headers.get("Content-Type", "")
    # str.split returns the whole header unchanged when the marker is
    # absent, so test for the marker explicitly before splitting.
    if "boundary=" not in content_type:
        raise ValueError("No boundary found in Content-Type header")
    # RFC 2046 allows the boundary parameter to be quoted.
    boundary = content_type.split("boundary=")[-1].strip('"')

    parts = response.content.split(f"--{boundary}".encode())

    metadata: Optional[Dict] = None
    file_content: Optional[bytes] = None

    for part in parts:
        if b"Content-Type: application/json" in part:
            # Body starts after the blank line that ends the part headers.
            json_start = part.find(b"\r\n\r\n") + 4
            metadata = json.loads(part[json_start:].decode())
        elif b"Content-Type: application/octet-stream" in part:
            file_start = part.find(b"\r\n\r\n") + 4
            # Drop the trailing "\r\n" preceding the next boundary marker.
            file_content = part[file_start:-2]

    if metadata is None or file_content is None:
        raise ValueError("Wrong response from server")
    return metadata, file_content
275  
276  
class NamedBytesIO(BytesIO):
    """In-memory binary buffer that also carries a ``name`` attribute.

    The name mimics the file-like objects returned by ``open`` —
    presumably used as the upload filename by the multipart form code
    (TODO confirm against the HTTP client).
    """

    def __init__(self, initial_bytes: bytes, name: str):
        super().__init__(initial_bytes)
        self.name = name
281  
282  
class CloudWorkspace(WorkspaceView):
    """Workspace view connected to Evidently Cloud.

    The API token comes from the ``token`` argument or, if omitted, from
    the ``EVIDENTLY_API_KEY`` environment variable. All storage calls are
    delegated to a :class:`CloudMetadataStorage` instance.
    """

    token: str
    URL: str = "https://app.evidently.cloud"

    def __init__(
        self,
        token: Optional[str] = None,
        url: Optional[str] = None,
    ):
        """
        Args:
            token: API token; falls back to the EVIDENTLY_API_KEY env var.
            url: cloud base URL; defaults to ``CloudWorkspace.URL``.

        Raises:
            ValueError: if no token is provided and the env var is unset.
        """
        if token is None:
            import os

            token = os.environ.get("EVIDENTLY_API_KEY", default=None)
        if token is None:
            raise ValueError(
                "To use CloudWorkspace you must provide a token through argument or env variable EVIDENTLY_API_KEY"
            )
        self.token = token
        # Always define the attribute so later reads cannot raise
        # AttributeError; "sk_"-style dotted tokens double as an API key.
        self._api_key: Optional[str] = None
        if token.startswith("sk_") and len(token.split(".")) >= 3:
            self._api_key = token
        self.url = url if url is not None else self.URL

        # todo: default org if user have only one
        user_id = ZERO_UUID  # todo: get from /me
        meta = CloudMetadataStorage(
            base_url=self.url,
            token=self.token,
            token_cookie_name=ACCESS_TOKEN_COOKIE.key,
        )

        pm = ProjectManager(
            project_metadata=meta,
            # Blobs and data live server-side, so local storages are no-ops.
            blob_storage=(NoopBlobStorage()),
            data_storage=(NoopDataStorage()),
            auth_manager=(CloudAuthManager()),
        )
        super().__init__(
            user_id,
            pm,
        )

    def create_org(self, name: str) -> Org:
        """Create a new organization with the given name."""
        assert isinstance(self.project_manager.project_metadata, CloudMetadataStorage)
        return self.project_manager.project_metadata.create_org(Org(name=name)).to_org()

    def list_orgs(self) -> List[Org]:
        """Return all organizations visible to the current token."""
        assert isinstance(self.project_manager.project_metadata, CloudMetadataStorage)
        return [o.to_org() for o in self.project_manager.project_metadata.list_orgs()]

    def create_team(self, name: str, org_id: OrgID) -> Team:
        """Create a team inside the given organization."""
        assert isinstance(self.project_manager.project_metadata, CloudMetadataStorage)
        return self.project_manager.project_metadata.create_team(Team(name=name, org_id=org_id), org_id).to_team()

    def add_dataset(
        self,
        data_or_path: Union[str, pd.DataFrame],
        name: str,
        project_id: STR_UUID,
        description: Optional[str] = None,
        column_mapping: Optional[ColumnMapping] = None,
        dataset_source: DatasetSourceType = DatasetSourceType.file,
    ) -> DatasetID:
        """Upload a dataset given either a file path or a DataFrame.

        A DataFrame is serialized to parquet in memory before upload; a
        string is treated as a path to an existing file.

        Returns:
            The id of the created dataset.

        Raises:
            NotImplementedError: for unsupported ``data_or_path`` types.
        """
        file: Union[NamedBytesIO, BinaryIO]
        assert isinstance(self.project_manager.project_metadata, CloudMetadataStorage)
        if isinstance(data_or_path, str):
            file = open(data_or_path, "rb")
        elif isinstance(data_or_path, pd.DataFrame):
            file = NamedBytesIO(b"", "data.parquet")
            data_or_path.to_parquet(file)
            file.seek(0)
        else:
            raise NotImplementedError(f"Add datasets is not implemented for {get_classpath(data_or_path.__class__)}")
        if isinstance(project_id, str):
            project_id = ProjectID(project_id)
        try:
            return self.project_manager.project_metadata.add_dataset(
                file, name, project_id, description, column_mapping, dataset_source
            )
        finally:
            # Close both opened files and in-memory buffers.
            file.close()

    def load_dataset(self, dataset_id: DatasetID) -> pd.DataFrame:
        """Download a dataset as a pandas DataFrame."""
        assert isinstance(self.project_manager.project_metadata, CloudMetadataStorage)
        return self.project_manager.project_metadata.load_dataset(dataset_id)

    def add_report_with_data(self, project_id: STR_UUID, report: Report):
        """Same as :meth:`add_report`; kept for interface compatibility."""
        self.add_report(project_id, report)

    def add_test_suite_with_data(self, project_id: STR_UUID, test_suite: TestSuite):
        """Same as :meth:`add_test_suite`; kept for interface compatibility."""
        self.add_test_suite(project_id, test_suite)

    def add_dataset_v2(
        self, project_id: STR_UUID, dataset: Dataset, name: str, description: Optional[str] = None
    ) -> DatasetID:
        """Upload a v2 Dataset (data + data definition) to a project."""
        assert isinstance(self.project_manager.project_metadata, CloudMetadataStorage)
        return self.project_manager.project_metadata.add_dataset_v2(
            project_id if isinstance(project_id, ProjectID) else ProjectID(project_id), dataset, name, description
        )

    def load_dataset_v2(self, dataset_id: STR_UUID) -> Dataset:
        """Download a v2 dataset together with its data definition."""
        assert isinstance(self.project_manager.project_metadata, CloudMetadataStorage)
        return self.project_manager.project_metadata.load_dataset_v2(
            dataset_id if isinstance(dataset_id, DatasetID) else DatasetID(dataset_id)
        )
387  
388  
class CloudAuthManager(NoopAuthManager):
    """Auth manager used with the cloud backend.

    Authorization is enforced server-side, so locally we only need to
    hand back a placeholder team object for the requested id.
    """

    async def get_team(self, team_id: TeamID) -> Optional[Team]:
        # The server is the source of truth; return a stub carrying the id only.
        placeholder = Team(id=team_id, name="", org_id=ZERO_UUID)
        return placeholder