# data_source.py
import datetime
from abc import ABC
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import cast
from uuid import UUID

import pandas as pd
from litestar import Response
from typing_extensions import TypeAlias

from evidently._pydantic_compat import BaseModel
from evidently.core.datasets import DataDefinition
from evidently.core.metric_types import AutoAliasMixin
from evidently.pydantic_utils import PolymorphicModel
from evidently.ui.service.datasets.filters import FilterBy
from evidently.ui.service.datasets.filters import filter_df
from evidently.ui.service.errors import DatasetNotFound
from evidently.ui.service.errors import EvidentlyServiceError
from evidently.ui.service.storage.local.dataset import DatasetFileStorage
from evidently.ui.service.type_aliases import DatasetID
from evidently.ui.service.type_aliases import ProjectID
from evidently.ui.service.type_aliases import UserID

if TYPE_CHECKING:
    from evidently.ui.service.managers.datasets import DatasetManager

# A materialized data source is simply a pandas DataFrame.
MaterializedDataset: TypeAlias = pd.DataFrame


class SortBy(BaseModel):
    """Sorting configuration applied when materializing a data source."""

    column: str
    ascending: bool = True


class DatasetReadError(EvidentlyServiceError):
    """Error raised when a dataset cannot be read from its backing storage."""

    def to_response(self) -> Response:
        # Intentionally generic detail: the concrete message (e.g. a filename)
        # stays server-side and is not leaked to the HTTP client.
        return Response(
            status_code=500,
            content={"detail": "dataset read error"},
        )


class DataSource(AutoAliasMixin, PolymorphicModel, ABC):
    """Base class for data sources.

    Subclasses implement :meth:`materialize` to produce a DataFrame; the
    polymorphic-model machinery (via ``__alias_type__``) handles (de)serialization
    of the concrete subclass.
    """

    __alias_namespace__: ClassVar = "evidently"
    __alias_type__: ClassVar = "data_source"

    class Config:
        is_base_type = True
        alias_required = True

    async def materialize(self, dataset_manager: "DatasetManager") -> MaterializedDataset:
        """Materialize the data source into a DataFrame.

        Raises:
            NotImplementedError: always, in the base class; subclasses override.
        """
        raise NotImplementedError(self.__class__.__name__)

    async def materialize_with_data_definition(
        self, dataset_manager: "DatasetManager", data_definition: DataDefinition
    ) -> Tuple[MaterializedDataset, DataDefinition]:
        """Materialize and apply the given data definition.

        Returns:
            The transformed DataFrame together with the (unchanged) definition.
        """
        df = await self.materialize(dataset_manager)
        return self.apply_data_definition(df, data_definition), data_definition

    def apply_data_definition(self, df: MaterializedDataset, data_definition: DataDefinition) -> MaterializedDataset:
        """Apply data definition to dataframe. Base implementation is a no-op."""
        return df

    def get_original_dataset_id(self) -> Optional[DatasetID]:
        """Get the original dataset ID if this source wraps another dataset."""
        raise NotImplementedError


