# file_io.py
import os
from io import BytesIO
from typing import Callable
from typing import Container
from typing import List
from typing import Optional
from typing import Tuple

import pandas as pd
from litestar.datastructures import UploadFile
from litestar.exceptions import HTTPException
from pandas.errors import EmptyDataError
from pandas.errors import ParserError

from evidently.core.datasets import DataDefinition
from evidently.core.datasets import Dataset
from evidently.legacy.ui.type_aliases import UserID
from evidently.ui.service.storage.local.dataset import DatasetFileStorage
from evidently.ui.service.type_aliases import DatasetID
from evidently.ui.service.type_aliases import ProjectID

FileID = str


class FileData:
    """Data structure for file upload result.

    Attributes:
        filename: Identifier under which the file was stored.
        columns: Column labels of the parsed dataframe.
        data_definition: Data definition associated with the dataframe.
        size_bytes: Size of the uploaded content in bytes.
        row_count: Number of rows in the parsed dataframe.
        column_count: Number of columns in the parsed dataframe.
    """

    def __init__(
        self,
        filename: str,
        columns: List[str],
        data_definition: DataDefinition,
        size_bytes: int,
        row_count: int,
        column_count: int,
    ):
        self.filename = filename
        self.columns = columns
        self.data_definition = data_definition
        self.size_bytes = size_bytes
        self.row_count = row_count
        self.column_count = column_count


def calculate_data_definition(current_data: pd.DataFrame) -> DataDefinition:
    """Calculate data definition from a dataframe.

    The dataframe is wrapped with ``Dataset.from_pandas`` and the resulting
    dataset's ``data_definition`` is returned.
    """
    return Dataset.from_pandas(current_data).data_definition


def read_parquet(path) -> pd.DataFrame:
    """Read parquet data from ``path`` using the ``pyarrow`` engine."""
    return pd.read_parquet(path, engine="pyarrow")


