# cloud.py
import dataclasses
import json
from io import BytesIO
from typing import BinaryIO
from typing import Dict
from typing import List
from typing import Literal
from typing import NamedTuple
from typing import Optional
from typing import Tuple
from typing import Type
from typing import Union
from typing import overload

import pandas as pd
from requests import HTTPError
from requests import Response

from evidently._pydantic_compat import parse_obj_as
from evidently.core.datasets import DataDefinition
from evidently.core.datasets import Dataset
from evidently.legacy.pipeline.column_mapping import ColumnMapping
from evidently.legacy.report import Report
from evidently.legacy.test_suite import TestSuite
from evidently.legacy.ui.api.models import OrgModel
from evidently.legacy.ui.api.models import TeamModel
from evidently.legacy.ui.base import Org
from evidently.legacy.ui.base import Team
from evidently.legacy.ui.datasets import DatasetSourceType
from evidently.legacy.ui.managers.projects import ProjectManager
from evidently.legacy.ui.storage.common import NoopAuthManager
from evidently.legacy.ui.type_aliases import STR_UUID
from evidently.legacy.ui.type_aliases import ZERO_UUID
from evidently.legacy.ui.type_aliases import DatasetID
from evidently.legacy.ui.type_aliases import OrgID
from evidently.legacy.ui.type_aliases import ProjectID
from evidently.legacy.ui.type_aliases import TeamID
from evidently.legacy.ui.workspace.remote import NoopBlobStorage
from evidently.legacy.ui.workspace.remote import NoopDataStorage
from evidently.legacy.ui.workspace.remote import RemoteProjectMetadataStorage
from evidently.legacy.ui.workspace.remote import T
from evidently.legacy.ui.workspace.view import WorkspaceView
from evidently.pydantic_utils import get_classpath

# Header carrying the raw user token when exchanging it for a JWT.
TOKEN_HEADER_NAME = "X-Evidently-Token"


def _is_api_key(token: str) -> bool:
    """Return True if *token* looks like a long-lived API key.

    API keys are "sk_"-prefixed and contain at least three dot-separated
    segments; anything else is treated as a token to be exchanged for a JWT.
    """
    return token.startswith("sk_") and len(token.split(".")) >= 3


class Cookie(NamedTuple):
    # Describes an auth cookie expected by the cloud backend.
    key: str
    description: str
    httponly: bool


ACCESS_TOKEN_COOKIE = Cookie(
    key="app.at",
    description="",
    httponly=True,
)