class SortedFilteredDataSource(DataSource, ABC):
    """Data source with optional row filtering and sorting support."""

    filter_by: Optional[List[FilterBy]] = None
    sort_by: Optional[SortBy] = None

    def post_process(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply filtering first, then sorting, to the materialized dataframe."""
        filtered_df = filter_df(df, self.filter_by)
        if self.sort_by:
            filtered_sorted_df = filtered_df.sort_values(by=self.sort_by.column, ascending=self.sort_by.ascending)
        else:
            filtered_sorted_df = filtered_df
        return filtered_sorted_df


class FileDataSource(SortedFilteredDataSource):
    """Data source that reads a file stored under a project."""

    project_id: ProjectID
    filename: str
    is_tmp: bool = False  # marks temporary upload files; semantics owned by callers

    def read(self, storage: DatasetFileStorage) -> pd.DataFrame:
        """Read the file from storage.

        Raises:
            DatasetReadError: if the file does not exist in storage.
        """
        try:
            # Local import to avoid a module-level import cycle with file_io.
            from evidently.ui.service.datasets.file_io import FileIO

            df = FileIO(storage).read_file_from_storage(self.project_id, self.filename)
        except FileNotFoundError:
            raise DatasetReadError(f"No such file {self.filename}")
        return df

    async def materialize(self, dataset_manager: "DatasetManager") -> MaterializedDataset:
        """Materialize the file data source."""
        df = self.read(dataset_manager.dataset_file_storage)
        return self.post_process(df)


class DatasetDataSource(SortedFilteredDataSource):
    """Data source that reads from another (already registered) dataset."""

    user_id: UserID
    dataset_id: DatasetID

    async def materialize(self, dataset_manager: "DatasetManager") -> MaterializedDataset:
        """Materialize by delegating to the referenced dataset's own source.

        Raises:
            DatasetNotFound: if the referenced dataset does not exist or is
                not visible to ``user_id``.
        """
        dataset = await dataset_manager.get_dataset_metadata(self.user_id, self.dataset_id)
        if not dataset:
            raise DatasetNotFound()
        df = await dataset.source.materialize(dataset_manager)
        return self.post_process(df)

    def get_original_dataset_id(self) -> Optional[DatasetID]:
        """Get the ID of the dataset this source points at."""
        return self.dataset_id


class DataSourceDTO(AutoAliasMixin, PolymorphicModel, ABC):
    """DTO for data source serialization.

    Concrete DTO types are generated from :class:`DataSource` subclasses by
    :meth:`for_type`, copying the source's fields minus server-side ones
    (e.g. ``project_id``/``user_id``), which are injected back in
    :meth:`to_data_source`.
    """

    class Config:
        is_base_type = True

    # Set on generated subclasses by for_type(); the DataSource class this DTO maps to.
    __data_source_type__: ClassVar[Type[DataSource]]
    __alias_type__: ClassVar = "data_source_dto"

    def to_data_source(self, **kwargs) -> DataSource:
        """Convert DTO to a data source, merging in server-supplied fields.

        Extra kwargs not declared on the target data source type are dropped.
        """
        kwargs = {k: v for k, v in kwargs.items() if k in self.__data_source_type__.__fields__}
        return self.__data_source_type__(**self.__dict__, **kwargs)

    @staticmethod
    def for_type(
        data_source_type: Type[DataSource], __module__: str, exclude: Tuple[str, ...]
    ) -> Type["DataSourceDTO"]:
        """Create a DTO type mirroring ``data_source_type``'s fields.

        Args:
            data_source_type: the DataSource subclass to mirror.
            __module__: module name assigned to the generated class (needed for
                correct pickling/serialization aliases).
            exclude: field names to omit from the DTO (server-side fields).
        """
        namespace = {
            # Pydantic v1-style field introspection: outer_type_ keeps e.g. Optional[...].
            "__annotations__": {n: f.outer_type_ for n, f in data_source_type.__fields__.items() if n not in exclude},
            # Carry over defaults so optional fields stay optional on the DTO.
            **{n: f.default for n, f in data_source_type.__fields__.items() if n not in exclude and not f.required},
        }

        new_dto_type: Type[DataSourceDTO] = type(f"{data_source_type.__name__}DTO", (DataSourceDTO,), namespace)
        new_dto_type.__data_source_type__ = data_source_type
        new_dto_type.__module__ = __module__
        return new_dto_type


def convert_uuid_to_str_in_place(df: pd.DataFrame) -> None:
    """Convert UUID-valued object columns to strings, mutating ``df`` in place.

    Detection samples only the first row of each object column, so a column
    whose first value is not a UUID is left untouched even if later rows
    contain UUIDs.
    """
    if df.empty:
        # Guard: .iloc[0] on an empty frame would raise IndexError.
        return
    for col in df.columns:
        if df[col].dtype == "object" and isinstance(df[col].iloc[0], UUID):
            df[col] = df[col].astype(str)


class TracingDataSource(SortedFilteredDataSource):
    """Data source that reads exported spans from tracing storage."""

    export_id: DatasetID
    timestamp_from: Optional[datetime.datetime] = None
    timestamp_to: Optional[datetime.datetime] = None

    async def materialize(self, dataset_manager: "DatasetManager") -> MaterializedDataset:
        """Materialize the tracing data source.

        Raises:
            DatasetReadError: if the manager has no tracing storage configured.
        """
        from evidently.ui.service.tracing.storage.base import TracingStorage

        if dataset_manager.tracing_storage is None:
            raise DatasetReadError("Tracing storage not available")

        tracing_storage: TracingStorage = dataset_manager.tracing_storage
        df = tracing_storage.read_with_filter(
            self.export_id,
            timestamp_from=self.timestamp_from,
            timestamp_to=self.timestamp_to,
        )
        # UUID columns (trace/span ids) are stringified so the frame is serializable.
        convert_uuid_to_str_in_place(df)
        return self.post_process(df)

    def get_original_dataset_id(self) -> Optional[DatasetID]:
        """Get the originating export/dataset ID."""
        return self.export_id


class TracingSessionDataSource(TracingDataSource):
    """Tracing data source that groups rows into per-session conversations."""

    session_id_column: str
    timestamp_column: str
    question_column: str
    response_column: str

    async def materialize(self, dataset_manager: "DatasetManager") -> MaterializedDataset:
        """Materialize traces and collapse each session into one conversation row.

        Returns a frame with exactly two columns: ``session_id`` and
        ``conversation`` (question/response turns joined in timestamp order).
        """
        df = await super().materialize(dataset_manager)

        # Sort within sessions by time so turns appear in conversation order.
        df_sorted = df.sort_values(by=[self.session_id_column, self.timestamp_column])

        def format_conversation(group: pd.DataFrame) -> str:
            return "\n\n".join(
                [f"USER: {q}\n\nAGENT: {r}" for q, r in zip(group[self.question_column], group[self.response_column])]
            )

        # cast(Any, ...) silences pandas-stubs overload confusion on groupby.apply.
        df_grouped = df_sorted.groupby(self.session_id_column).apply(cast(Any, format_conversation)).reset_index()
        df_grouped.columns = pd.Index(["session_id", "conversation"])
        return df_grouped


# DTOs exposed to API clients; project_id/user_id are server-side and injected
# later via DataSourceDTO.to_data_source(**kwargs).
DatasetDataSourceDTO = DataSourceDTO.for_type(DatasetDataSource, __name__, exclude=("project_id", "user_id"))
FileDataSourceDTO = DataSourceDTO.for_type(FileDataSource, __name__, exclude=("project_id", "user_id"))
TracingDataSourceDTO = DataSourceDTO.for_type(TracingDataSource, __name__, exclude=tuple())
TracingSessionDataSourceDTO = DataSourceDTO.for_type(TracingSessionDataSource, __name__, exclude=tuple())