/ src / evidently / ui / service / datasets / data_source.py
data_source.py
  1  import datetime
  2  from abc import ABC
  3  from typing import TYPE_CHECKING
  4  from typing import Any
  5  from typing import ClassVar
  6  from typing import List
  7  from typing import Optional
  8  from typing import Tuple
  9  from typing import Type
 10  from typing import cast
 11  from uuid import UUID
 12  
 13  import pandas as pd
 14  from litestar import Response
 15  from typing_extensions import TypeAlias
 16  
 17  from evidently._pydantic_compat import BaseModel
 18  from evidently.core.datasets import DataDefinition
 19  from evidently.core.metric_types import AutoAliasMixin
 20  from evidently.pydantic_utils import PolymorphicModel
 21  from evidently.ui.service.datasets.filters import FilterBy
 22  from evidently.ui.service.datasets.filters import filter_df
 23  from evidently.ui.service.errors import DatasetNotFound
 24  from evidently.ui.service.errors import EvidentlyServiceError
 25  from evidently.ui.service.storage.local.dataset import DatasetFileStorage
 26  from evidently.ui.service.type_aliases import DatasetID
 27  from evidently.ui.service.type_aliases import ProjectID
 28  from evidently.ui.service.type_aliases import UserID
 29  
 30  if TYPE_CHECKING:
 31      from evidently.ui.service.managers.datasets import DatasetManager
 32  
 33  MaterializedDataset: TypeAlias = pd.DataFrame
 34  
 35  
class SortBy(BaseModel):
    """Sorting configuration.

    Consumed by ``SortedFilteredDataSource.post_process``, which passes the
    fields straight to ``DataFrame.sort_values``.
    """

    # Name of the dataframe column to sort on.
    column: str
    # Sort direction; True sorts ascending (pandas default).
    ascending: bool = True
 41  
 42  
 43  class DatasetReadError(EvidentlyServiceError):
 44      """Error reading dataset."""
 45  
 46      def to_response(self) -> Response:
 47          return Response(
 48              status_code=500,
 49              content={"detail": "dataset read error"},
 50          )
 51  
 52  
 53  class DataSource(AutoAliasMixin, PolymorphicModel, ABC):
 54      """Base class for data sources."""
 55  
 56      __alias_namespace__: ClassVar = "evidently"
 57      __alias_type__: ClassVar = "data_source"
 58  
 59      class Config:
 60          is_base_type = True
 61          alias_required = True
 62  
 63      async def materialize(self, dataset_manager: "DatasetManager") -> MaterializedDataset:
 64          """Materialize the data source into a DataFrame."""
 65          raise NotImplementedError(self.__class__.__name__)
 66  
 67      async def materialize_with_data_definition(
 68          self, dataset_manager: "DatasetManager", data_definition: DataDefinition
 69      ) -> tuple[MaterializedDataset, DataDefinition]:
 70          """Materialize with data definition applied."""
 71          df = await self.materialize(dataset_manager)
 72          return self.apply_data_definition(df, data_definition), data_definition
 73  
 74      def apply_data_definition(self, df: MaterializedDataset, data_definition: DataDefinition) -> MaterializedDataset:
 75          """Apply data definition to dataframe."""
 76          return df
 77  
 78      def get_original_dataset_id(self) -> Optional[DatasetID]:
 79          """Get the original dataset ID if this is a dataset source."""
 80          raise NotImplementedError
 81  
 82  
 83  class SortedFilteredDataSource(DataSource, ABC):
 84      """Data source with filtering and sorting support."""
 85  
 86      filter_by: Optional[List[FilterBy]] = None
 87      sort_by: Optional[SortBy] = None
 88  
 89      def post_process(self, df: pd.DataFrame) -> pd.DataFrame:
 90          """Apply filtering and sorting to the dataframe."""
 91          filtered_df = filter_df(df, self.filter_by)
 92          if self.sort_by:
 93              filtered_sorted_df = filtered_df.sort_values(by=self.sort_by.column, ascending=self.sort_by.ascending)
 94          else:
 95              filtered_sorted_df = filtered_df
 96          return filtered_sorted_df
 97  
 98  
 99  class FileDataSource(SortedFilteredDataSource):
100      """Data source that reads from a file."""
101  
102      project_id: ProjectID
103      filename: str
104      is_tmp: bool = False
105  
106      def read(self, storage: DatasetFileStorage) -> pd.DataFrame:
107          """Read the file from storage."""
108          try:
109              from evidently.ui.service.datasets.file_io import FileIO
110  
111              df = FileIO(storage).read_file_from_storage(self.project_id, self.filename)
112          except FileNotFoundError:
113              raise DatasetReadError(f"No such file {self.filename}")
114          return df
115  
116      async def materialize(self, dataset_manager: "DatasetManager") -> MaterializedDataset:
117          """Materialize the file data source."""
118          df = self.read(dataset_manager.dataset_file_storage)
119          return self.post_process(df)
120  
121  
class DatasetDataSource(SortedFilteredDataSource):
    """Data source that reads from another stored dataset."""

    user_id: UserID
    dataset_id: DatasetID

    async def materialize(self, dataset_manager: "DatasetManager") -> MaterializedDataset:
        """Materialize by delegating to the referenced dataset's own source."""
        metadata = await dataset_manager.get_dataset_metadata(self.user_id, self.dataset_id)
        if not metadata:
            raise DatasetNotFound()
        frame = await metadata.source.materialize(dataset_manager)
        return self.post_process(frame)

    def get_original_dataset_id(self) -> Optional[DatasetID]:
        """Return the ID of the dataset this source wraps."""
        return self.dataset_id