class FileIO:
    """Utility for reading and writing dataset files."""

    # Maps a file extension (with leading dot, as produced by
    # ``os.path.splitext``) to the function that parses that format.
    ALLOWED_FILE_READERS = {
        ".csv": pd.read_csv,
        ".parquet": read_parquet,
    }

    def __init__(self, file_storage: DatasetFileStorage):
        self.file_storage = file_storage

    def save_file(
        self,
        user_id: UserID,
        project_id: ProjectID,
        dataset_id: DatasetID,
        upload_file: UploadFile,
        allowed_extensions: Optional[Container[str]] = None,
    ) -> Tuple[FileID, str, bytes]:
        """Save an uploaded file and return file ID, extension, and content.

        Args:
            user_id: Passed through to ``file_storage.put_dataset``.
            project_id: Passed through to ``file_storage.put_dataset``.
            dataset_id: Passed through to ``file_storage.put_dataset``.
            upload_file: The uploaded file; its ``filename`` determines the
                extension and its whole content is read into memory.
            allowed_extensions: When given, the file's extension (including
                the leading dot, compared case-sensitively) must be a member.

        Returns:
            A tuple of the value returned by ``put_dataset``, the file
            extension, and the raw file content.

        Raises:
            HTTPException: With status 400 when the extension is not allowed.
        """
        _, file_extension = os.path.splitext(upload_file.filename)
        # Validate before reading or storing anything.
        if allowed_extensions is not None and file_extension not in allowed_extensions:
            raise HTTPException(status_code=400, detail="Extension not allowed")
        file_content: bytes = upload_file.file.read()
        file_id = self.file_storage.put_dataset(
            user_id, project_id, dataset_id, upload_file.filename, file_content
        )
        return file_id, file_extension, file_content

    def save_dataframe(
        self, user_id: UserID, project_id: ProjectID, dataset_id: DatasetID, upload_file: UploadFile
    ) -> Tuple[str, pd.DataFrame, int]:
        """Save uploaded file as dataframe.

        The file is stored first and then parsed with the reader registered
        for its extension in ``ALLOWED_FILE_READERS``. If parsing fails, the
        stored file is removed again so that no unparseable upload is left
        behind in storage.

        Returns:
            A tuple of the stored file ID, the parsed dataframe, and the size
            of the uploaded content in bytes.

        Raises:
            HTTPException: With status 400 when the extension is not one of
                ``ALLOWED_FILE_READERS`` or when pandas reports a parser
                error or an empty file.
        """
        file_id, file_extension, file_content = self.save_file(
            user_id, project_id, dataset_id, upload_file, allowed_extensions=self.ALLOWED_FILE_READERS
        )
        try:
            reader: Callable[[BytesIO], pd.DataFrame] = self.ALLOWED_FILE_READERS[file_extension]  # type: ignore[assignment]
            current_data = reader(BytesIO(file_content))
        except (ParserError, EmptyDataError) as e:
            # EmptyDataError is not a ParserError subclass, so it is listed
            # explicitly; both indicate bad content supplied by the client.
            self.file_storage.remove_dataset(file_id)
            raise HTTPException(status_code=400, detail=f"Wrong file content: {str(e)}") from e
        except Exception:
            # Any other failure still must not leave the stored file behind.
            self.file_storage.remove_dataset(file_id)
            raise
        return file_id, current_data, len(file_content)

    def save_dataframe_and_calculate_data_definition(
        self,
        user_id: UserID,
        project_id: ProjectID,
        dataset_id: DatasetID,
        file: UploadFile,
        data_definition: Optional[DataDefinition] = None,
    ) -> FileData:
        """Save dataframe and calculate data definition.

        Args:
            user_id: Passed through to ``save_dataframe``.
            project_id: Passed through to ``save_dataframe``.
            dataset_id: Passed through to ``save_dataframe``.
            file: The uploaded file to store and parse.
            data_definition: Definition to use as-is; when ``None`` it is
                calculated from the parsed dataframe.

        Returns:
            A ``FileData`` describing the stored file and parsed dataframe.

        Raises:
            ValueError: When no data definition is available after
                calculation.
        """
        file_id, current_data, size_bytes = self.save_dataframe(user_id, project_id, dataset_id, file)
        result_dd = data_definition
        if data_definition is None:
            try:
                result_dd = calculate_data_definition(current_data)
            except Exception:
                # Do not keep a stored file for which no definition exists.
                self.file_storage.remove_dataset(file_id)
                raise

        if result_dd is None:
            raise ValueError("Data definition is required")

        row_count, column_count = current_data.shape

        return FileData(
            filename=file_id,
            data_definition=result_dd,
            columns=list(current_data.columns),
            size_bytes=size_bytes,
            row_count=row_count,
            column_count=column_count,
        )

    def read_file_from_storage(
        self,
        project_id: ProjectID,
        file_id: str,
    ) -> pd.DataFrame:
        """Read a file from storage.

        Args:
            project_id: Not used by this method.
            file_id: Identifier of the stored file; its extension selects the
                reader from ``ALLOWED_FILE_READERS``.

        Returns:
            The stored content parsed into a dataframe.

        Raises:
            HTTPException: With status 400 when the extension of ``file_id``
                has no registered reader.
        """
        _, file_extension = os.path.splitext(file_id)
        if file_extension not in self.ALLOWED_FILE_READERS:
            raise HTTPException(status_code=400, detail="Extension not allowed")
        file_content = self.file_storage.get_dataset(file_id)
        reader: Callable[[BytesIO], pd.DataFrame] = self.ALLOWED_FILE_READERS[file_extension]  # type: ignore[assignment]
        return reader(BytesIO(file_content))


def get_upload_file(df: pd.DataFrame, name: str) -> UploadFile:
    """Create an UploadFile from a dataframe.

    The dataframe is serialized to parquet in memory and wrapped in an
    ``UploadFile`` named ``"<name>.parquet"`` with content type
    ``application/octet-stream``.
    """
    buf = BytesIO()
    df.to_parquet(buf)
    return UploadFile(
        content_type="application/octet-stream",
        filename=f"{name}.parquet",
        # getvalue() returns the whole buffer regardless of stream position.
        file_data=buf.getvalue(),
    )