/ haystack / components / preprocessors / csv_document_cleaner.py
csv_document_cleaner.py
  1  # SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
  2  #
  3  # SPDX-License-Identifier: Apache-2.0
  4  
  5  from copy import deepcopy
  6  from io import StringIO
  7  from typing import Optional
  8  
  9  from haystack import Document, component, logging
 10  from haystack.lazy_imports import LazyImport
 11  
 12  with LazyImport("Run 'pip install pandas'") as pandas_import:
 13      import pandas as pd
 14  
 15  logger = logging.getLogger(__name__)
 16  
 17  
 18  @component
 19  class CSVDocumentCleaner:
 20      """
 21      A component for cleaning CSV documents by removing empty rows and columns.
 22  
 23      This component processes CSV content stored in Documents, allowing
 24      for the optional ignoring of a specified number of rows and columns before performing
 25      the cleaning operation. Additionally, it provides options to keep document IDs and
 26      control whether empty rows and columns should be removed.
 27      """
 28  
 29      def __init__(
 30          self,
 31          *,
 32          ignore_rows: int = 0,
 33          ignore_columns: int = 0,
 34          remove_empty_rows: bool = True,
 35          remove_empty_columns: bool = True,
 36          keep_id: bool = False,
 37      ) -> None:
 38          """
 39          Initializes the CSVDocumentCleaner component.
 40  
 41          :param ignore_rows: Number of rows to ignore from the top of the CSV table before processing.
 42          :param ignore_columns: Number of columns to ignore from the left of the CSV table before processing.
 43          :param remove_empty_rows: Whether to remove rows that are entirely empty.
 44          :param remove_empty_columns: Whether to remove columns that are entirely empty.
 45          :param keep_id: Whether to retain the original document ID in the output document.
 46  
 47          Rows and columns ignored using these parameters are preserved in the final output, meaning
 48          they are not considered when removing empty rows and columns.
 49          """
 50          self.ignore_rows = ignore_rows
 51          self.ignore_columns = ignore_columns
 52          self.remove_empty_rows = remove_empty_rows
 53          self.remove_empty_columns = remove_empty_columns
 54          self.keep_id = keep_id
 55          pandas_import.check()
 56  
 57      @component.output_types(documents=list[Document])
 58      def run(self, documents: list[Document]) -> dict[str, list[Document]]:
 59          """
 60          Cleans CSV documents by removing empty rows and columns while preserving specified ignored rows and columns.
 61  
 62          :param documents: List of Documents containing CSV-formatted content.
 63          :return: A dictionary with a list of cleaned Documents under the key "documents".
 64  
 65          Processing steps:
 66          1. Reads each document's content as a CSV table.
 67          2. Retains the specified number of `ignore_rows` from the top and `ignore_columns` from the left.
 68          3. Drops any rows and columns that are entirely empty (if enabled by `remove_empty_rows` and
 69              `remove_empty_columns`).
 70          4. Reattaches the ignored rows and columns to maintain their original positions.
 71          5. Returns the cleaned CSV content as a new `Document` object, with an option to retain the original
 72              document ID.
 73          """
 74          if len(documents) == 0:
 75              return {"documents": []}
 76  
 77          ignore_rows = self.ignore_rows
 78          ignore_columns = self.ignore_columns
 79  
 80          cleaned_documents = []
 81          for document in documents:
 82              try:
 83                  df = pd.read_csv(StringIO(document.content), header=None, dtype=object)
 84              except Exception as e:
 85                  logger.exception(
 86                      "Error processing document {id}. Keeping it, but skipping cleaning. Error: {error}",
 87                      id=document.id,
 88                      error=e,
 89                  )
 90                  cleaned_documents.append(document)
 91                  continue
 92  
 93              if ignore_rows > df.shape[0] or ignore_columns > df.shape[1]:
 94                  logger.warning(
 95                      "Document {id} has fewer rows {df_rows} or columns {df_cols} "
 96                      "than the number of rows {rows} or columns {cols} to ignore. "
 97                      "Keeping the entire document.",
 98                      id=document.id,
 99                      df_rows=df.shape[0],
100                      df_cols=df.shape[1],
101                      rows=ignore_rows,
102                      cols=ignore_columns,
103                  )
104                  cleaned_documents.append(document)
105                  continue
106  
107              final_df = self._clean_df(df=df, ignore_rows=ignore_rows, ignore_columns=ignore_columns)
108  
109              clean_doc = Document(
110                  id=document.id if self.keep_id else "",
111                  content=final_df.to_csv(index=False, header=False, lineterminator="\n"),
112                  blob=document.blob,
113                  meta=deepcopy(document.meta),
114                  score=document.score,
115                  embedding=document.embedding,
116                  sparse_embedding=document.sparse_embedding,
117              )
118              cleaned_documents.append(clean_doc)
119          return {"documents": cleaned_documents}
120  
121      def _clean_df(self, df: "pd.DataFrame", ignore_rows: int, ignore_columns: int) -> "pd.DataFrame":
122          """
123          Cleans a DataFrame by removing empty rows and columns while preserving ignored sections.
124  
125          :param df: The input DataFrame representing the CSV data.
126          :param ignore_rows: Number of top rows to ignore.
127          :param ignore_columns: Number of left columns to ignore.
128          """
129          # Get ignored rows and columns
130          ignored_rows = self._get_ignored_rows(df=df, ignore_rows=ignore_rows)
131          ignored_columns = self._get_ignored_columns(df=df, ignore_columns=ignore_columns)
132          final_df = df.iloc[ignore_rows:, ignore_columns:]
133  
134          # Drop rows that are entirely empty
135          if self.remove_empty_rows:
136              final_df = final_df.dropna(axis=0, how="all")
137  
138          # Drop columns that are entirely empty
139          if self.remove_empty_columns:
140              final_df = final_df.dropna(axis=1, how="all")
141  
142          # Reattach ignored rows
143          if ignore_rows > 0 and ignored_rows is not None:
144              # Keep only relevant columns
145              ignored_rows = ignored_rows.loc[:, final_df.columns]
146              final_df = pd.concat([ignored_rows, final_df], axis=0)
147  
148          # Reattach ignored columns
149          if ignore_columns > 0 and ignored_columns is not None:
150              # Keep only relevant rows
151              ignored_columns = ignored_columns.loc[final_df.index, :]
152              final_df = pd.concat([ignored_columns, final_df], axis=1)
153  
154          return final_df
155  
156      @staticmethod
157      def _get_ignored_rows(df: "pd.DataFrame", ignore_rows: int) -> Optional["pd.DataFrame"]:
158          """
159          Extracts the rows to be ignored from the DataFrame.
160  
161          :param df: The input DataFrame.
162          :param ignore_rows: Number of rows to extract from the top.
163          """
164          if ignore_rows > 0:
165              return df.iloc[:ignore_rows, :]
166          return None
167  
168      @staticmethod
169      def _get_ignored_columns(df: "pd.DataFrame", ignore_columns: int) -> Optional["pd.DataFrame"]:
170          """
171          Extracts the columns to be ignored from the DataFrame.
172  
173          :param df: The input DataFrame.
174          :param ignore_columns: Number of columns to extract from the left.
175          """
176          if ignore_columns > 0:
177              return df.iloc[:, :ignore_columns]
178          return None