class CloudMetadataStorage(RemoteProjectMetadataStorage):
    """Project metadata storage backed by the Evidently Cloud HTTP API.

    Authenticates in one of two ways:
      * an API key ("sk_..." token) is sent as a Bearer ``Authorization`` header;
      * any other token is exchanged (lazily) for a JWT, which is sent as a
        cookie and transparently renewed once when a request returns 401.
    """

    def __init__(self, base_url: str, token: str, token_cookie_name: str):
        self.token = token
        self.token_cookie_name = token_cookie_name
        # API keys are used directly; other tokens go through the JWT exchange.
        self._api_key: Optional[str] = token if _is_api_key(token) else None
        self._jwt_token: Optional[str] = None
        self._logged_in: bool = False
        super().__init__(base_url=base_url)

    def _get_jwt_token(self) -> str:
        """Exchange the raw token for a JWT via the login endpoint."""
        return super()._request("/api/users/login", "GET", headers={TOKEN_HEADER_NAME: self.token}).text

    @property
    def jwt_token(self) -> str:
        # Lazily fetched and cached; invalidated by the 401-retry path in _request.
        if self._jwt_token is None:
            self._jwt_token = self._get_jwt_token()

        return self._jwt_token

    def _prepare_request(
        self,
        path: str,
        method: str,
        query_params: Optional[dict] = None,
        body: Optional[dict] = None,
        cookies=None,
        headers: Optional[Dict[str, str]] = None,
        form_data: bool = False,
    ):
        """Build the request and attach credentials (except for the login call itself)."""
        r = super()._prepare_request(
            path=path,
            method=method,
            query_params=query_params,
            body=body,
            cookies=cookies,
            headers=headers,
            form_data=form_data,
        )
        if path == "/api/users/login":
            # The login endpoint authenticates via TOKEN_HEADER_NAME only.
            return r
        if self._api_key is not None:
            r.headers["Authorization"] = f"Bearer {self._api_key}"
        else:
            r.cookies[self.token_cookie_name] = self.jwt_token
        return r

    @overload
    def _request(
        self,
        path: str,
        method: str,
        query_params: Optional[dict] = None,
        body: Optional[dict] = None,
        response_model: Type[T] = ...,
        cookies=None,
        headers: Optional[Dict[str, str]] = None,
        form_data: bool = False,
    ) -> T:
        pass

    @overload
    def _request(
        self,
        path: str,
        method: str,
        query_params: Optional[dict] = None,
        body: Optional[dict] = None,
        response_model: Literal[None] = None,
        cookies=None,
        headers: Optional[Dict[str, str]] = None,
        form_data: bool = False,
    ) -> Response:
        pass

    def _request(
        self,
        path: str,
        method: str,
        query_params: Optional[dict] = None,
        body: Optional[dict] = None,
        response_model: Optional[Type[T]] = None,
        cookies=None,
        headers: Optional[Dict[str, str]] = None,
        form_data: bool = False,
    ) -> Union[Response, T]:
        """Perform a request; on 401 after a prior success, renew the JWT and retry once."""
        try:
            res = super()._request(
                path=path,
                method=method,
                query_params=query_params,
                body=body,
                response_model=response_model,
                cookies=cookies,
                headers=headers,
                form_data=form_data,
            )
            self._logged_in = True
            return res
        except HTTPError as e:
            if self._logged_in and e.response.status_code == 401:
                # The cached JWT has expired: renew it and retry once.
                self._jwt_token = self._get_jwt_token()
                # `cookies` defaults to None; guard before assigning into it
                # (the renewed JWT is also re-attached by _prepare_request).
                if cookies is None:
                    cookies = {}
                cookies[self.token_cookie_name] = self.jwt_token
                return super()._request(
                    path,
                    method,
                    query_params,
                    body,
                    response_model,
                    cookies=cookies,
                    headers=headers,
                    form_data=form_data,
                )
            raise

    def create_org(self, org: Org) -> OrgModel:
        """Create an organization and return its server-side model."""
        return self._request("/api/orgs", "POST", body=org.dict(), response_model=OrgModel)

    def list_orgs(self) -> List[OrgModel]:
        """List all organizations visible to the current token."""
        return self._request("/api/orgs", "GET", response_model=List[OrgModel])

    def create_team(self, team: Team, org_id: Optional[OrgID] = None) -> TeamModel:
        """Create a team, optionally inside an organization."""
        return self._request(
            "/api/teams",
            "POST",
            query_params={"name": team.name, "org_id": org_id},
            response_model=TeamModel,
        )

    def add_dataset(
        self,
        file: BinaryIO,
        name: str,
        project_id: ProjectID,
        description: Optional[str],
        column_mapping: Optional[ColumnMapping],
        dataset_source: DatasetSourceType = DatasetSourceType.file,
    ) -> DatasetID:
        """Upload a dataset file (v1 API) and return the new dataset id."""
        cm_payload = json.dumps(dataclasses.asdict(column_mapping)) if column_mapping is not None else None
        response: Response = self._request(
            "/api/datasets/",
            "POST",
            body={
                "name": name,
                "description": description,
                "file": file,
                "column_mapping": cm_payload,
                "source_type": dataset_source.value,
            },
            query_params={"project_id": project_id},
            form_data=True,
        )
        return DatasetID(response.json()["dataset_id"])

    def load_dataset(self, dataset_id: DatasetID) -> pd.DataFrame:
        """Download a dataset (v1 API) as a DataFrame; the payload is parquet."""
        response: Response = self._request(f"/api/datasets/{dataset_id}/download", "GET")
        return pd.read_parquet(BytesIO(response.content))

    def add_dataset_v2(
        self, project_id: ProjectID, dataset: Dataset, name: str, description: Optional[str]
    ) -> DatasetID:
        """Upload a Dataset with its data definition (v2 API); returns the dataset id."""
        data_definition = json.dumps(dataset.data_definition.dict())
        file = NamedBytesIO(b"", "data.parquet")
        dataset.as_dataframe().to_parquet(file)
        file.seek(0)
        response: Response = self._request(
            "/api/v2/datasets/upload",
            "POST",
            body={
                "name": name,
                "description": description,
                "file": file,
                "data_definition": data_definition,
            },
            query_params={"project_id": project_id},
            form_data=True,
        )
        return DatasetID(response.json()["dataset"]["id"])

    def load_dataset_v2(self, dataset_id: DatasetID) -> Dataset:
        """Download a dataset (v2 API): multipart response with JSON metadata + parquet."""
        response: Response = self._request(f"/api/v2/datasets/{dataset_id}/download", "GET")

        metadata, file_content = read_multipart_response(response)

        df = pd.read_parquet(BytesIO(file_content))
        data_def = parse_obj_as(DataDefinition, metadata["data_definition"])
        return Dataset.from_pandas(df, data_definition=data_def)