139  
140  
class DataSourceDTO(AutoAliasMixin, PolymorphicModel, ABC):
    """DTO base for (de)serializing data source configurations.

    Concrete DTO types are generated from ``DataSource`` subclasses via
    ``for_type``, with server-side-only fields excluded.
    """

    class Config:
        is_base_type = True

    # The DataSource subclass this DTO mirrors; set by ``for_type``.
    __data_source_type__: ClassVar[Type[DataSource]]
    __alias_type__: ClassVar = "data_source_dto"

    def to_data_source(self, **kwargs) -> DataSource:
        """Convert DTO to a data source instance.

        ``kwargs`` supplies values for fields the DTO excluded; any key not
        declared as a field on the target data source type is silently dropped.
        """
        kwargs = {k: v for k, v in kwargs.items() if k in self.__data_source_type__.__fields__}
        return self.__data_source_type__(**self.__dict__, **kwargs)

    @staticmethod
    def for_type(
        data_source_type: Type[DataSource], __module__: str, exclude: Tuple[str, ...]
    ) -> Type["DataSourceDTO"]:
        """Create a DTO type mirroring ``data_source_type``.

        Copies the annotation and default of every field not listed in
        ``exclude`` into a dynamically created subclass of ``DataSourceDTO``.
        """
        # NOTE(review): relies on pydantic v1 ModelField internals
        # (``outer_type_``, ``required``) via evidently's pydantic compat layer.
        namespace = {
            "__annotations__": {n: f.outer_type_ for n, f in data_source_type.__fields__.items() if n not in exclude},
            **{n: f.default for n, f in data_source_type.__fields__.items() if n not in exclude and not f.required},
        }

        new_dto_type: Type[DataSourceDTO] = type(f"{data_source_type.__name__}DTO", (DataSourceDTO,), namespace)
        new_dto_type.__data_source_type__ = data_source_type
        # Place the generated class in the caller's module so its qualified
        # name resolves to a real location.
        new_dto_type.__module__ = __module__
        return new_dto_type
169  
170  
def convert_uuid_to_str_in_place(df: pd.DataFrame):
    """Convert UUID-valued object columns of ``df`` to strings, in place.

    A column is converted when its dtype is ``object`` and its first value is
    a ``uuid.UUID``; mixed columns whose first value is not a UUID are left
    untouched (same sampling heuristic as before).

    Args:
        df: frame to mutate in place; empty frames are left unchanged.
    """
    if df.empty:
        # .iloc[0] would raise IndexError on a zero-row frame.
        return
    for col in df.columns:
        if df[col].dtype == "object" and isinstance(df[col].iloc[0], UUID):
            df[col] = df[col].astype(str)
175  
176  
class TracingDataSource(SortedFilteredDataSource):
    """Data source that reads rows from tracing storage."""

    export_id: DatasetID
    # Optional inclusive time window passed to the tracing storage filter.
    timestamp_from: Optional[datetime.datetime] = None
    timestamp_to: Optional[datetime.datetime] = None

    async def materialize(self, dataset_manager: "DatasetManager") -> MaterializedDataset:
        """Materialize the tracing export, applying filters/sorting."""
        from evidently.ui.service.tracing.storage.base import TracingStorage

        storage = dataset_manager.tracing_storage
        if storage is None:
            raise DatasetReadError("Tracing storage not available")

        tracing_storage: TracingStorage = storage
        frame = tracing_storage.read_with_filter(
            self.export_id,
            timestamp_from=self.timestamp_from,
            timestamp_to=self.timestamp_to,
        )
        # Normalize UUID-valued object columns to plain strings in place.
        convert_uuid_to_str_in_place(frame)
        return self.post_process(frame)

    def get_original_dataset_id(self) -> Optional[DatasetID]:
        """Return the export ID backing this source."""
        return self.export_id
203  
204  
class TracingSessionDataSource(TracingDataSource):
    """Data source that reads tracing rows and collapses them per session."""

    session_id_column: str
    timestamp_column: str
    question_column: str
    response_column: str

    async def materialize(self, dataset_manager: "DatasetManager") -> MaterializedDataset:
        """Materialize tracing rows grouped into one conversation per session."""
        raw = await super().materialize(dataset_manager)

        # Order rows chronologically within each session before grouping.
        ordered = raw.sort_values(by=[self.session_id_column, self.timestamp_column])

        question_col = self.question_column
        response_col = self.response_column

        def render_conversation(group: pd.DataFrame) -> str:
            turns = (f"USER: {q}\n\nAGENT: {r}" for q, r in zip(group[question_col], group[response_col]))
            return "\n\n".join(turns)

        grouped = ordered.groupby(self.session_id_column).apply(cast(Any, render_conversation)).reset_index()
        grouped.columns = pd.Index(["session_id", "conversation"])
        return grouped
227  
228  
# API-facing DTO types generated from the data source classes above.
# Excluded fields (``project_id``/``user_id``) are supplied later via
# ``DataSourceDTO.to_data_source(**kwargs)``.
DatasetDataSourceDTO = DataSourceDTO.for_type(DatasetDataSource, __name__, exclude=("project_id", "user_id"))
FileDataSourceDTO = DataSourceDTO.for_type(FileDataSource, __name__, exclude=("project_id", "user_id"))
TracingDataSourceDTO = DataSourceDTO.for_type(TracingDataSource, __name__, exclude=tuple())
TracingSessionDataSourceDTO = DataSourceDTO.for_type(TracingSessionDataSource, __name__, exclude=tuple())