/ ragaai_catalyst / dataset.py
dataset.py
1 import os 2 import csv 3 import json 4 import tempfile 5 import requests 6 from .utils import response_checker 7 from typing import Union 8 import logging 9 from .ragaai_catalyst import RagaAICatalyst 10 import pandas as pd 11 logger = logging.getLogger(__name__) 12 get_token = RagaAICatalyst.get_token 13 14 # Job status constants 15 JOB_STATUS_FAILED = "failed" 16 JOB_STATUS_IN_PROGRESS = "in_progress" 17 JOB_STATUS_COMPLETED = "success" 18 19 class Dataset: 20 BASE_URL = None 21 TIMEOUT = 30 22 23 def __init__(self, project_name): 24 self.project_name = project_name 25 self.num_projects = 99999 26 Dataset.BASE_URL = RagaAICatalyst.BASE_URL 27 self.jobId = None 28 headers = { 29 "Authorization": f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}', 30 } 31 try: 32 response = requests.get( 33 f"{Dataset.BASE_URL}/v2/llm/projects?size={self.num_projects}", 34 headers=headers, 35 timeout=self.TIMEOUT, 36 ) 37 response.raise_for_status() 38 logger.debug("Projects list retrieved successfully") 39 40 project_list = [ 41 project["name"] for project in response.json()["data"]["content"] 42 ] 43 44 if project_name not in project_list: 45 raise ValueError("Project not found. Please enter a valid project name") 46 47 self.project_id = [ 48 project["id"] for project in response.json()["data"]["content"] if project["name"] == project_name 49 ][0] 50 51 except requests.exceptions.RequestException as e: 52 logger.error(f"Failed to retrieve projects list: {e}") 53 raise 54 55 def list_datasets(self): 56 """ 57 Retrieves a list of datasets for a given project. 58 59 Returns: 60 list: A list of dataset names. 61 62 Raises: 63 None. 64 """ 65 66 def make_request(): 67 headers = { 68 'Content-Type': 'application/json', 69 "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}", 70 "X-Project-Id": str(self.project_id), 71 } 72 json_data = {"size": 99999, "page": "0", "projectId": str(self.project_id), "search": ""} 73 try: 74 response = requests.post( 75 f"{Dataset.BASE_URL}/v2/llm/dataset", 76 headers=headers, 77 json=json_data, 78 timeout=Dataset.TIMEOUT, 79 ) 80 response.raise_for_status() 81 return response 82 except requests.exceptions.RequestException as e: 83 logger.error(f"Failed to list datasets: {e}") 84 raise 85 86 try: 87 response = make_request() 88 response_checker(response, "Dataset.list_datasets") 89 if response.status_code == 401: 90 get_token() # Fetch a new token and set it in the environment 91 response = make_request() # Retry the request 92 if response.status_code != 200: 93 return { 94 "status_code": response.status_code, 95 "message": response.json(), 96 } 97 datasets = response.json()["data"]["content"] 98 dataset_list = [dataset["name"] for dataset in datasets] 99 return dataset_list 100 except Exception as e: 101 logger.error(f"Error in list_datasets: {e}") 102 raise 103 104 def get_schema_mapping(self): 105 headers = { 106 "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}", 107 "X-Project-Name": self.project_name, 108 } 109 try: 110 response = requests.get( 111 f"{Dataset.BASE_URL}/v1/llm/schema-elements", 112 headers=headers, 113 timeout=Dataset.TIMEOUT, 114 ) 115 response.raise_for_status() 116 response_data = response.json()["data"]["schemaElements"] 117 if not response.json()['success']: 118 raise ValueError('Unable to fetch Schema Elements for the CSV') 119 return response_data 120 except requests.exceptions.RequestException as e: 121 logger.error(f"Failed to get CSV schema: {e}") 122 raise 123 124 ###################### CSV Upload APIs ################### 125 126 def get_dataset_columns(self, dataset_name): 127 list_dataset = self.list_datasets() 128 if dataset_name not in list_dataset: 129 raise ValueError(f"Dataset {dataset_name} does not exists. Please enter a valid dataset name") 130 131 headers = { 132 "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}", 133 "X-Project-Name": self.project_name, 134 } 135 headers = { 136 'Content-Type': 'application/json', 137 "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}", 138 "X-Project-Id": str(self.project_id), 139 } 140 json_data = {"size": 12, "page": "0", "projectId": str(self.project_id), "search": ""} 141 try: 142 response = requests.post( 143 f"{Dataset.BASE_URL}/v2/llm/dataset", 144 headers=headers, 145 json=json_data, 146 timeout=Dataset.TIMEOUT, 147 ) 148 response.raise_for_status() 149 datasets = response.json()["data"]["content"] 150 dataset_id = [dataset["id"] for dataset in datasets if dataset["name"]==dataset_name][0] 151 except requests.exceptions.RequestException as e: 152 logger.error(f"Failed to list datasets: {e}") 153 raise 154 155 try: 156 response = requests.get( 157 f"{Dataset.BASE_URL}/v2/llm/dataset/{dataset_id}?initialCols=0", 158 headers=headers, 159 timeout=Dataset.TIMEOUT, 160 ) 161 response.raise_for_status() 162 dataset_columns = response.json()["data"]["datasetColumnsResponses"] 163 dataset_columns = [item["displayName"] for item in dataset_columns] 164 dataset_columns = [data for data in dataset_columns if not data.startswith('_')] 165 if not response.json()['success']: 166 raise ValueError('Unable to fetch details of for the CSV') 167 return dataset_columns 168 except requests.exceptions.RequestException as e: 169 logger.error(f"Failed to get CSV columns: {e}") 170 raise 171 172 def create_from_csv(self, csv_path, dataset_name, schema_mapping): 173 list_dataset = self.list_datasets() 174 if dataset_name in list_dataset: 175 raise ValueError(f"Dataset name {dataset_name} already exists. Please enter a unique dataset name") 176 177 #### get presigned URL 178 def get_presignedUrl(): 179 headers = { 180 "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}", 181 "X-Project-Id": str(self.project_id), 182 } 183 try: 184 response = requests.get( 185 f"{Dataset.BASE_URL}/v2/llm/dataset/csv/presigned-url", 186 headers=headers, 187 timeout=Dataset.TIMEOUT, 188 ) 189 response.raise_for_status() 190 return response.json() 191 except requests.exceptions.RequestException as e: 192 logger.error(f"Failed to get presigned URL: {e}") 193 raise 194 195 try: 196 presignedUrl = get_presignedUrl() 197 if presignedUrl['success']: 198 url = presignedUrl['data']['presignedUrl'] 199 filename = presignedUrl['data']['fileName'] 200 else: 201 raise ValueError('Unable to fetch presignedUrl') 202 except Exception as e: 203 logger.error(f"Error in get_presignedUrl: {e}") 204 raise 205 206 #### put csv to presigned URL 207 def put_csv_to_presignedUrl(url): 208 headers = { 209 'Content-Type': 'text/csv', 210 'x-ms-blob-type': 'BlockBlob', 211 } 212 try: 213 with open(csv_path, 'rb') as file: 214 response = requests.put( 215 url, 216 headers=headers, 217 data=file, 218 timeout=Dataset.TIMEOUT, 219 ) 220 response.raise_for_status() 221 return response 222 except requests.exceptions.RequestException as e: 223 logger.error(f"Failed to put CSV to presigned URL: {e}") 224 raise 225 226 try: 227 228 put_csv_response = put_csv_to_presignedUrl(url) 229 if put_csv_response.status_code not in (200, 201): 230 raise ValueError('Unable to put csv to the presignedUrl') 231 except Exception as e: 232 logger.error(f"Error in put_csv_to_presignedUrl: {e}") 233 raise 234 235 ## Upload csv to elastic 236 def upload_csv_to_elastic(data): 237 header = { 238 'Content-Type': 'application/json', 239 'Authorization': f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}", 240 "X-Project-Id": str(self.project_id) 241 } 242 try: 243 response = requests.post( 244 f"{Dataset.BASE_URL}/v2/llm/dataset/csv", 245 headers=header, 246 json=data, 247 timeout=Dataset.TIMEOUT, 248 ) 249 if response.status_code==400: 250 raise ValueError(response.json()["message"]) 251 response.raise_for_status() 252 return response.json() 253 except requests.exceptions.RequestException as e: 254 logger.error(f"Failed to upload CSV to elastic: {e}") 255 raise 256 257 def generate_schema(mapping): 258 result = {} 259 for column, schema_element in mapping.items(): 260 result[column] = {"columnType": schema_element} 261 return result 262 263 try: 264 schema_mapping = generate_schema(schema_mapping) 265 data = { 266 "projectId": str(self.project_id), 267 "datasetName": dataset_name, 268 "fileName": filename, 269 "schemaMapping": schema_mapping, 270 "opType": "insert", 271 "description": "" 272 } 273 upload_csv_response = upload_csv_to_elastic(data) 274 if not upload_csv_response['success']: 275 raise ValueError('Unable to upload csv') 276 else: 277 print(upload_csv_response['message']) 278 self.jobId = upload_csv_response['data']['jobId'] 279 except Exception as e: 280 logger.error(f"Error in create_from_csv: {e}") 281 raise 282 283 def add_rows(self, csv_path, dataset_name): 284 """ 285 Add rows to an existing dataset from a CSV file. 286 287 Args: 288 csv_path (str): Path to the CSV file to be added 289 dataset_name (str): Name of the existing dataset to add rows to 290 291 Raises: 292 ValueError: If dataset does not exist or columns are incompatible 293 """ 294 # Get existing dataset columns 295 existing_columns = self.get_dataset_columns(dataset_name) 296 297 # Read the CSV file to check columns 298 try: 299 import pandas as pd 300 df = pd.read_csv(csv_path) 301 csv_columns = df.columns.tolist() 302 except Exception as e: 303 logger.error(f"Failed to read CSV file: {e}") 304 raise ValueError(f"Unable to read CSV file: {e}") 305 306 # Check column compatibility 307 for column in existing_columns: 308 if column not in csv_columns: 309 df[column] = None 310 311 # Get presigned URL for the CSV 312 def get_presignedUrl(): 313 headers = { 314 "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}", 315 "X-Project-Id": str(self.project_id), 316 } 317 try: 318 response = requests.get( 319 f"{Dataset.BASE_URL}/v2/llm/dataset/csv/presigned-url", 320 headers=headers, 321 timeout=Dataset.TIMEOUT, 322 ) 323 response.raise_for_status() 324 return response.json() 325 except requests.exceptions.RequestException as e: 326 logger.error(f"Failed to get presigned URL: {e}") 327 raise 328 329 try: 330 presignedUrl = get_presignedUrl() 331 if presignedUrl['success']: 332 url = presignedUrl['data']['presignedUrl'] 333 filename = presignedUrl['data']['fileName'] 334 else: 335 raise ValueError('Unable to fetch presignedUrl') 336 except Exception as e: 337 logger.error(f"Error in get_presignedUrl: {e}") 338 raise 339 340 # Upload CSV to presigned URL 341 def put_csv_to_presignedUrl(url): 342 headers = { 343 'Content-Type': 'text/csv', 344 'x-ms-blob-type': 'BlockBlob', 345 } 346 try: 347 with open(csv_path, 'rb') as file: 348 response = requests.put( 349 url, 350 headers=headers, 351 data=file, 352 timeout=Dataset.TIMEOUT, 353 ) 354 response.raise_for_status() 355 return response 356 except requests.exceptions.RequestException as e: 357 logger.error(f"Failed to put CSV to presigned URL: {e}") 358 raise 359 360 try: 361 put_csv_response = put_csv_to_presignedUrl(url) 362 if put_csv_response.status_code not in (200, 201): 363 raise ValueError('Unable to put csv to the presignedUrl') 364 except Exception as e: 365 logger.error(f"Error in put_csv_to_presignedUrl: {e}") 366 raise 367 368 # Prepare schema mapping (assuming same mapping as original dataset) 369 def generate_schema_mapping(dataset_name): 370 headers = { 371 'Content-Type': 'application/json', 372 "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}", 373 "X-Project-Id": str(self.project_id), 374 } 375 json_data = { 376 "size": 12, 377 "page": "0", 378 "projectId": str(self.project_id), 379 "search": "" 380 } 381 try: 382 # First get dataset details 383 response = requests.post( 384 f"{Dataset.BASE_URL}/v2/llm/dataset", 385 headers=headers, 386 json=json_data, 387 timeout=Dataset.TIMEOUT, 388 ) 389 response.raise_for_status() 390 datasets = response.json()["data"]["content"] 391 dataset_id = [dataset["id"] for dataset in datasets if dataset["name"]==dataset_name][0] 392 393 # Get dataset details to extract schema mapping 394 response = requests.get( 395 f"{Dataset.BASE_URL}/v2/llm/dataset/{dataset_id}?initialCols=0", 396 headers=headers, 397 timeout=Dataset.TIMEOUT, 398 ) 399 response.raise_for_status() 400 401 # Extract schema mapping 402 schema_mapping = {} 403 for col in response.json()["data"]["datasetColumnsResponses"]: 404 schema_mapping[col["displayName"]] = {"columnType": col["columnType"]} 405 406 return schema_mapping 407 except requests.exceptions.RequestException as e: 408 logger.error(f"Failed to get schema mapping: {e}") 409 raise 410 411 # Upload CSV to elastic 412 try: 413 schema_mapping = generate_schema_mapping(dataset_name) 414 415 data = { 416 "projectId": str(self.project_id), 417 "datasetName": dataset_name, 418 "fileName": filename, 419 "schemaMapping": schema_mapping, 420 "opType": "update", # Use update for adding rows 421 "description": "Adding new rows to dataset" 422 } 423 424 headers = { 425 'Content-Type': 'application/json', 426 'Authorization': f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}", 427 "X-Project-Id": str(self.project_id) 428 } 429 430 response = requests.post( 431 f"{Dataset.BASE_URL}/v2/llm/dataset/csv", 432 headers=headers, 433 json=data, 434 timeout=Dataset.TIMEOUT, 435 ) 436 437 if response.status_code == 400: 438 raise ValueError(response.json().get("message", "Failed to add rows")) 439 440 response.raise_for_status() 441 442 # Check response 443 response_data = response.json() 444 if response_data.get('success', False): 445 print(f"{response_data['message']}") 446 self.jobId = response_data['data']['jobId'] 447 else: 448 raise ValueError(response_data.get('message', 'Failed to add rows')) 449 450 except Exception as e: 451 logger.error(f"Error in add_rows_to_dataset: {e}") 452 raise 453 454 def add_columns(self, text_fields, dataset_name, column_name, provider, model, variables={}): 455 """ 456 Add a column to a dataset with dynamically fetched model parameters 457 458 Args: 459 project_id (int): Project ID 460 dataset_id (int): Dataset ID 461 column_name (str): Name of the new column 462 provider (str): Name of the model provider 463 model (str): Name of the model 464 """ 465 # First, get model parameters 466 467 # Validate text_fields input 468 if not isinstance(text_fields, list): 469 raise ValueError("text_fields must be a list of dictionaries") 470 471 for field in text_fields: 472 if not isinstance(field, dict) or 'role' not in field or 'content' not in field: 473 raise ValueError("Each text field must be a dictionary with 'role' and 'content' keys") 474 475 # First, get the dataset ID 476 headers = { 477 'Content-Type': 'application/json', 478 "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}", 479 "X-Project-Id": str(self.project_id), 480 } 481 json_data = {"size": 12, "page": "0", "projectId": str(self.project_id), "search": ""} 482 483 try: 484 # Get dataset list 485 response = requests.post( 486 f"{Dataset.BASE_URL}/v2/llm/dataset", 487 headers=headers, 488 json=json_data, 489 timeout=Dataset.TIMEOUT, 490 ) 491 response.raise_for_status() 492 datasets = response.json()["data"]["content"] 493 494 # Find dataset ID 495 dataset_id = next((dataset["id"] for dataset in datasets if dataset["name"] == dataset_name), None) 496 497 if dataset_id is None: 498 raise ValueError(f"Dataset {dataset_name} not found") 499 500 501 502 parameters_url= f"{Dataset.BASE_URL}/playground/providers/models/parameters/list" 503 504 headers = { 505 'Content-Type': 'application/json', 506 "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}", 507 "X-Project-Id": str(self.project_id), 508 } 509 510 # Fetch model parameters 511 parameters_payload = { 512 "providerName": provider, 513 "modelName": model 514 } 515 516 # Get model parameters 517 params_response = requests.post( 518 parameters_url, 519 headers=headers, 520 json=parameters_payload, 521 timeout=30 522 ) 523 params_response.raise_for_status() 524 525 # Extract parameters 526 all_parameters = params_response.json().get('data', []) 527 528 # Filter and transform parameters for add-column API 529 formatted_parameters = [] 530 for param in all_parameters: 531 value = param.get('value') 532 param_type = param.get('type') 533 534 if value is None: 535 formatted_param = { 536 "name": param.get('name'), 537 "value": None, # Pass None if the value is null 538 "type": param.get('type') 539 } 540 else: 541 # Improved type handling 542 if param_type == "float": 543 value = float(value) # Ensure value is converted to float 544 elif param_type == "int": 545 value = int(value) # Ensure value is converted to int 546 elif param_type == "bool": 547 value = bool(value) # Ensure value is converted to bool 548 elif param_type == "string": 549 value = str(value) # Ensure value is converted to string 550 else: 551 raise ValueError(f"Unsupported parameter type: {param_type}") # Handle unsupported types 552 553 formatted_param = { 554 "name": param.get('name'), 555 "value": value, 556 "type": param.get('type') 557 } 558 formatted_parameters.append(formatted_param) 559 dataset_id = next((dataset["id"] for dataset in datasets if dataset["name"] == dataset_name), None) 560 561 # Prepare payload for add column API 562 add_column_payload = { 563 "rowFilterList": [], 564 "columnName": column_name, 565 "datasetId": dataset_id, 566 "variables": variables, 567 "promptTemplate": { 568 "textFields": text_fields, 569 "modelSpecs": { 570 "model": f"{provider}/{model}", 571 "parameters": formatted_parameters 572 } 573 } 574 } 575 if variables: 576 variable_specs = [] 577 for key, values in variables.items(): 578 variable_specs.append({ 579 "name": key, 580 "type": "string", 581 "schema": "query" 582 }) 583 add_column_payload["promptTemplate"]["variableSpecs"] = variable_specs 584 585 # Make API call to add column 586 add_column_url = f"{Dataset.BASE_URL}/v2/llm/dataset/add-column" 587 588 response = requests.post( 589 add_column_url, 590 headers={ 591 'Content-Type': 'application/json', 592 'Authorization': f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}", 593 "X-Project-Id": str(self.project_id) 594 }, 595 json=add_column_payload, 596 timeout=30 597 ) 598 599 # Check response 600 response.raise_for_status() 601 response_data = response.json() 602 603 if response_data.get('success', False): 604 print(f"Column '{column_name}' added successfully to dataset '{dataset_name}'") 605 self.jobId = response_data['data']['jobId'] 606 else: 607 raise ValueError(response_data.get('message', 'Failed to add column')) 608 609 except requests.exceptions.RequestException as e: 610 print(f"Error adding column: {e}") 611 raise 612 613 def get_status(self): 614 headers = { 615 'Content-Type': 'application/json', 616 "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}", 617 'X-Project-Id': str(self.project_id), 618 } 619 try: 620 response = requests.get( 621 f'{Dataset.BASE_URL}/job/status', 622 headers=headers, 623 timeout=30) 624 response.raise_for_status() 625 if response.json()["success"]: 626 627 status_json = [item["status"] for item in response.json()["data"]["content"] if item["id"]==self.jobId] 628 status_json = status_json[0] 629 if status_json == "Failed": 630 print("Job failed. No results to fetch.") 631 return JOB_STATUS_FAILED 632 elif status_json == "In Progress": 633 print(f"Job in progress. Please wait while the job completes.\nVisit Job Status: {Dataset.BASE_URL.removesuffix('/api')}/projects/job-status?projectId={self.project_id} to track") 634 return JOB_STATUS_IN_PROGRESS 635 elif status_json == "Completed": 636 print(f"Job completed. Fetching results.\nVisit Job Status: {Dataset.BASE_URL.removesuffix('/api')}/projects/job-status?projectId={self.project_id} to check") 637 return JOB_STATUS_COMPLETED 638 else: 639 logger.error(f"Unknown status received: {status_json}") 640 return JOB_STATUS_FAILED 641 else: 642 logger.error("Request was not successful") 643 return JOB_STATUS_FAILED 644 except requests.exceptions.HTTPError as http_err: 645 logger.error(f"HTTP error occurred: {http_err}") 646 return JOB_STATUS_FAILED 647 except requests.exceptions.ConnectionError as conn_err: 648 logger.error(f"Connection error occurred: {conn_err}") 649 return JOB_STATUS_FAILED 650 except requests.exceptions.Timeout as timeout_err: 651 logger.error(f"Timeout error occurred: {timeout_err}") 652 return JOB_STATUS_FAILED 653 except requests.exceptions.RequestException as req_err: 654 logger.error(f"An error occurred: {req_err}") 655 return JOB_STATUS_FAILED 656 except Exception as e: 657 logger.error(f"An unexpected error occurred: {e}") 658 return JOB_STATUS_FAILED 659 660 def _jsonl_to_csv(self, jsonl_file, csv_file): 661 """Convert a JSONL file to a CSV file.""" 662 with open(jsonl_file, 'r', encoding='utf-8') as infile: 663 data = [json.loads(line) for line in infile] 664 665 if not data: 666 print("Empty JSONL file.") 667 return 668 669 with open(csv_file, 'w', newline='', encoding='utf-8') as outfile: 670 writer = csv.DictWriter(outfile, fieldnames=data[0].keys()) 671 writer.writeheader() 672 writer.writerows(data) 673 674 print(f"Converted {jsonl_file} to {csv_file}") 675 676 def create_from_jsonl(self, jsonl_path, dataset_name, schema_mapping): 677 tmp_csv_path = os.path.join(tempfile.gettempdir(), f"{dataset_name}.csv") 678 try: 679 self._jsonl_to_csv(jsonl_path, tmp_csv_path) 680 self.create_from_csv(tmp_csv_path, dataset_name, schema_mapping) 681 except (IOError, UnicodeError) as e: 682 logger.error(f"Error converting JSONL to CSV: {e}") 683 raise 684 finally: 685 if os.path.exists(tmp_csv_path): 686 try: 687 os.remove(tmp_csv_path) 688 except Exception as e: 689 logger.error(f"Error removing temporary CSV file: {e}") 690 691 def add_rows_from_jsonl(self, jsonl_path, dataset_name): 692 tmp_csv_path = os.path.join(tempfile.gettempdir(), f"{dataset_name}.csv") 693 try: 694 self._jsonl_to_csv(jsonl_path, tmp_csv_path) 695 self.add_rows(tmp_csv_path, dataset_name) 696 except (IOError, UnicodeError) as e: 697 logger.error(f"Error converting JSONL to CSV: {e}") 698 raise 699 finally: 700 if os.path.exists(tmp_csv_path): 701 try: 702 os.remove(tmp_csv_path) 703 except Exception as e: 704 logger.error(f"Error removing temporary CSV file: {e}") 705 706 def create_from_df(self, df, dataset_name, schema_mapping): 707 tmp_csv_path = os.path.join(tempfile.gettempdir(), f"{dataset_name}.csv") 708 try: 709 df.to_csv(tmp_csv_path, index=False) 710 self.create_from_csv(tmp_csv_path, dataset_name, schema_mapping) 711 except (IOError, UnicodeError) as e: 712 logger.error(f"Error converting DataFrame to CSV: {e}") 713 raise 714 finally: 715 if os.path.exists(tmp_csv_path): 716 try: 717 os.remove(tmp_csv_path) 718 except Exception as e: 719 logger.error(f"Error removing temporary CSV file: {e}") 720 721 def add_rows_from_df(self, df, dataset_name): 722 tmp_csv_path = os.path.join(tempfile.gettempdir(), f"{dataset_name}.csv") 723 try: 724 df.to_csv(tmp_csv_path, index=False) 725 self.add_rows(tmp_csv_path, dataset_name) 726 except (IOError, UnicodeError) as e: 727 logger.error(f"Error converting DataFrame to CSV: {e}") 728 raise 729 finally: 730 if os.path.exists(tmp_csv_path): 731 try: 732 os.remove(tmp_csv_path) 733 except Exception as e: 734 logger.error(f"Error removing temporary CSV file: {e}")