# base.py
import datetime
import traceback
import warnings
from functools import wraps
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import uuid6

from evidently._pydantic_compat import BaseModel
from evidently._pydantic_compat import Field
from evidently._pydantic_compat import validator
from evidently.legacy.base_metric import Metric
from evidently.legacy.core import new_id
from evidently.legacy.model.dashboard import DashboardInfo
from evidently.legacy.model.widget import BaseWidgetInfo
from evidently.legacy.renderers.html_widgets import CounterData
from evidently.legacy.renderers.html_widgets import WidgetSize
from evidently.legacy.renderers.html_widgets import counter
from evidently.legacy.report import Report
from evidently.legacy.suite.base_suite import ReportBase
from evidently.legacy.test_suite import TestSuite
from evidently.legacy.ui.type_aliases import PanelID
from evidently.legacy.ui.type_aliases import ProjectID
from evidently.legacy.ui.type_aliases import TabID
from evidently.pydantic_utils import EnumValueMixin
from evidently.pydantic_utils import EvidentlyBaseModel
from evidently.pydantic_utils import FieldPath
from evidently.pydantic_utils import PolymorphicModel
from evidently.pydantic_utils import register_type_alias

from .utils import getattr_nested

if TYPE_CHECKING:
    from evidently.legacy.ui.base import DataStorage


class ReportFilter(BaseModel):
    """Predicate that decides whether a report matches a set of criteria."""

    # Metadata entries the report must carry, compared by equality per key.
    metadata_values: Dict[str, str]
    # Tags that must all be present in the report's tags.
    tag_values: List[str]
    # When False, any TestSuite instance is rejected outright.
    include_test_suites: bool = False

    def filter(self, report: ReportBase) -> bool:
        """Return True when ``report`` satisfies every criterion of this filter.

        A ``TestSuite`` is rejected unless ``include_test_suites`` is set.
        Otherwise every ``metadata_values`` entry must equal the report's
        metadata value for the same key, and every tag in ``tag_values`` must
        be contained in ``report.tags``.
        """
        if isinstance(report, TestSuite) and not self.include_test_suites:
            return False

        # Metadata is checked first; tags are only consulted once it matches.
        for key, expected in self.metadata_values.items():
            if report.metadata.get(key) != expected:
                return False

        for tag in self.tag_values:
            if tag not in report.tags:
                return False
        return True


class PanelValue(BaseModel):
    """Selects one value from the results of the metrics found in a report.

    A metric is chosen either by fingerprint or by the combination of
    ``metric_id`` and ``metric_args`` (see ``metric_matched``); the value is
    then read from the metric result by following ``field_path``.
    """

    # Dotted attribute path into the metric result, or a FieldPath object.
    field_path: Union[str, FieldPath]
    metric_id: Optional[str] = None
    metric_fingerprint: Optional[str] = None
    # Dotted attribute path on the metric -> value that attribute must equal.
    metric_args: Dict[str, Union[EvidentlyBaseModel, Any]] = {}
    legend: Optional[str] = None

    def __init__(
        self,
        *,
        field_path: Union[str, FieldPath],
        metric_id: Optional[str] = None,
        metric_fingerprint: Optional[str] = None,
        metric_args: Optional[Dict[str, Union[EvidentlyBaseModel, Any]]] = None,
        legend: Optional[str] = None,
        metric_hash: Optional[str] = None,
    ):
        """Create a panel value.

        ``metric_hash`` is the deprecated spelling of ``metric_fingerprint``.
        When it is given, a warning is emitted and its value replaces
        ``metric_fingerprint``. A missing or empty ``metric_args`` becomes an
        empty dict.
        """
        # this __init__ is needed to support old-style metric_hash arg
        if metric_hash is not None:
            warnings.warn("metric_hash arg is deperecated, please use metric_fingerprint")
            metric_fingerprint = metric_hash
        super().__init__(
            field_path=field_path,
            metric_id=metric_id,
            metric_fingerprint=metric_fingerprint,
            metric_args=metric_args or {},
            legend=legend,
        )

    @property
    def field_path_str(self):
        """Return ``field_path`` as a string, unwrapping a ``FieldPath`` if needed."""
        if isinstance(self.field_path, FieldPath):
            return self.field_path.get_path()
        return self.field_path

    @validator("field_path")
    def validate_field_path(cls, value):
        """Replace a ``FieldPath`` value with the string from ``get_path()``."""
        if isinstance(value, FieldPath):
            value = value.get_path()
        return value

    def metric_matched(self, metric: Metric) -> bool:
        """Tell whether ``metric`` is the one this panel value refers to.

        When ``metric_fingerprint`` is set it is the only criterion.
        Otherwise the metric id must match (if ``metric_id`` is set) and every
        ``metric_args`` entry must equal the attribute found at that dotted
        path on the metric; a missing attribute counts as a mismatch.
        """
        if self.metric_fingerprint is not None:
            return metric.get_fingerprint() == self.metric_fingerprint
        if self.metric_id is not None and self.metric_id != metric.get_id():
            return False
        for field, value in self.metric_args.items():
            try:
                if getattr_nested(metric, field.split(".")) != value:
                    return False
            except AttributeError:
                return False
        return True

    def get(self, report: ReportBase) -> Dict[Metric, Any]:
        """Collect the selected value from every matching metric of ``report``.

        Returns a mapping of metric -> value at ``field_path`` in its result.
        Metrics whose result lacks the path are left out. Reports that are
        neither ``Report`` nor ``TestSuite`` yield an empty mapping.
        """
        results: Dict[Metric, Any] = {}
        metrics = []
        if isinstance(report, Report):
            metrics = report._first_level_metrics
        elif isinstance(report, TestSuite):
            metrics = report._inner_suite.context.metrics
        for metric in metrics:
            if self.metric_matched(metric):
                try:
                    results[metric] = getattr_nested(metric.get_result(), self.field_path_str.split("."))
                except AttributeError:
                    # Best effort: a result without this field is simply skipped.
                    pass
        return results


