/ src / evidently / ui / service / datasets / file_io.py
file_io.py
  1  import os
  2  from io import BytesIO
  3  from typing import Callable
  4  from typing import Container
  5  from typing import List
  6  from typing import Optional
  7  from typing import Tuple
  8  
  9  import pandas as pd
 10  from litestar.datastructures import UploadFile
 11  from litestar.exceptions import HTTPException
 12  from pandas.errors import ParserError
 13  
 14  from evidently.core.datasets import DataDefinition
 15  from evidently.core.datasets import Dataset
 16  from evidently.legacy.ui.type_aliases import UserID
 17  from evidently.ui.service.storage.local.dataset import DatasetFileStorage
 18  from evidently.ui.service.type_aliases import DatasetID
 19  from evidently.ui.service.type_aliases import ProjectID
 20  
 21  FileID = str
 22  
 23  
 24  class FileData:
 25      """Data structure for file upload result."""
 26  
 27      def __init__(
 28          self,
 29          filename: str,
 30          columns: List[str],
 31          data_definition: DataDefinition,
 32          size_bytes: int,
 33          row_count: int,
 34          column_count: int,
 35      ):
 36          self.filename = filename
 37          self.columns = columns
 38          self.data_definition = data_definition
 39          self.size_bytes = size_bytes
 40          self.row_count = row_count
 41          self.column_count = column_count
 42  
 43  
 44  def calculate_data_definition(current_data: pd.DataFrame) -> DataDefinition:
 45      """Calculate data definition from a dataframe."""
 46      dataset = Dataset.from_pandas(current_data)
 47      return dataset.data_definition
 48  
 49  
 50  def read_parquet(path) -> pd.DataFrame:
 51      """Read parquet file."""
 52      df = pd.read_parquet(path, engine="pyarrow")
 53      return df
 54  
 55  
 56  class FileIO:
 57      """Utility for reading and writing dataset files."""
 58  
 59      def __init__(self, file_storage: DatasetFileStorage):
 60          self.file_storage = file_storage
 61  
 62      ALLOWED_FILE_READERS = {
 63          ".csv": pd.read_csv,
 64          ".parquet": read_parquet,
 65      }
 66  
 67      def save_file(
 68          self,
 69          user_id: UserID,
 70          project_id: ProjectID,
 71          dataset_id: DatasetID,
 72          upload_file: UploadFile,
 73          allowed_extensions: Optional[Container[str]] = None,
 74      ) -> Tuple[FileID, str, bytes]:
 75          """Save an uploaded file and return file ID, extension, and content."""
 76          _, file_extension = os.path.splitext(upload_file.filename)
 77          if allowed_extensions is not None and file_extension not in allowed_extensions:
 78              raise HTTPException(status_code=400, detail="Extension not allowed")
 79          file_content: bytes = upload_file.file.read()
 80          return (
 81              self.file_storage.put_dataset(user_id, project_id, dataset_id, upload_file.filename, file_content),
 82              file_extension,
 83              file_content,
 84          )
 85  
 86      def save_dataframe(
 87          self, user_id: UserID, project_id: ProjectID, dataset_id: DatasetID, upload_file: UploadFile
 88      ) -> Tuple[str, pd.DataFrame, int]:
 89          """Save uploaded file as dataframe."""
 90          file_id, file_extension, file_content = self.save_file(
 91              user_id, project_id, dataset_id, upload_file, allowed_extensions=self.ALLOWED_FILE_READERS.keys()
 92          )
 93          try:
 94              reader: Callable[[BytesIO], pd.DataFrame] = self.ALLOWED_FILE_READERS[file_extension]  # type: ignore[assignment]
 95              current_data = reader(BytesIO(file_content))
 96          except ParserError as e:
 97              raise HTTPException(status_code=400, detail=f"Wrong file content: {str(e)}")
 98          return file_id, current_data, int(len(file_content))
 99  
100      def save_dataframe_and_calculate_data_definition(
101          self,
102          user_id: UserID,
103          project_id: ProjectID,
104          dataset_id: DatasetID,
105          file: UploadFile,
106          data_definition: Optional[DataDefinition] = None,
107      ) -> FileData:
108          """Save dataframe and calculate data definition."""
109          file_id, current_data, size_bytes = self.save_dataframe(user_id, project_id, dataset_id, file)
110          result_dd = data_definition
111          if data_definition is None:
112              try:
113                  result_dd = calculate_data_definition(current_data)
114              except Exception as e:
115                  self.file_storage.remove_dataset(file_id)
116                  raise e
117  
118          if result_dd is None:
119              raise ValueError("Data definition is required")
120  
121          row_count, column_count = current_data.shape
122  
123          return FileData(
124              filename=file_id,
125              data_definition=result_dd,
126              columns=list(current_data.columns),
127              size_bytes=size_bytes,
128              row_count=row_count,
129              column_count=column_count,
130          )
131  
132      def read_file_from_storage(
133          self,
134          project_id: ProjectID,
135          file_id: str,
136      ) -> pd.DataFrame:
137          """Read a file from storage."""
138          _, file_extension = os.path.splitext(file_id)
139          if file_extension not in self.ALLOWED_FILE_READERS.keys():
140              raise HTTPException(status_code=400, detail="Extension not allowed")
141          file_content = self.file_storage.get_dataset(file_id)
142          reader: Callable[[BytesIO], pd.DataFrame] = self.ALLOWED_FILE_READERS[file_extension]  # type: ignore[assignment]
143          df = reader(BytesIO(file_content))
144          return df
145  
146  
def get_upload_file(df: pd.DataFrame, name: str) -> UploadFile:
    """Serialize a dataframe to parquet and wrap the bytes in an UploadFile.

    The resulting upload is named ``<name>.parquet`` and uses the
    ``application/octet-stream`` content type.
    """
    parquet_buffer = BytesIO()
    df.to_parquet(parquet_buffer)
    # getvalue() returns the whole buffer regardless of the stream position.
    payload = parquet_buffer.getvalue()
    return UploadFile(
        content_type="application/octet-stream",
        filename=f"{name}.parquet",
        file_data=payload,
    )