projects.py
1 import asyncio 2 import datetime 3 import json 4 from typing import Callable 5 from typing import Dict 6 from typing import List 7 from typing import Optional 8 from typing import Sequence 9 from typing import Union 10 11 from litestar import Response 12 from litestar import Router 13 from litestar import delete 14 from litestar import get 15 from litestar import patch 16 from litestar import post 17 from litestar.di import Provide 18 from litestar.exceptions import HTTPException 19 from litestar.params import Dependency 20 from litestar.params import Parameter 21 from typing_extensions import Annotated 22 23 from evidently._pydantic_compat import BaseModel 24 from evidently._pydantic_compat import parse_obj_as 25 from evidently.core.report import Snapshot 26 from evidently.core.serialization import SnapshotModel 27 from evidently.legacy.model.dashboard import DashboardInfo 28 from evidently.legacy.model.widget import BaseWidgetInfo 29 from evidently.legacy.report.report import METRIC_GENERATORS 30 from evidently.legacy.report.report import METRIC_PRESETS 31 from evidently.legacy.test_suite.test_suite import TEST_GENERATORS 32 from evidently.legacy.test_suite.test_suite import TEST_PRESETS 33 from evidently.legacy.ui.api.models import DashboardInfoModel 34 from evidently.legacy.utils import NumpyEncoder 35 from evidently.sdk.models import DashboardModel 36 from evidently.sdk.models import SnapshotMetadataModel 37 from evidently.ui.service.api.models import ReportModel 38 from evidently.ui.service.base import BatchMetricData 39 from evidently.ui.service.base import Project 40 from evidently.ui.service.base import SeriesResponse 41 from evidently.ui.service.datasets.snapshot_links import SnapshotDatasetLinksManager 42 from evidently.ui.service.managers.projects import ProjectManager 43 from evidently.ui.service.type_aliases import OrgID 44 from evidently.ui.service.type_aliases import ProjectID 45 from evidently.ui.service.type_aliases import SnapshotID 46 from evidently.ui.service.type_aliases import TeamID 47 from evidently.ui.service.type_aliases import UserID 48 49 50 async def path_project_dependency( 51 project_id: ProjectID, 52 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 53 user_id: UserID, 54 ): 55 project = await project_manager.get_project(user_id, project_id) 56 if project is None: 57 raise HTTPException(status_code=404, detail="project not found") 58 return project 59 60 61 @get("/{project_id:uuid}/reports") 62 async def list_reports( 63 user_id: UserID, 64 project: Annotated[Project, Dependency()], 65 project_manager: Annotated[ProjectManager, Dependency()], 66 log_event: Callable, 67 ) -> List[ReportModel]: 68 snapshots = await project_manager.list_snapshots(user_id, project.id) 69 reports = [ReportModel.from_snapshot(s) for s in snapshots] 70 log_event("list_reports", reports_count=len(reports)) 71 return reports 72 73 74 @get("/{project_id:uuid}/snapshots") 75 async def list_snapshots( 76 project: Annotated[Project, Dependency()], 77 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 78 log_event: Callable, 79 user_id: UserID, 80 snapshot_dataset_links: Annotated[ 81 SnapshotDatasetLinksManager, Dependency(skip_validation=True, default=None) 82 ] = None, 83 ) -> List[SnapshotMetadataModel]: 84 snapshots = await project_manager.list_snapshots(user_id, project.id) 85 if snapshot_dataset_links is not None: 86 coroutines = (snapshot_dataset_links.get_links(project.id, snapshot.id) for snapshot in snapshots) 87 links_results = await asyncio.gather(*coroutines) 88 for snapshot, links in zip(snapshots, links_results): 89 snapshot.links = links 90 log_event("list_snapshots", reports_count=len(snapshots)) 91 return snapshots 92 93 94 @get("/") 95 async def list_projects( 96 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 97 log_event: Callable, 98 user_id: UserID, 99 team_id: Annotated[Optional[TeamID], Parameter(title="filter by team")] = None, 100 org_id: Annotated[Optional[OrgID], Parameter(title="filter by org")] = None, 101 ) -> Sequence[Project]: 102 projects = await project_manager.list_projects(user_id, team_id, org_id) 103 log_event("list_projects", project_count=len(projects)) 104 return projects 105 106 107 @get("/{project_id:uuid}/info") 108 async def get_project_info( 109 project: Annotated[Project, Dependency()], 110 log_event: Callable, 111 ) -> Project: 112 log_event("get_project_info") 113 return project 114 115 116 @get("/search/{project_name:str}") 117 async def search_projects( 118 project_name: Annotated[str, Parameter(title="Name of the project to search")], 119 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 120 log_event: Callable, 121 user_id: UserID, 122 team_id: Annotated[Optional[TeamID], Parameter(title="filter by team")] = None, 123 org_id: Annotated[Optional[OrgID], Parameter(title="filter by org")] = None, 124 ) -> List[Project]: 125 log_event("search_projects") 126 return await project_manager.search_project(user_id, team_id=team_id, org_id=org_id, project_name=project_name) 127 128 129 @post("/{project_id:uuid}/info") 130 async def update_project_info( 131 project: Annotated[Project, Dependency()], 132 data: Project, 133 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 134 log_event: Callable, 135 user_id: UserID, 136 ) -> Project: 137 data.id = project.id 138 await project_manager.update_project(user_id, data) 139 log_event("update_project_info") 140 return project 141 142 143 @get("/{project_id:uuid}/reload") 144 async def reload_project_snapshots( 145 project: Annotated[Project, Dependency()], 146 log_event: Callable, 147 ) -> None: 148 await project.reload_async(reload_snapshots=True) 149 log_event("reload_project_snapshots") 150 151 152 async def path_snapshot_metadata_dependency( 153 project: Annotated[Project, Dependency()], 154 snapshot_id: SnapshotID, 155 ): 156 snapshot = await project.get_snapshot_metadata_async(snapshot_id) 157 if snapshot is None: 158 raise HTTPException(status_code=404, detail="Snapshot not found") 159 return snapshot 160 161 162 @get("/{project_id:uuid}/{snapshot_id:uuid}/graphs_data/{graph_id:str}") 163 async def get_snapshot_graph_data( 164 user_id: UserID, 165 project_id: ProjectID, 166 snapshot_id: SnapshotID, 167 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 168 graph_id: Annotated[str, Parameter(title="id of graph in snapshot")], 169 log_event: Callable, 170 ) -> str: 171 snapshot = await project_manager.load_snapshot(user_id, project_id, snapshot_id) 172 log_event("get_snapshot_graph_data") 173 for widget in snapshot.widgets: 174 for graph in widget.additionalGraphs: 175 if graph_id == graph.id: 176 return json.dumps(graph.dict() if not isinstance(graph, dict) else graph, cls=NumpyEncoder) 177 raise HTTPException(status_code=404, detail="Graph not found") 178 179 180 @get("/{project_id:uuid}/{snapshot_id:uuid}/download") 181 async def get_snapshot_download( 182 user_id: UserID, 183 project_id: ProjectID, 184 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 185 snapshot_metadata: Annotated[SnapshotMetadataModel, Dependency()], 186 log_event: Callable, 187 report_format: str = "html", 188 ) -> Response: 189 snapshot = await project_manager.load_snapshot(user_id, project_id, snapshot_metadata.id) 190 s = Snapshot.load_dict(snapshot.dict()) 191 if report_format == "html": 192 return Response( 193 s.get_html_str(as_iframe=False).encode("utf8"), 194 headers={"content-disposition": f"attachment;filename={snapshot_metadata.id}.html"}, 195 ) 196 if report_format == "json": 197 return Response( 198 s.json().encode("utf8"), 199 headers={"content-disposition": f"attachment;filename={snapshot_metadata.id}.json"}, 200 ) 201 log_event("get_snapshot_download") 202 raise HTTPException(status_code=400, detail=f"Unknown format {report_format}") 203 204 205 @get("/{project_id:uuid}/{snapshot_id:uuid}/data") 206 async def get_snapshot_data( 207 user_id: UserID, 208 project_id: ProjectID, 209 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 210 snapshot_metadata: Annotated[SnapshotMetadataModel, Dependency()], 211 log_event: Callable, 212 ) -> str: 213 snapshot = await project_manager.load_snapshot(user_id, project_id, snapshot_metadata.id) 214 info = DashboardInfo(name="", widgets=snapshot.widgets + snapshot.tests_widgets) 215 216 log_event( 217 "get_snapshot_data", 218 snapshot_type="report", 219 metrics=[m for m in snapshot.top_level_metrics], 220 metric_presets=snapshot.metadata.get(METRIC_PRESETS, []), 221 metric_generators=snapshot.metadata.get(METRIC_GENERATORS, []), 222 tests=[], 223 test_presets=snapshot.metadata.get(TEST_PRESETS, []), 224 test_generators=snapshot.metadata.get(TEST_GENERATORS, []), 225 ) 226 return json.dumps(info.dict(), cls=NumpyEncoder) 227 228 229 @get("/{project_id:uuid}/{snapshot_id:uuid}/metadata") 230 async def get_snapshot_metadata( 231 log_event: Callable, 232 snapshot_metadata: Annotated[SnapshotMetadataModel, Dependency()], 233 project: Annotated[Project, Dependency()], 234 snapshot_dataset_links: Annotated[ 235 SnapshotDatasetLinksManager, Dependency(skip_validation=True, default=None) 236 ] = None, 237 ) -> SnapshotMetadataModel: 238 log_event( 239 "get_snapshot_metadata", 240 snapshot_type="report", 241 metric_presets=snapshot_metadata.metadata.get(METRIC_PRESETS, []), 242 metric_generators=snapshot_metadata.metadata.get(METRIC_GENERATORS, []), 243 test_presets=snapshot_metadata.metadata.get(TEST_PRESETS, []), 244 test_generators=snapshot_metadata.metadata.get(TEST_GENERATORS, []), 245 ) 246 247 if snapshot_dataset_links is not None: 248 snapshot_metadata.links = await snapshot_dataset_links.get_links(project.id, snapshot_metadata.id) 249 250 return snapshot_metadata 251 252 253 @get("/{project_id:uuid}") 254 async def get_dashboard( 255 user_id: UserID, 256 project_id: ProjectID, 257 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 258 log_event: Callable, 259 ) -> DashboardModel: 260 dashboard = await project_manager.get_project_dashboard(user_id, project_id) 261 log_event("project_dashboard") 262 return dashboard 263 264 265 @post("/{project_id:uuid}") 266 async def update_dashboard( 267 user_id: UserID, 268 project_id: ProjectID, 269 data: DashboardModel, 270 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 271 log_event: Callable, 272 ) -> DashboardModel: 273 dashboard = await project_manager.save_project_dashboard(user_id, project_id, data) 274 log_event("save_project_dashboard") 275 return dashboard 276 277 278 @post("/") 279 async def add_project( 280 data: Project, 281 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 282 log_event: Callable, 283 user_id: UserID, 284 org_id: Optional[OrgID] = None, 285 ) -> ProjectID: 286 if org_id is None and data.org_id is not None: 287 org_id = data.org_id 288 289 p = await project_manager.add_project(data, user_id, org_id) 290 log_event("add_project") 291 return p.id 292 293 294 @patch("/{project_id:uuid}") 295 async def update_project( 296 user_id: UserID, 297 project_id: ProjectID, 298 data: Project, 299 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 300 ) -> ProjectID: 301 if data.id != project_id: 302 raise ValueError("id in data must be equal to project id") 303 await project_manager.update_project(user_id, data) 304 return data.id 305 306 307 @delete("/{project_id:uuid}") 308 async def delete_project( 309 project_id: Annotated[ProjectID, Parameter(title="id of project")], 310 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 311 log_event: Callable, 312 user_id: UserID, 313 ) -> None: 314 await project_manager.delete_project(user_id, project_id) 315 log_event("delete_project") 316 317 318 class AddSnapshotResponse(BaseModel): 319 snapshot_id: SnapshotID 320 321 322 @post("/{project_id:uuid}") 323 async def add_snapshot( 324 project: Annotated[Project, Dependency()], 325 body: bytes, 326 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 327 log_event: Annotated[Callable, Dependency()], 328 user_id: UserID, 329 ) -> AddSnapshotResponse: 330 model = parse_obj_as(SnapshotModel, json.loads(body)) 331 snapshot_id = await project_manager.add_snapshot(user_id, project.id, model) 332 log_event("add_snapshot") 333 return AddSnapshotResponse(snapshot_id=snapshot_id) 334 335 336 @delete("/{project_id:uuid}/{snapshot_id:uuid}") 337 async def delete_snapshot( 338 project: Annotated[Project, Dependency()], 339 snapshot_id: Annotated[SnapshotID, Parameter(title="id of snapshot")], 340 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 341 log_event: Callable, 342 user_id: UserID, 343 ) -> None: 344 await project_manager.delete_snapshot(user_id, project.id, snapshot_id) 345 log_event("delete_snapshot") 346 347 348 class MetricsList(BaseModel): 349 metrics: List[str] 350 351 352 class LabelsList(BaseModel): 353 labels: List[str] 354 355 356 class LabelValuesList(BaseModel): 357 label_values: List[str] 358 359 360 @get("/{project_id:uuid}/metrics") 361 async def get_snapshots_metrics( 362 project_id: ProjectID, 363 user_id: UserID, 364 tags: Optional[str], 365 metadata: Optional[str], 366 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 367 ) -> MetricsList: 368 _tags: List[str] = [] if tags is None else json.loads(tags) 369 _metadata: Dict[str, str] = {} if metadata is None else json.loads(metadata) 370 metrics = await project_manager.get_metrics(user_id, project_id, _tags, _metadata) 371 return MetricsList(metrics=metrics) 372 373 374 @get("/{project_id:uuid}/labels") 375 async def get_snapshots_metric_labels( 376 project_id: ProjectID, 377 user_id: UserID, 378 tags: Optional[str], 379 metadata: Optional[str], 380 metric_type: str, 381 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 382 ) -> LabelsList: 383 _tags: List[str] = [] if tags is None else json.loads(tags) 384 _metadata: Dict[str, str] = {} if metadata is None else json.loads(metadata) 385 labels = await project_manager.get_metric_labels(user_id, project_id, _tags, _metadata, metric_type) 386 return LabelsList(labels=labels) 387 388 389 @get("/{project_id:uuid}/label_values") 390 async def get_snapshots_metric_label_values( 391 project_id: ProjectID, 392 user_id: UserID, 393 tags: Optional[str], 394 metadata: Optional[str], 395 metric_type: str, 396 label: str, 397 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 398 ) -> LabelValuesList: 399 _tags: List[str] = [] if tags is None else json.loads(tags) 400 _metadata: Dict[str, str] = {} if metadata is None else json.loads(metadata) 401 values = await project_manager.get_metric_label_values(user_id, project_id, _tags, _metadata, metric_type, label) 402 return LabelValuesList(label_values=values) 403 404 405 @post("/{project_id:uuid}/data_series_batch") 406 async def get_snapshots_metrics_data_batch( 407 project_id: ProjectID, 408 user_id: UserID, 409 data: BatchMetricData, 410 project_manager: Annotated[ProjectManager, Dependency(skip_validation=True)], 411 timestamp_start: Optional[str] = None, 412 timestamp_end: Optional[str] = None, 413 ) -> SeriesResponse: 414 timestamp_start_ = datetime.datetime.fromisoformat(timestamp_start) if timestamp_start else None 415 timestamp_end_ = datetime.datetime.fromisoformat(timestamp_end) if timestamp_end else None 416 series = await project_manager.get_data_series( 417 user_id, 418 project_id, 419 data.series_filter or [], 420 timestamp_start_, 421 timestamp_end_, 422 ) 423 return series 424 425 426 # We need this endpoint to export 427 # some additional models to open api schema 428 @get("/models/additional") 429 async def additional_models() -> ( 430 List[ 431 Union[ 432 BaseWidgetInfo, 433 DashboardInfoModel, 434 ] 435 ] 436 ): 437 return [] 438 439 440 def create_projects_api(guard: Callable) -> Router: 441 projects_router_v1 = Router( 442 "/projects", 443 route_handlers=[ 444 # read 445 Router( 446 "", 447 route_handlers=[ 448 list_projects, 449 list_reports, 450 get_project_info, 451 search_projects, 452 get_snapshot_graph_data, 453 get_snapshot_data, 454 get_snapshot_download, 455 list_snapshots, 456 get_snapshot_metadata, 457 ], 458 ), 459 # write 460 Router( 461 "", 462 route_handlers=[ 463 add_project, 464 delete_project, 465 update_project_info, 466 reload_project_snapshots, 467 delete_snapshot, 468 ], 469 guards=[guard], 470 ), 471 ], 472 ) 473 projects_router_v2 = Router( 474 "/projects", 475 route_handlers=[ 476 # read 477 Router( 478 "", 479 route_handlers=[ 480 list_projects, 481 additional_models, 482 ], 483 ), 484 # write 485 Router( 486 "", 487 route_handlers=[ 488 add_project, 489 update_project, 490 delete_project, 491 ], 492 guards=[guard], 493 ), 494 ], 495 ) 496 497 dashboard_router_v2 = Router( 498 "/dashboards", 499 route_handlers=[ 500 # read 501 Router( 502 "", 503 route_handlers=[ 504 get_dashboard, 505 ], 506 ), 507 # write 508 Router( 509 "", 510 route_handlers=[ 511 update_dashboard, 512 ], 513 guards=[guard], 514 ), 515 ], 516 ) 517 snapshots_router_v2 = Router( 518 "/snapshots", 519 route_handlers=[ 520 # read 521 Router( 522 "", 523 route_handlers=[ 524 get_snapshots_metrics_data_batch, 525 get_snapshots_metrics, 526 get_snapshots_metric_labels, 527 get_snapshots_metric_label_values, 528 ], 529 ), 530 # write 531 Router( 532 "", 533 route_handlers=[add_snapshot], 534 guards=[guard], 535 ), 536 ], 537 ) 538 return Router( 539 "", 540 route_handlers=[ 541 Router("", route_handlers=[projects_router_v1]), 542 Router("v2", route_handlers=[projects_router_v2, snapshots_router_v2, dashboard_router_v2]), 543 ], 544 ) 545 546 547 projects_api_dependencies: Dict[str, Provide] = { 548 "project": Provide(path_project_dependency), 549 "snapshot_metadata": Provide(path_snapshot_metadata_dependency), 550 }