csv_document_cleaner.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

from copy import deepcopy
from io import StringIO
from typing import Optional

from haystack import Document, component, logging
from haystack.lazy_imports import LazyImport

with LazyImport("Run 'pip install pandas'") as pandas_import:
    import pandas as pd

logger = logging.getLogger(__name__)


@component
class CSVDocumentCleaner:
    """
    A component for cleaning CSV documents by removing empty rows and columns.

    This component processes CSV content stored in Documents, allowing
    for the optional ignoring of a specified number of rows and columns before performing
    the cleaning operation. Additionally, it provides options to keep document IDs and
    control whether empty rows and columns should be removed.
    """

    def __init__(
        self,
        *,
        ignore_rows: int = 0,
        ignore_columns: int = 0,
        remove_empty_rows: bool = True,
        remove_empty_columns: bool = True,
        keep_id: bool = False,
    ) -> None:
        """
        Initializes the CSVDocumentCleaner component.

        Rows and columns ignored using `ignore_rows` and `ignore_columns` are preserved in the final
        output, meaning they are not considered when removing empty rows and columns.

        :param ignore_rows: Number of rows to ignore from the top of the CSV table before processing.
            Must be non-negative.
        :param ignore_columns: Number of columns to ignore from the left of the CSV table before processing.
            Must be non-negative.
        :param remove_empty_rows: Whether to remove rows that are entirely empty.
        :param remove_empty_columns: Whether to remove columns that are entirely empty.
        :param keep_id: Whether to retain the original document ID in the output document.
        :raises ValueError: If `ignore_rows` or `ignore_columns` is negative.
        """
        # A negative count would be interpreted by `DataFrame.iloc` as "keep only the last N
        # rows/columns", silently discarding the rest of the table, so reject it up front.
        if ignore_rows < 0 or ignore_columns < 0:
            raise ValueError(
                "ignore_rows and ignore_columns must be non-negative, "
                f"got ignore_rows={ignore_rows} and ignore_columns={ignore_columns}."
            )

        self.ignore_rows = ignore_rows
        self.ignore_columns = ignore_columns
        self.remove_empty_rows = remove_empty_rows
        self.remove_empty_columns = remove_empty_columns
        self.keep_id = keep_id
        pandas_import.check()

    @component.output_types(documents=list[Document])
    def run(self, documents: list[Document]) -> dict[str, list[Document]]:
        """
        Cleans CSV documents by removing empty rows and columns while preserving specified ignored rows and columns.

        :param documents: List of Documents containing CSV-formatted content.
        :return: A dictionary with a list of cleaned Documents under the key "documents".

        Processing steps:
        1. Reads each document's content as a CSV table.
        2. Retains the specified number of `ignore_rows` from the top and `ignore_columns` from the left.
        3. Drops any rows and columns that are entirely empty (if enabled by `remove_empty_rows` and
            `remove_empty_columns`).
        4. Reattaches the ignored rows and columns to maintain their original positions.
        5. Returns the cleaned CSV content as a new `Document` object, with an option to retain the original
            document ID.

        A document whose content cannot be parsed as CSV, or whose table has fewer rows or columns
        than the configured number to ignore, is logged and passed through unchanged.
        """
        if not documents:
            return {"documents": []}

        ignore_rows = self.ignore_rows
        ignore_columns = self.ignore_columns

        cleaned_documents = []
        for document in documents:
            try:
                # header=None keeps the first line as data; dtype=object avoids numeric coercion
                # so cell values are written back the way they were read.
                df = pd.read_csv(StringIO(document.content), header=None, dtype=object)
            except Exception as e:
                # Best effort: an unparsable document is kept as-is rather than dropped.
                logger.exception(
                    "Error processing document {id}. Keeping it, but skipping cleaning. Error: {error}",
                    id=document.id,
                    error=e,
                )
                cleaned_documents.append(document)
                continue

            if ignore_rows > df.shape[0] or ignore_columns > df.shape[1]:
                logger.warning(
                    "Document {id} has fewer rows {df_rows} or columns {df_cols} "
                    "than the number of rows {rows} or columns {cols} to ignore. "
                    "Keeping the entire document.",
                    id=document.id,
                    df_rows=df.shape[0],
                    df_cols=df.shape[1],
                    rows=ignore_rows,
                    cols=ignore_columns,
                )
                cleaned_documents.append(document)
                continue

            final_df = self._clean_df(df=df, ignore_rows=ignore_rows, ignore_columns=ignore_columns)

            clean_doc = Document(
                # An empty id is passed when the original id should not be retained.
                id=document.id if self.keep_id else "",
                content=final_df.to_csv(index=False, header=False, lineterminator="\n"),
                blob=document.blob,
                # Deep copy so the cleaned document does not share its meta dict with the input.
                meta=deepcopy(document.meta),
                score=document.score,
                embedding=document.embedding,
                sparse_embedding=document.sparse_embedding,
            )
            cleaned_documents.append(clean_doc)
        return {"documents": cleaned_documents}

    def _clean_df(self, df: "pd.DataFrame", ignore_rows: int, ignore_columns: int) -> "pd.DataFrame":
        """
        Cleans a DataFrame by removing empty rows and columns while preserving ignored sections.

        :param df: The input DataFrame representing the CSV data.
        :param ignore_rows: Number of top rows to ignore.
        :param ignore_columns: Number of left columns to ignore.
        :return: The cleaned DataFrame with the ignored rows and columns reattached.
        """
        # Get ignored rows and columns (None when the corresponding count is not positive)
        ignored_rows = self._get_ignored_rows(df=df, ignore_rows=ignore_rows)
        ignored_columns = self._get_ignored_columns(df=df, ignore_columns=ignore_columns)
        final_df = df.iloc[ignore_rows:, ignore_columns:]

        # Drop rows that are entirely empty
        if self.remove_empty_rows:
            final_df = final_df.dropna(axis=0, how="all")

        # Drop columns that are entirely empty
        if self.remove_empty_columns:
            final_df = final_df.dropna(axis=1, how="all")

        # Reattach ignored rows
        if ignored_rows is not None:
            # Keep only the columns that survived cleaning; the ignored columns of these rows
            # come back through `ignored_columns` below.
            ignored_rows = ignored_rows.loc[:, final_df.columns]
            final_df = pd.concat([ignored_rows, final_df], axis=0)

        # Reattach ignored columns
        if ignored_columns is not None:
            # Keep only the rows that survived cleaning (plus the reattached ignored rows)
            ignored_columns = ignored_columns.loc[final_df.index, :]
            final_df = pd.concat([ignored_columns, final_df], axis=1)

        return final_df

    @staticmethod
    def _get_ignored_rows(df: "pd.DataFrame", ignore_rows: int) -> Optional["pd.DataFrame"]:
        """
        Extracts the rows to be ignored from the DataFrame.

        :param df: The input DataFrame.
        :param ignore_rows: Number of rows to extract from the top.
        :return: The first `ignore_rows` rows, or None if `ignore_rows` is not positive.
        """
        if ignore_rows > 0:
            return df.iloc[:ignore_rows, :]
        return None

    @staticmethod
    def _get_ignored_columns(df: "pd.DataFrame", ignore_columns: int) -> Optional["pd.DataFrame"]:
        """
        Extracts the columns to be ignored from the DataFrame.

        :param df: The input DataFrame.
        :param ignore_columns: Number of columns to extract from the left.
        :return: The first `ignore_columns` columns, or None if `ignore_columns` is not positive.
        """
        if ignore_columns > 0:
            return df.iloc[:, :ignore_columns]
        return None