def assign_panel_id(f):
    """Decorate an async panel method so the returned widget gets the panel's id.

    The wrapped coroutine is awaited and ``str(self.id)`` is stored in the
    ``id`` attribute of its result before that result is returned.
    """

    @wraps(f)
    async def inner(self: "DashboardPanel", *args, **kwargs) -> BaseWidgetInfo:
        r = await f(self, *args, **kwargs)
        r.id = str(self.id)
        return r

    return inner


class DashboardPanel(EnumValueMixin, PolymorphicModel):
    """Base model for dashboard panels; subclasses implement ``build``."""

    class Config:
        type_alias = "evidently:dashboard_panel:DashboardPanel"
        is_base_type = True

    id: PanelID = Field(default_factory=new_id)
    title: str
    filter: ReportFilter
    size: WidgetSize = WidgetSize.FULL

    async def build(
        self,
        data_storage: "DataStorage",
        project_id: ProjectID,
        timestamp_start: Optional[datetime.datetime],
        timestamp_end: Optional[datetime.datetime],
    ) -> BaseWidgetInfo:
        """Build the widget for this panel.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    async def safe_build(
        self,
        data_storage: "DataStorage",
        project_id: ProjectID,
        timestamp_start: Optional[datetime.datetime],
        timestamp_end: Optional[datetime.datetime],
    ) -> BaseWidgetInfo:
        """Run ``build`` and turn any failure into an error counter widget.

        On an exception the traceback is printed and a counter widget holding
        the exception class name (plus its first argument, when there is one)
        is returned with this panel's id.
        """
        try:
            return await self.build(data_storage, project_id, timestamp_start, timestamp_end)
        except Exception as e:
            traceback.print_exc()
            error_name = e.__class__.__name__
            # Exceptions raised without arguments (e.g. a bare
            # ``raise NotImplementedError``) have empty ``args``; indexing it
            # would raise IndexError from inside this handler.
            message = f"{error_name}: {e.args[0]}" if e.args else error_name
            c = counter(counters=[CounterData(message, "Error")])
            c.id = str(self.id)
            return c


class DashboardTab(BaseModel):
    """A dashboard tab identified by ``id`` and carrying an optional title."""

    id: TabID = Field(default_factory=new_id)
    title: Optional[str] = "Untitled"


class DashboardConfig(BaseModel):
    """Dashboard definition: its panels, its tabs and which panels sit on which tab."""

    name: str
    panels: List[DashboardPanel]
    tabs: List[DashboardTab] = []
    # str(tab id) -> list of str(panel id) shown on that tab.
    tab_id_to_panel_ids: Dict[str, List[str]] = {}

    def add_panel(
        self,
        panel: DashboardPanel,
        *,
        tab: Optional[Union[str, TabID, DashboardTab]] = None,
        create_if_not_exists=True,
    ):
        """Add ``panel`` to the dashboard, optionally attaching it to a tab.

        Args:
            panel: panel to append to ``panels``.
            tab: tab object, tab id, or a string holding a tab id or title.
                ``None`` adds the panel without assigning it to a tab.
            create_if_not_exists: whether a tab that cannot be found may be
                created.

        Raises:
            ValueError: when the tab cannot be found and is not created; the
                dashboard is left unchanged in that case.
        """
        if tab is None:
            self.panels.append(panel)
            return

        # Resolve the tab before touching ``panels`` so that a failed lookup
        # does not leave behind a panel that was never attached to its tab.
        result_tab = self._get_or_create_tab(tab, create_if_not_exists)
        self.panels.append(panel)

        tab_id_str = str(result_tab.id)
        panel_id_str = str(panel.id)

        tab_panel_ids = self.tab_id_to_panel_ids.setdefault(tab_id_str, [])
        if panel_id_str not in tab_panel_ids:
            tab_panel_ids.append(panel_id_str)

    def create_tab(self, title) -> DashboardTab:
        """Return the tab matching ``title``, creating it when it is missing."""
        return self._get_or_create_tab(title)

    def _raise_if_tab_title_exists(self, tab_title: Optional[str]):
        """Raise ``ValueError`` when a tab titled ``tab_title`` already exists."""
        if any(tab.title == tab_title for tab in self.tabs):
            raise ValueError(f"""tab with title "{tab_title}" already exists""")

    def _find_tab_by_id(self, tab_id: TabID) -> Optional[DashboardTab]:
        """Return the first tab whose id equals ``tab_id``, or ``None``."""
        return next((t for t in self.tabs if t.id == tab_id), None)

    def _find_tab_by_title(self, title: str) -> Optional[DashboardTab]:
        """Return the first tab whose title equals ``title``, or ``None``."""
        return next((t for t in self.tabs if t.title == title), None)

    def _get_or_create_tab(
        self,
        tab_descriptor: Union[DashboardTab, TabID, str],
        create_if_not_exists=True,
    ) -> DashboardTab:
        """Find the tab described by ``tab_descriptor``, creating it if allowed.

        A ``DashboardTab`` is looked up by its id and is itself appended when
        missing. A string is first parsed as a tab id; if parsing fails it is
        treated as a title, and a new tab with that title may be created. A
        tab id (or a string that parses as one) can only be looked up.

        Raises:
            ValueError: when no tab is found and either creation is disabled
                or the descriptor gives nothing to create a tab from.
        """
        tab: Optional[DashboardTab] = None
        to_create: Optional[DashboardTab] = None
        if isinstance(tab_descriptor, DashboardTab):
            tab = self._find_tab_by_id(tab_descriptor.id)
            to_create = tab_descriptor
        if isinstance(tab_descriptor, str):
            try:
                tab = self._find_tab_by_id(uuid6.UUID(tab_descriptor))
            except ValueError:
                # Not parseable as an id, so the string is taken as a title.
                tab = self._find_tab_by_title(tab_descriptor)
                to_create = DashboardTab(title=tab_descriptor)
        if isinstance(tab_descriptor, TabID):
            tab = self._find_tab_by_id(tab_descriptor)

        if tab is not None:
            return tab

        if not create_if_not_exists or to_create is None:
            raise ValueError(f"""tab "{tab_descriptor}" not found""")

        self.tabs.append(to_create)
        return to_create

    async def build(
        self,
        data_storage: "DataStorage",
        project_id: ProjectID,
        timestamp_start: Optional[datetime.datetime],
        timestamp_end: Optional[datetime.datetime],
    ):
        """Build every panel with ``safe_build``, one after another, in ``panels`` order.

        Returns a ``DashboardInfo`` carrying this dashboard's name and the
        resulting widgets.
        """
        widgets = [await p.safe_build(data_storage, project_id, timestamp_start, timestamp_end) for p in self.panels]

        return DashboardInfo(name=self.name, widgets=widgets)


register_type_alias(
    DashboardPanel, "evidently.ui.backport.DashboardPanelV2", "evidently:dashboard_panel:DashboardPanelV2"
)
register_type_alias(
    DashboardPanel,
    "evidently.ui.backport.SingleValueDashboardPanel",
    "evidently:dashboard_panel:SingleValueDashboardPanel",
)