# ragaai_catalyst/dataset.py
import os
import csv
import json
import tempfile
import logging

import requests
import pandas as pd

from .ragaai_catalyst import RagaAICatalyst
from .utils import response_checker

logger = logging.getLogger(__name__)
get_token = RagaAICatalyst.get_token

# Job status constants
JOB_STATUS_FAILED = "failed"
JOB_STATUS_IN_PROGRESS = "in_progress"
JOB_STATUS_COMPLETED = "success"

class Dataset:
    BASE_URL = None
    TIMEOUT = 30

    def __init__(self, project_name):
        self.project_name = project_name
        self.num_projects = 99999
        Dataset.BASE_URL = RagaAICatalyst.BASE_URL
        self.jobId = None
        headers = {
            "Authorization": f'Bearer {os.getenv("RAGAAI_CATALYST_TOKEN")}',
        }
        try:
            response = requests.get(
                f"{Dataset.BASE_URL}/v2/llm/projects?size={self.num_projects}",
                headers=headers,
                timeout=self.TIMEOUT,
            )
            response.raise_for_status()
            logger.debug("Projects list retrieved successfully")

            # Parse the response once and resolve the project name to its id
            projects = response.json()["data"]["content"]
            project_list = [project["name"] for project in projects]

            if project_name not in project_list:
                raise ValueError("Project not found. Please enter a valid project name")

            self.project_id = [
                project["id"] for project in projects if project["name"] == project_name
            ][0]

        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to retrieve projects list: {e}")
            raise

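    # A minimal usage sketch (hypothetical project name; the bearer token is
    # read from the RAGAAI_CATALYST_TOKEN environment variable set up by
    # RagaAICatalyst):
    #
    #     dataset_manager = Dataset(project_name="my-project")
    #     # raises ValueError if "my-project" is not among the account's projects
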
    def list_datasets(self):
        """
        Retrieves a list of datasets for a given project.

        Returns:
            list: A list of dataset names, or a dict with "status_code" and
            "message" keys if the request returns a non-200 response.

        Raises:
            requests.exceptions.RequestException: If the request fails.
        """

        def make_request():
            headers = {
                'Content-Type': 'application/json',
                "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}",
                "X-Project-Id": str(self.project_id),
            }
            json_data = {"size": 99999, "page": "0", "projectId": str(self.project_id), "search": ""}
            try:
                response = requests.post(
                    f"{Dataset.BASE_URL}/v2/llm/dataset",
                    headers=headers,
                    json=json_data,
                    timeout=Dataset.TIMEOUT,
                )
                return response
            except requests.exceptions.RequestException as e:
                logger.error(f"Failed to list datasets: {e}")
                raise

        try:
            response = make_request()
            response_checker(response, "Dataset.list_datasets")
            # A 401 means the token expired: refresh it and retry once.
            # (raise_for_status is deliberately not called inside make_request,
            # otherwise this retry branch would be unreachable.)
            if response.status_code == 401:
                get_token()  # Fetch a new token and set it in the environment
                response = make_request()  # Retry the request
            if response.status_code != 200:
                return {
                    "status_code": response.status_code,
                    "message": response.json(),
                }
            datasets = response.json()["data"]["content"]
            dataset_list = [dataset["name"] for dataset in datasets]
            return dataset_list
        except Exception as e:
            logger.error(f"Error in list_datasets: {e}")
            raise

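    # Example (hypothetical dataset names). Note that on a non-200 response the
    # method returns a status dict rather than a list:
    #
    #     datasets = dataset_manager.list_datasets()
    #     print(datasets)  # e.g. ["eval-set-v1", "eval-set-v2"]
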
    def get_schema_mapping(self):
        headers = {
            "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}",
            "X-Project-Name": self.project_name,
        }
        try:
            response = requests.get(
                f"{Dataset.BASE_URL}/v1/llm/schema-elements",
                headers=headers,
                timeout=Dataset.TIMEOUT,
            )
            response.raise_for_status()
            response_json = response.json()
            # Check the success flag before indexing into the payload
            if not response_json['success']:
                raise ValueError('Unable to fetch Schema Elements for the CSV')
            return response_json["data"]["schemaElements"]
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to get CSV schema: {e}")
            raise

    ###################### CSV Upload APIs ###################

    def get_dataset_columns(self, dataset_name):
        list_dataset = self.list_datasets()
        if dataset_name not in list_dataset:
            raise ValueError(f"Dataset {dataset_name} does not exist. Please enter a valid dataset name")

        headers = {
            'Content-Type': 'application/json',
            "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}",
            "X-Project-Id": str(self.project_id),
        }
        json_data = {"size": 12, "page": "0", "projectId": str(self.project_id), "search": ""}
        try:
            response = requests.post(
                f"{Dataset.BASE_URL}/v2/llm/dataset",
                headers=headers,
                json=json_data,
                timeout=Dataset.TIMEOUT,
            )
            response.raise_for_status()
            datasets = response.json()["data"]["content"]
            dataset_id = [dataset["id"] for dataset in datasets if dataset["name"] == dataset_name][0]
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to list datasets: {e}")
            raise

        try:
            response = requests.get(
                f"{Dataset.BASE_URL}/v2/llm/dataset/{dataset_id}?initialCols=0",
                headers=headers,
                timeout=Dataset.TIMEOUT,
            )
            response.raise_for_status()
            response_json = response.json()
            if not response_json['success']:
                raise ValueError('Unable to fetch dataset details for the CSV')
            dataset_columns = response_json["data"]["datasetColumnsResponses"]
            dataset_columns = [item["displayName"] for item in dataset_columns]
            # Internal columns (prefixed with '_') are not exposed to the caller
            dataset_columns = [data for data in dataset_columns if not data.startswith('_')]
            return dataset_columns
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to get CSV columns: {e}")
            raise

    def create_from_csv(self, csv_path, dataset_name, schema_mapping):
        list_dataset = self.list_datasets()
        if dataset_name in list_dataset:
            raise ValueError(f"Dataset name {dataset_name} already exists. Please enter a unique dataset name")

        #### Get presigned URL
        def get_presignedUrl():
            headers = {
                "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}",
                "X-Project-Id": str(self.project_id),
            }
            try:
                response = requests.get(
                    f"{Dataset.BASE_URL}/v2/llm/dataset/csv/presigned-url",
                    headers=headers,
                    timeout=Dataset.TIMEOUT,
                )
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                logger.error(f"Failed to get presigned URL: {e}")
                raise

        try:
            presignedUrl = get_presignedUrl()
            if presignedUrl['success']:
                url = presignedUrl['data']['presignedUrl']
                filename = presignedUrl['data']['fileName']
            else:
                raise ValueError('Unable to fetch presignedUrl')
        except Exception as e:
            logger.error(f"Error in get_presignedUrl: {e}")
            raise

        #### Put CSV to presigned URL
        def put_csv_to_presignedUrl(url):
            headers = {
                'Content-Type': 'text/csv',
                'x-ms-blob-type': 'BlockBlob',
            }
            try:
                with open(csv_path, 'rb') as file:
                    response = requests.put(
                        url,
                        headers=headers,
                        data=file,
                        timeout=Dataset.TIMEOUT,
                    )
                    response.raise_for_status()
                    return response
            except requests.exceptions.RequestException as e:
                logger.error(f"Failed to put CSV to presigned URL: {e}")
                raise

        try:
            put_csv_response = put_csv_to_presignedUrl(url)
            if put_csv_response.status_code not in (200, 201):
                raise ValueError('Unable to put csv to the presignedUrl')
        except Exception as e:
            logger.error(f"Error in put_csv_to_presignedUrl: {e}")
            raise

        ## Upload CSV to elastic
        def upload_csv_to_elastic(data):
            header = {
                'Content-Type': 'application/json',
                'Authorization': f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}",
                "X-Project-Id": str(self.project_id)
            }
            try:
                response = requests.post(
                    f"{Dataset.BASE_URL}/v2/llm/dataset/csv",
                    headers=header,
                    json=data,
                    timeout=Dataset.TIMEOUT,
                )
                if response.status_code == 400:
                    raise ValueError(response.json()["message"])
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                logger.error(f"Failed to upload CSV to elastic: {e}")
                raise

        def generate_schema(mapping):
            result = {}
            for column, schema_element in mapping.items():
                result[column] = {"columnType": schema_element}
            return result

        try:
            schema_mapping = generate_schema(schema_mapping)
            data = {
                "projectId": str(self.project_id),
                "datasetName": dataset_name,
                "fileName": filename,
                "schemaMapping": schema_mapping,
                "opType": "insert",
                "description": ""
            }
            upload_csv_response = upload_csv_to_elastic(data)
            if not upload_csv_response['success']:
                raise ValueError('Unable to upload csv')
            else:
                print(upload_csv_response['message'])
                self.jobId = upload_csv_response['data']['jobId']
        except Exception as e:
            logger.error(f"Error in create_from_csv: {e}")
            raise

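    # Example: schema_mapping maps each CSV column to a schema element name as
    # returned by get_schema_mapping(). The column and element names below are
    # hypothetical:
    #
    #     mapping = {"question": "prompt", "answer": "response"}
    #     dataset_manager.create_from_csv("eval.csv", "eval-set-v1", mapping)
    #     dataset_manager.get_status()  # the upload runs as an asynchronous job
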
    def add_rows(self, csv_path, dataset_name):
        """
        Add rows to an existing dataset from a CSV file.

        Args:
            csv_path (str): Path to the CSV file to be added
            dataset_name (str): Name of the existing dataset to add rows to

        Raises:
            ValueError: If the dataset does not exist or the CSV cannot be read
        """
        # Get existing dataset columns
        existing_columns = self.get_dataset_columns(dataset_name)

        # Read the CSV file to check columns
        try:
            df = pd.read_csv(csv_path)
            csv_columns = df.columns.tolist()
        except Exception as e:
            logger.error(f"Failed to read CSV file: {e}")
            raise ValueError(f"Unable to read CSV file: {e}")

        # Align columns with the existing dataset: any dataset column missing
        # from the CSV is added as an empty column
        for column in existing_columns:
            if column not in csv_columns:
                df[column] = None

        # Upload the aligned DataFrame (written to the temp directory) instead
        # of the raw file, so the filled-in columns are actually sent
        aligned_csv_path = os.path.join(tempfile.gettempdir(), f"{dataset_name}_aligned.csv")
        df.to_csv(aligned_csv_path, index=False)
        csv_path = aligned_csv_path

        # Get presigned URL for the CSV
        def get_presignedUrl():
            headers = {
                "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}",
                "X-Project-Id": str(self.project_id),
            }
            try:
                response = requests.get(
                    f"{Dataset.BASE_URL}/v2/llm/dataset/csv/presigned-url",
                    headers=headers,
                    timeout=Dataset.TIMEOUT,
                )
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                logger.error(f"Failed to get presigned URL: {e}")
                raise

        try:
            presignedUrl = get_presignedUrl()
            if presignedUrl['success']:
                url = presignedUrl['data']['presignedUrl']
                filename = presignedUrl['data']['fileName']
            else:
                raise ValueError('Unable to fetch presignedUrl')
        except Exception as e:
            logger.error(f"Error in get_presignedUrl: {e}")
            raise

        # Upload CSV to presigned URL
        def put_csv_to_presignedUrl(url):
            headers = {
                'Content-Type': 'text/csv',
                'x-ms-blob-type': 'BlockBlob',
            }
            try:
                with open(csv_path, 'rb') as file:
                    response = requests.put(
                        url,
                        headers=headers,
                        data=file,
                        timeout=Dataset.TIMEOUT,
                    )
                    response.raise_for_status()
                    return response
            except requests.exceptions.RequestException as e:
                logger.error(f"Failed to put CSV to presigned URL: {e}")
                raise

        try:
            put_csv_response = put_csv_to_presignedUrl(url)
            if put_csv_response.status_code not in (200, 201):
                raise ValueError('Unable to put csv to the presignedUrl')
        except Exception as e:
            logger.error(f"Error in put_csv_to_presignedUrl: {e}")
            raise

        # Reuse the schema mapping of the existing dataset
        def generate_schema_mapping(dataset_name):
            headers = {
                'Content-Type': 'application/json',
                "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}",
                "X-Project-Id": str(self.project_id),
            }
            json_data = {
                "size": 12,
                "page": "0",
                "projectId": str(self.project_id),
                "search": ""
            }
            try:
                # First get the dataset ID
                response = requests.post(
                    f"{Dataset.BASE_URL}/v2/llm/dataset",
                    headers=headers,
                    json=json_data,
                    timeout=Dataset.TIMEOUT,
                )
                response.raise_for_status()
                datasets = response.json()["data"]["content"]
                dataset_id = [dataset["id"] for dataset in datasets if dataset["name"] == dataset_name][0]

                # Get dataset details to extract the schema mapping
                response = requests.get(
                    f"{Dataset.BASE_URL}/v2/llm/dataset/{dataset_id}?initialCols=0",
                    headers=headers,
                    timeout=Dataset.TIMEOUT,
                )
                response.raise_for_status()

                # Extract schema mapping
                schema_mapping = {}
                for col in response.json()["data"]["datasetColumnsResponses"]:
                    schema_mapping[col["displayName"]] = {"columnType": col["columnType"]}

                return schema_mapping
            except requests.exceptions.RequestException as e:
                logger.error(f"Failed to get schema mapping: {e}")
                raise

        # Upload CSV to elastic
        try:
            schema_mapping = generate_schema_mapping(dataset_name)

            data = {
                "projectId": str(self.project_id),
                "datasetName": dataset_name,
                "fileName": filename,
                "schemaMapping": schema_mapping,
                "opType": "update",  # Use update for adding rows
                "description": "Adding new rows to dataset"
            }

            headers = {
                'Content-Type': 'application/json',
                'Authorization': f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}",
                "X-Project-Id": str(self.project_id)
            }

            response = requests.post(
                f"{Dataset.BASE_URL}/v2/llm/dataset/csv",
                headers=headers,
                json=data,
                timeout=Dataset.TIMEOUT,
            )

            if response.status_code == 400:
                raise ValueError(response.json().get("message", "Failed to add rows"))

            response.raise_for_status()

            # Check response
            response_data = response.json()
            if response_data.get('success', False):
                print(f"{response_data['message']}")
                self.jobId = response_data['data']['jobId']
            else:
                raise ValueError(response_data.get('message', 'Failed to add rows'))

        except Exception as e:
            logger.error(f"Error in add_rows: {e}")
            raise

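    # Example (hypothetical names). The CSV may omit dataset columns; they are
    # filled with empty values before upload:
    #
    #     dataset_manager.add_rows("more_rows.csv", "eval-set-v1")
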
    def add_columns(self, text_fields, dataset_name, column_name, provider, model, variables=None):
        """
        Add a column to a dataset, populated by a model using dynamically fetched model parameters.

        Args:
            text_fields (list): List of dicts, each with 'role' and 'content' keys, forming the prompt template
            dataset_name (str): Name of the dataset to add the column to
            column_name (str): Name of the new column
            provider (str): Name of the model provider
            model (str): Name of the model
            variables (dict, optional): Prompt template variables
        """
        # Avoid a mutable default argument
        if variables is None:
            variables = {}

        # Validate text_fields input
        if not isinstance(text_fields, list):
            raise ValueError("text_fields must be a list of dictionaries")

        for field in text_fields:
            if not isinstance(field, dict) or 'role' not in field or 'content' not in field:
                raise ValueError("Each text field must be a dictionary with 'role' and 'content' keys")

        # First, get the dataset ID
        headers = {
            'Content-Type': 'application/json',
            "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}",
            "X-Project-Id": str(self.project_id),
        }
        json_data = {"size": 12, "page": "0", "projectId": str(self.project_id), "search": ""}

        try:
            # Get dataset list
            response = requests.post(
                f"{Dataset.BASE_URL}/v2/llm/dataset",
                headers=headers,
                json=json_data,
                timeout=Dataset.TIMEOUT,
            )
            response.raise_for_status()
            datasets = response.json()["data"]["content"]

            # Find dataset ID
            dataset_id = next((dataset["id"] for dataset in datasets if dataset["name"] == dataset_name), None)

            if dataset_id is None:
                raise ValueError(f"Dataset {dataset_name} not found")

            # Fetch model parameters for the provider/model pair
            parameters_url = f"{Dataset.BASE_URL}/playground/providers/models/parameters/list"
            parameters_payload = {
                "providerName": provider,
                "modelName": model
            }
            params_response = requests.post(
                parameters_url,
                headers=headers,
                json=parameters_payload,
                timeout=Dataset.TIMEOUT,
            )
            params_response.raise_for_status()

            # Extract parameters
            all_parameters = params_response.json().get('data', [])

            # Filter and transform parameters for the add-column API
            formatted_parameters = []
            for param in all_parameters:
                value = param.get('value')
                param_type = param.get('type')

                if value is None:
                    formatted_param = {
                        "name": param.get('name'),
                        "value": None,  # Pass None if the value is null
                        "type": param_type
                    }
                else:
                    # Coerce the value to its declared parameter type
                    if param_type == "float":
                        value = float(value)
                    elif param_type == "int":
                        value = int(value)
                    elif param_type == "bool":
                        value = bool(value)
                    elif param_type == "string":
                        value = str(value)
                    else:
                        raise ValueError(f"Unsupported parameter type: {param_type}")

                    formatted_param = {
                        "name": param.get('name'),
                        "value": value,
                        "type": param_type
                    }
                formatted_parameters.append(formatted_param)

            # Prepare payload for the add-column API
            add_column_payload = {
                "rowFilterList": [],
                "columnName": column_name,
                "datasetId": dataset_id,
                "variables": variables,
                "promptTemplate": {
                    "textFields": text_fields,
                    "modelSpecs": {
                        "model": f"{provider}/{model}",
                        "parameters": formatted_parameters
                    }
                }
            }
            if variables:
                add_column_payload["promptTemplate"]["variableSpecs"] = [
                    {"name": key, "type": "string", "schema": "query"}
                    for key in variables
                ]

            # Make API call to add the column
            add_column_url = f"{Dataset.BASE_URL}/v2/llm/dataset/add-column"
            response = requests.post(
                add_column_url,
                headers={
                    'Content-Type': 'application/json',
                    'Authorization': f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}",
                    "X-Project-Id": str(self.project_id)
                },
                json=add_column_payload,
                timeout=Dataset.TIMEOUT,
            )

            # Check response
            response.raise_for_status()
            response_data = response.json()

            if response_data.get('success', False):
                print(f"Column '{column_name}' added successfully to dataset '{dataset_name}'")
                self.jobId = response_data['data']['jobId']
            else:
                raise ValueError(response_data.get('message', 'Failed to add column'))

        except requests.exceptions.RequestException as e:
            logger.error(f"Error adding column: {e}")
            raise

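    # Example (provider, model, and column names are illustrative; the exact
    # prompt-variable syntax expected by the backend is assumed here):
    #
    #     dataset_manager.add_columns(
    #         text_fields=[{"role": "user", "content": "Answer: {{question}}"}],
    #         dataset_name="eval-set-v1",
    #         column_name="model_answer",
    #         provider="openai",
    #         model="gpt-4o-mini",
    #         variables={"question": "question"},
    #     )
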
    def get_status(self):
        headers = {
            'Content-Type': 'application/json',
            "Authorization": f"Bearer {os.getenv('RAGAAI_CATALYST_TOKEN')}",
            'X-Project-Id': str(self.project_id),
        }
        try:
            response = requests.get(
                f'{Dataset.BASE_URL}/job/status',
                headers=headers,
                timeout=Dataset.TIMEOUT,
            )
            response.raise_for_status()
            if response.json()["success"]:
                status_json = [
                    item["status"] for item in response.json()["data"]["content"]
                    if item["id"] == self.jobId
                ][0]
                if status_json == "Failed":
                    print("Job failed. No results to fetch.")
                    return JOB_STATUS_FAILED
                elif status_json == "In Progress":
                    print(f"Job in progress. Please wait while the job completes.\nVisit Job Status: {Dataset.BASE_URL.removesuffix('/api')}/projects/job-status?projectId={self.project_id} to track")
                    return JOB_STATUS_IN_PROGRESS
                elif status_json == "Completed":
                    print(f"Job completed. Fetching results.\nVisit Job Status: {Dataset.BASE_URL.removesuffix('/api')}/projects/job-status?projectId={self.project_id} to check")
                    return JOB_STATUS_COMPLETED
                else:
                    logger.error(f"Unknown status received: {status_json}")
                    return JOB_STATUS_FAILED
            else:
                logger.error("Request was not successful")
                return JOB_STATUS_FAILED
        except requests.exceptions.HTTPError as http_err:
            logger.error(f"HTTP error occurred: {http_err}")
            return JOB_STATUS_FAILED
        except requests.exceptions.ConnectionError as conn_err:
            logger.error(f"Connection error occurred: {conn_err}")
            return JOB_STATUS_FAILED
        except requests.exceptions.Timeout as timeout_err:
            logger.error(f"Timeout error occurred: {timeout_err}")
            return JOB_STATUS_FAILED
        except requests.exceptions.RequestException as req_err:
            logger.error(f"An error occurred: {req_err}")
            return JOB_STATUS_FAILED
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}")
            return JOB_STATUS_FAILED

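    # Example: jobs started by create_from_csv/add_rows/add_columns run
    # asynchronously, so callers can poll until completion (a simple sketch):
    #
    #     import time
    #     while dataset_manager.get_status() == JOB_STATUS_IN_PROGRESS:
    #         time.sleep(10)
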
    def _jsonl_to_csv(self, jsonl_file, csv_file):
        """Convert a JSONL file to a CSV file."""
        with open(jsonl_file, 'r', encoding='utf-8') as infile:
            # Skip blank lines so a trailing newline does not break json.loads
            data = [json.loads(line) for line in infile if line.strip()]

        if not data:
            print("Empty JSONL file.")
            return

        with open(csv_file, 'w', newline='', encoding='utf-8') as outfile:
            # Field names are taken from the first record
            writer = csv.DictWriter(outfile, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)

        print(f"Converted {jsonl_file} to {csv_file}")

    def create_from_jsonl(self, jsonl_path, dataset_name, schema_mapping):
        tmp_csv_path = os.path.join(tempfile.gettempdir(), f"{dataset_name}.csv")
        try:
            self._jsonl_to_csv(jsonl_path, tmp_csv_path)
            self.create_from_csv(tmp_csv_path, dataset_name, schema_mapping)
        except (IOError, UnicodeError) as e:
            logger.error(f"Error converting JSONL to CSV: {e}")
            raise
        finally:
            if os.path.exists(tmp_csv_path):
                try:
                    os.remove(tmp_csv_path)
                except Exception as e:
                    logger.error(f"Error removing temporary CSV file: {e}")

    def add_rows_from_jsonl(self, jsonl_path, dataset_name):
        tmp_csv_path = os.path.join(tempfile.gettempdir(), f"{dataset_name}.csv")
        try:
            self._jsonl_to_csv(jsonl_path, tmp_csv_path)
            self.add_rows(tmp_csv_path, dataset_name)
        except (IOError, UnicodeError) as e:
            logger.error(f"Error converting JSONL to CSV: {e}")
            raise
        finally:
            if os.path.exists(tmp_csv_path):
                try:
                    os.remove(tmp_csv_path)
                except Exception as e:
                    logger.error(f"Error removing temporary CSV file: {e}")

    def create_from_df(self, df, dataset_name, schema_mapping):
        tmp_csv_path = os.path.join(tempfile.gettempdir(), f"{dataset_name}.csv")
        try:
            df.to_csv(tmp_csv_path, index=False)
            self.create_from_csv(tmp_csv_path, dataset_name, schema_mapping)
        except (IOError, UnicodeError) as e:
            logger.error(f"Error converting DataFrame to CSV: {e}")
            raise
        finally:
            if os.path.exists(tmp_csv_path):
                try:
                    os.remove(tmp_csv_path)
                except Exception as e:
                    logger.error(f"Error removing temporary CSV file: {e}")

    def add_rows_from_df(self, df, dataset_name):
        tmp_csv_path = os.path.join(tempfile.gettempdir(), f"{dataset_name}.csv")
        try:
            df.to_csv(tmp_csv_path, index=False)
            self.add_rows(tmp_csv_path, dataset_name)
        except (IOError, UnicodeError) as e:
            logger.error(f"Error converting DataFrame to CSV: {e}")
            raise
        finally:
            if os.path.exists(tmp_csv_path):
                try:
                    os.remove(tmp_csv_path)
                except Exception as e:
                    logger.error(f"Error removing temporary CSV file: {e}")
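
# End-to-end usage sketch (all names hypothetical; assumes RagaAICatalyst has
# been initialized so BASE_URL and RAGAAI_CATALYST_TOKEN are set):
#
#     import pandas as pd
#     dm = Dataset(project_name="my-project")
#     df = pd.DataFrame({"question": ["What is 2+2?"], "answer": ["4"]})
#     dm.create_from_df(df, "eval-set-v1", {"question": "prompt", "answer": "response"})
#     dm.add_rows_from_df(df, "eval-set-v1")
#     dm.get_status()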