def read_multipart_response(response: Response) -> Tuple[Dict, bytes]:
    """Split a multipart response into (JSON metadata dict, raw file bytes).

    Expects exactly one ``application/json`` part and one
    ``application/octet-stream`` part; raises ValueError otherwise.
    """
    content_type = response.headers.get("Content-Type", "")
    # Check for the parameter before splitting: split("boundary=")[-1] would
    # return the whole header (truthy) when no boundary is present.
    if "boundary=" not in content_type:
        raise ValueError("No boundary found in Content-Type header")
    # The boundary value may be quoted per RFC 2046.
    boundary = content_type.split("boundary=")[-1].strip('"')

    parts = response.content.split(f"--{boundary}".encode())

    metadata = None
    file_content = None

    for part in parts:
        if b"Content-Type: application/json" in part:
            # Body starts after the blank line separating headers from content.
            json_start = part.find(b"\r\n\r\n") + 4
            metadata = json.loads(part[json_start:].decode())
        elif b"Content-Type: application/octet-stream" in part:
            file_start = part.find(b"\r\n\r\n") + 4
            # Drop the trailing CRLF before the next boundary marker.
            file_content = part[file_start:-2]

    if metadata is None or file_content is None:
        raise ValueError("Wrong response from server")
    return metadata, file_content


class NamedBytesIO(BytesIO):
    """BytesIO with a ``name`` attribute, so it can be uploaded as a named file."""

    def __init__(self, initial_bytes: bytes, name: str):
        super().__init__(initial_bytes=initial_bytes)
        self.name = name


