document_cleaner.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import re
from collections.abc import Generator
from copy import deepcopy
from functools import partial, reduce
from itertools import chain
from typing import Literal
from unicodedata import normalize

from haystack import Document, component, logging

logger = logging.getLogger(__name__)


@component
class DocumentCleaner:
    """
    Cleans the text in the documents.

    It removes extra whitespaces, empty lines, specified substrings, regexes,
    and repeated page headers and footers (in this order).

    ### Usage example:

    ```python
    from haystack import Document
    from haystack.components.preprocessors import DocumentCleaner

    doc = Document(content="This is a document to clean\\n\\n\\nsubstring to remove")

    cleaner = DocumentCleaner(remove_substrings=["substring to remove"])
    result = cleaner.run(documents=[doc])

    assert result["documents"][0].content == "This is a document to clean "
    ```
    """

    def __init__(
        self,
        remove_empty_lines: bool = True,
        remove_extra_whitespaces: bool = True,
        remove_repeated_substrings: bool = False,
        keep_id: bool = False,
        remove_substrings: list[str] | None = None,
        remove_regex: str | None = None,
        unicode_normalization: Literal["NFC", "NFKC", "NFD", "NFKD"] | None = None,
        ascii_only: bool = False,
        strip_whitespaces: bool = False,
        replace_regexes: dict[str, str] | None = None,
    ) -> None:
        """
        Initialize DocumentCleaner.

        :param remove_empty_lines: If `True`, removes empty lines.
        :param remove_extra_whitespaces: If `True`, removes extra whitespaces.
        :param remove_repeated_substrings: If `True`, removes repeated substrings (headers and footers) from pages.
            Pages must be separated by a form feed character "\\f",
            which is supported by `TextFileToDocument` and `AzureOCRDocumentConverter`.
        :param keep_id: If `True`, keeps the IDs of the original documents.
        :param remove_substrings: List of substrings to remove from the text.
        :param remove_regex: Regex to match and replace substrings with "".
        :param unicode_normalization: Unicode normalization form to apply to the text.
            Note: This runs before any other steps.
        :param ascii_only: Whether to convert the text to ASCII only.
            Removes accents from characters and replaces them with ASCII characters.
            Other non-ASCII characters are removed.
            Note: This runs before any pattern matching or removal.
        :param strip_whitespaces: If `True`, removes leading and trailing whitespace from the document content
            using Python's `str.strip()`. Unlike `remove_extra_whitespaces`, this only affects the beginning
            and end of the text, preserving internal whitespace (useful for markdown formatting).
        :param replace_regexes: A dictionary mapping regex patterns to their replacement strings.
            For example, `{r'\\n\\n+': '\\n'}` replaces multiple consecutive newlines with a single newline.
            This is applied after `remove_regex` and allows custom replacements instead of just removal.
78 """ 79 80 self._validate_params(unicode_normalization=unicode_normalization) 81 82 self.remove_empty_lines = remove_empty_lines 83 self.remove_extra_whitespaces = remove_extra_whitespaces 84 self.remove_repeated_substrings = remove_repeated_substrings 85 self.remove_substrings = remove_substrings 86 self.remove_regex = remove_regex 87 self.keep_id = keep_id 88 self.unicode_normalization = unicode_normalization 89 self.ascii_only = ascii_only 90 self.strip_whitespaces = strip_whitespaces 91 self.replace_regexes = replace_regexes 92 93 def _validate_params(self, unicode_normalization: str | None) -> None: 94 """ 95 Validate the parameters of the DocumentCleaner. 96 97 :param unicode_normalization: Unicode normalization form to apply to the text. 98 :raises ValueError: if the parameters are not valid. 99 """ 100 if unicode_normalization and unicode_normalization not in ["NFC", "NFKC", "NFD", "NFKD"]: 101 raise ValueError("unicode_normalization must be one of 'NFC', 'NFKC', 'NFD', 'NFKD'.") 102 103 @component.output_types(documents=list[Document]) 104 def run(self, documents: list[Document]) -> dict[str, list[Document]]: 105 """ 106 Cleans up the documents. 107 108 :param documents: List of Documents to clean. 109 110 :returns: A dictionary with the following key: 111 - `documents`: List of cleaned Documents. 112 113 :raises TypeError: if documents is not a list of Documents. 114 """ 115 if not isinstance(documents, list) or documents and not isinstance(documents[0], Document): 116 raise TypeError("DocumentCleaner expects a List of Documents as input.") 117 118 cleaned_docs = [] 119 for doc in documents: 120 if doc.content is None: 121 logger.warning( 122 "DocumentCleaner only cleans text documents but document.content for document ID" 123 " {document_id} is None.", 124 document_id=doc.id, 125 ) 126 cleaned_docs.append(doc) 127 continue 128 text = doc.content 129 130 if self.unicode_normalization: 131 text = self._normalize_unicode(text, self.unicode_normalization) 132 if self.ascii_only: 133 text = self._ascii_only(text) 134 if self.remove_extra_whitespaces: 135 text = self._remove_extra_whitespaces(text) 136 if self.remove_empty_lines: 137 text = self._remove_empty_lines(text) 138 if self.remove_substrings: 139 text = self._remove_substrings(text, self.remove_substrings) 140 if self.remove_regex: 141 text = self._remove_regex(text, self.remove_regex) 142 if self.replace_regexes: 143 text = self._replace_regexes(text, self.replace_regexes) 144 if self.remove_repeated_substrings: 145 text = self._remove_repeated_substrings(text) 146 if self.strip_whitespaces: 147 text = text.strip() 148 149 clean_doc = Document( 150 id=doc.id if self.keep_id else "", 151 content=text, 152 blob=doc.blob, 153 meta=deepcopy(doc.meta), 154 score=doc.score, 155 embedding=doc.embedding, 156 sparse_embedding=doc.sparse_embedding, 157 ) 158 cleaned_docs.append(clean_doc) 159 160 return {"documents": cleaned_docs} 161 162 def _normalize_unicode(self, text: str, form: Literal["NFC", "NFKC", "NFD", "NFKD"]) -> str: 163 """ 164 Normalize the unicode of the text. 165 166 :param text: Text to normalize. 167 :param form: Unicode normalization form to apply to the text. 168 Options: "NFC", "NFKC", "NFD", "NFKD". 169 :returns: The normalized text. 170 """ 171 return normalize(form, text) 172 173 def _ascii_only(self, text: str) -> str: 174 """ 175 Convert the text to ASCII only. 176 177 Will remove accents from characters and replace them with ASCII characters. 178 Other non-ASCII characters will be removed. 

        :param text: Text to convert to ASCII only.
        :returns: The text in ASCII only.
        """

        # First normalize the text to NFKD to separate base characters from their diacritics,
        # then encode to ASCII, dropping any characters that can't be encoded.
        return self._normalize_unicode(text, "NFKD").encode("ascii", "ignore").decode("utf-8")

    def _remove_empty_lines(self, text: str) -> str:
        """
        Remove empty lines and lines that contain nothing but whitespace from text.

        :param text: Text to clean.
        :returns: The text without empty lines.
        """
        pages = text.split("\f")
        cleaned_pages = ["\n".join(line for line in page.split("\n") if line.strip()) for page in pages]
        return "\f".join(cleaned_pages)

    def _remove_extra_whitespaces(self, text: str) -> str:
        """
        Remove extra whitespaces from text.

        :param text: Text to clean.
        :returns: The text without extra whitespaces.
        """
        pages = text.split("\f")
        cleaned_pages = [re.sub(r"\s\s+", " ", page).strip() for page in pages]
        return "\f".join(cleaned_pages)

    def _remove_regex(self, text: str, regex: str) -> str:
        """
        Remove substrings that match the specified regex from the text.

        :param text: Text to clean.
        :param regex: Regex to match and replace substrings with "".
        :returns: The text without the substrings that match the regex.
        """
        pages = text.split("\f")
        cleaned_pages = [re.sub(regex, "", page).strip() for page in pages]
        return "\f".join(cleaned_pages)

    def _replace_regexes(self, text: str, replace_regexes: dict[str, str]) -> str:
        """
        Replace substrings that match the specified regex patterns with custom replacement strings.

        :param text: Text to clean.
        :param replace_regexes: A dictionary mapping regex patterns to their replacement strings.
        :returns: The text with the regex matches replaced by the specified strings.
        """
        pages = text.split("\f")
        cleaned_pages = []
        for page in pages:
            for pattern, replacement in replace_regexes.items():
                page = re.sub(pattern, replacement, page)
            cleaned_pages.append(page)
        return "\f".join(cleaned_pages)

    def _remove_substrings(self, text: str, substrings: list[str]) -> str:
        """
        Remove all specified substrings from the text.

        :param text: Text to clean.
        :param substrings: Substrings to remove.
        :returns: The text without the specified substrings.
        """
        for substring in substrings:
            text = text.replace(substring, "")
        return text

    def _remove_repeated_substrings(self, text: str) -> str:
        """
        Remove any substrings from the text that occur repeatedly on every page, for example headers or footers.

        Pages in the text need to be separated by the form feed character "\\f".

        :param text: Text to clean.
        :returns: The text without the repeated substrings.
        """
        return self._find_and_remove_header_footer(
            text, n_chars=300, n_first_pages_to_ignore=1, n_last_pages_to_ignore=1
        )

    def _find_and_remove_header_footer(
        self, text: str, n_chars: int, n_first_pages_to_ignore: int, n_last_pages_to_ignore: int
    ) -> str:
        """
        Heuristic to find footers and headers across different pages by searching for the longest common string.

        Pages in the text need to be separated by the form feed character "\\f".
        For headers, we only search in the first n_chars characters (for footers: the last n_chars).
        Note: This heuristic uses exact matches and therefore works well for footers like "Copyright 2019 by XXX",
        but won't detect "Page 3 of 4" or similar.

        :param text: Text to clean.
        :param n_chars: The number of first/last characters in which to search for the header/footer.
        :param n_first_pages_to_ignore: The number of first pages to ignore
            (e.g. TOCs often don't contain a footer/header).
        :param n_last_pages_to_ignore: The number of last pages to ignore.
        :returns: The text without the found headers and footers.
        """

        pages = text.split("\f")

        # header
        start_of_pages = [p[:n_chars] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
        found_header = self._find_longest_common_ngram(start_of_pages)
        if found_header:
            pages = [page.replace(found_header, "") for page in pages]

        # footer
        end_of_pages = [p[-n_chars:] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
        found_footer = self._find_longest_common_ngram(end_of_pages)
        if found_footer:
            pages = [page.replace(found_footer, "") for page in pages]

        logger.debug(
            "Removed header '{header}' and footer '{footer}' in document", header=found_header, footer=found_footer
        )
        return "\f".join(pages)

    def _ngram(self, seq: str, n: int) -> Generator[str, None, None]:
        """
        Return all ngrams of length n from a text sequence. Each ngram consists of n words split by whitespace.

        :param seq: The sequence to generate ngrams from.
        :param n: The length of the ngrams to generate.
        :returns: A Generator generating all ngrams of length n from the given sequence.
        """

        # In order to maintain the original whitespace, but still consider \n and \t for n-gram tokenization,
        # we add a space here and remove it again after the ngrams have been created (see below).
        seq = seq.replace("\n", " \n")
        seq = seq.replace("\t", " \t")

        words = seq.split(" ")
        return (" ".join(words[i : i + n]).replace(" \n", "\n").replace(" \t", "\t") for i in range(len(words) - n + 1))

    def _allngram(self, seq: str, min_ngram: int, max_ngram: int) -> set[str]:
        """
        Generates all possible ngrams from a given sequence of text.

        Considers all ngram lengths between the minimum and maximum length.

        :param seq: The sequence to generate ngrams from.
        :param min_ngram: The minimum length of ngram to consider.
        :param max_ngram: The maximum length of ngram to consider.
        :returns: A set of all ngrams from the given sequence.
        """
        # Note: range() has an exclusive upper bound, so ngrams of length max_ngram itself are not generated.
        lengths = range(min_ngram, max_ngram) if max_ngram else range(min_ngram, len(seq))
        ngrams = map(partial(self._ngram, seq), lengths)
        return set(chain.from_iterable(ngrams))

    def _find_longest_common_ngram(self, sequences: list[str], min_ngram: int = 3, max_ngram: int = 30) -> str:
        """
        Find the longest common ngram across a list of text sequences (e.g. start of pages).

        Considers all ngram lengths between the minimum and maximum length. Helpful for finding footers, headers etc.
        Empty sequences are ignored.

        :param sequences: The list of strings that shall be searched for common n_grams.
        :param min_ngram: The minimum length of ngram to consider.
        :param max_ngram: The maximum length of ngram to consider.
        :returns: The longest ngram that all sequences have in common.
342 """ 343 sequences = [s for s in sequences if s] # filter empty sequences 344 if not sequences: 345 return "" 346 seqs_ngrams = map(partial(self._allngram, min_ngram=min_ngram, max_ngram=max_ngram), sequences) 347 intersection = reduce(set.intersection, seqs_ngrams) 348 349 longest = max(intersection, key=len, default="") 350 return longest if longest.strip() else ""