class CloudWorkspace(WorkspaceView):
    """Client-side workspace connected to Evidently Cloud."""

    token: str
    URL: str = "https://app.evidently.cloud"

    def __init__(
        self,
        token: Optional[str] = None,
        url: Optional[str] = None,
    ):
        """Connect to Evidently Cloud.

        Args:
            token: API key or access token; falls back to the
                ``EVIDENTLY_API_KEY`` environment variable.
            url: Cloud base URL; defaults to ``CloudWorkspace.URL``.

        Raises:
            ValueError: if no token is provided and the env variable is unset.
        """
        if token is None:
            import os

            token = os.environ.get("EVIDENTLY_API_KEY", default=None)
            if token is None:
                raise ValueError(
                    "To use CloudWorkspace you must provide a token through argument or env variable EVIDENTLY_API_KEY"
                )
        self.token = token
        # Always set the attribute so it exists even for non-API-key tokens.
        self._api_key: Optional[str] = token if _is_api_key(token) else None
        self.url = url if url is not None else self.URL

        # todo: default org if user have only one
        user_id = ZERO_UUID  # todo: get from /me
        meta = CloudMetadataStorage(
            base_url=self.url,
            token=self.token,
            token_cookie_name=ACCESS_TOKEN_COOKIE.key,
        )

        pm = ProjectManager(
            project_metadata=meta,
            blob_storage=(NoopBlobStorage()),
            data_storage=(NoopDataStorage()),
            auth_manager=(CloudAuthManager()),
        )
        super().__init__(
            user_id,
            pm,
        )

    def create_org(self, name: str) -> Org:
        """Create a cloud organization with the given name."""
        assert isinstance(self.project_manager.project_metadata, CloudMetadataStorage)
        return self.project_manager.project_metadata.create_org(Org(name=name)).to_org()

    def list_orgs(self) -> List[Org]:
        """List organizations visible to the current token."""
        assert isinstance(self.project_manager.project_metadata, CloudMetadataStorage)
        return [o.to_org() for o in self.project_manager.project_metadata.list_orgs()]

    def create_team(self, name: str, org_id: OrgID) -> Team:
        """Create a team inside the given organization."""
        assert isinstance(self.project_manager.project_metadata, CloudMetadataStorage)
        return self.project_manager.project_metadata.create_team(Team(name=name, org_id=org_id), org_id).to_team()

    def add_dataset(
        self,
        data_or_path: Union[str, pd.DataFrame],
        name: str,
        project_id: STR_UUID,
        description: Optional[str] = None,
        column_mapping: Optional[ColumnMapping] = None,
        dataset_source: DatasetSourceType = DatasetSourceType.file,
    ) -> DatasetID:
        """Upload a dataset from a file path or a DataFrame (v1 API).

        DataFrames are serialized to parquet in memory before upload; the file
        handle is always closed, even if the upload fails.
        """
        file: Union[NamedBytesIO, BinaryIO]
        assert isinstance(self.project_manager.project_metadata, CloudMetadataStorage)
        if isinstance(data_or_path, str):
            file = open(data_or_path, "rb")
        elif isinstance(data_or_path, pd.DataFrame):
            file = NamedBytesIO(b"", "data.parquet")
            data_or_path.to_parquet(file)
            file.seek(0)
        else:
            raise NotImplementedError(f"Add datasets is not implemented for {get_classpath(data_or_path.__class__)}")
        if isinstance(project_id, str):
            project_id = ProjectID(project_id)
        try:
            return self.project_manager.project_metadata.add_dataset(
                file, name, project_id, description, column_mapping, dataset_source
            )
        finally:
            file.close()

    def load_dataset(self, dataset_id: DatasetID) -> pd.DataFrame:
        """Download a dataset (v1 API) as a DataFrame."""
        assert isinstance(self.project_manager.project_metadata, CloudMetadataStorage)
        return self.project_manager.project_metadata.load_dataset(dataset_id)

    def add_report_with_data(self, project_id: STR_UUID, report: Report):
        # Kept for API compatibility; delegates to the plain add_report.
        self.add_report(project_id, report)

    def add_test_suite_with_data(self, project_id: STR_UUID, test_suite: TestSuite):
        # Kept for API compatibility; delegates to the plain add_test_suite.
        self.add_test_suite(project_id, test_suite)

    def add_dataset_v2(
        self, project_id: STR_UUID, dataset: Dataset, name: str, description: Optional[str] = None
    ) -> DatasetID:
        """Upload a Dataset with its data definition (v2 API)."""
        assert isinstance(self.project_manager.project_metadata, CloudMetadataStorage)
        return self.project_manager.project_metadata.add_dataset_v2(
            project_id if isinstance(project_id, ProjectID) else ProjectID(project_id), dataset, name, description
        )

    def load_dataset_v2(self, dataset_id: STR_UUID) -> Dataset:
        """Download a Dataset with its data definition (v2 API)."""
        assert isinstance(self.project_manager.project_metadata, CloudMetadataStorage)
        return self.project_manager.project_metadata.load_dataset_v2(
            dataset_id if isinstance(dataset_id, DatasetID) else DatasetID(dataset_id)
        )


class CloudAuthManager(NoopAuthManager):
    """Auth manager stub: team checks are enforced server-side in the cloud."""

    async def get_team(self, team_id: TeamID) -> Optional[Team]:
        # Return a placeholder team; the backend is the source of truth.
        return Team(id=team_id, name="", org_id=ZERO_UUID)