haystack/components/preprocessors/document_cleaner.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import re
from collections.abc import Generator
from copy import deepcopy
from functools import partial, reduce
from itertools import chain
from typing import Literal
from unicodedata import normalize

from haystack import Document, component, logging

logger = logging.getLogger(__name__)


@component
class DocumentCleaner:
    """
    Cleans the text in the documents.

    Depending on the configuration, it normalizes Unicode, converts the text to ASCII,
    removes extra whitespaces, empty lines, specified substrings, regex matches
    (with optional custom replacements), and repeated page headers and footers
    (in this order), and finally strips leading and trailing whitespace.

    ### Usage example:

    ```python
    from haystack import Document
    from haystack.components.preprocessors import DocumentCleaner

    doc = Document(content="This   is  a  document  to  clean\\n\\n\\nsubstring to remove")

    cleaner = DocumentCleaner(remove_substrings=["substring to remove"])
    result = cleaner.run(documents=[doc])

    assert result["documents"][0].content == "This is a document to clean "
    ```
    """

    def __init__(
        self,
        remove_empty_lines: bool = True,
        remove_extra_whitespaces: bool = True,
        remove_repeated_substrings: bool = False,
        keep_id: bool = False,
        remove_substrings: list[str] | None = None,
        remove_regex: str | None = None,
        unicode_normalization: Literal["NFC", "NFKC", "NFD", "NFKD"] | None = None,
        ascii_only: bool = False,
        strip_whitespaces: bool = False,
        replace_regexes: dict[str, str] | None = None,
    ) -> None:
        """
        Initialize DocumentCleaner.

        :param remove_empty_lines: If `True`, removes empty lines.
        :param remove_extra_whitespaces: If `True`, removes extra whitespaces.
        :param remove_repeated_substrings: If `True`, removes repeated substrings (headers and footers) from pages.
            Pages must be separated by a form feed character "\\f",
            which is supported by `TextFileToDocument` and `AzureOCRDocumentConverter`.
        :param keep_id: If `True`, keeps the IDs of the original documents.
        :param remove_substrings: List of substrings to remove from the text.
        :param remove_regex: Regex to match; matching substrings are replaced by "".
        :param unicode_normalization: Unicode normalization form to apply to the text.
            Note: This will run before any other steps.
        :param ascii_only: If `True`, converts the text to ASCII only.
            Removes accents from characters and replaces them with ASCII characters.
            Other non-ASCII characters are removed.
            Note: This will run before any pattern matching or removal.
        :param strip_whitespaces: If `True`, removes leading and trailing whitespace from the document content
            using Python's `str.strip()`. Unlike `remove_extra_whitespaces`, this only affects the beginning
            and end of the text, preserving internal whitespace (useful for markdown formatting).
        :param replace_regexes: A dictionary mapping regex patterns to their replacement strings.
            For example, `{r'\\n\\n+': '\\n'}` replaces multiple consecutive newlines with a single newline.
            This is applied after `remove_regex` and allows custom replacements instead of just removal.
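
        Illustrative example combining several options (the patterns shown are
        arbitrary examples, not defaults):

        >>> cleaner = DocumentCleaner(
        ...     remove_empty_lines=True,
        ...     unicode_normalization="NFKC",
        ...     replace_regexes={"\\n\\n+": "\\n"},
        ... )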
        """

        self._validate_params(unicode_normalization=unicode_normalization)

        self.remove_empty_lines = remove_empty_lines
        self.remove_extra_whitespaces = remove_extra_whitespaces
        self.remove_repeated_substrings = remove_repeated_substrings
        self.remove_substrings = remove_substrings
        self.remove_regex = remove_regex
        self.keep_id = keep_id
        self.unicode_normalization = unicode_normalization
        self.ascii_only = ascii_only
        self.strip_whitespaces = strip_whitespaces
        self.replace_regexes = replace_regexes

    def _validate_params(self, unicode_normalization: str | None) -> None:
        """
        Validate the parameters of the DocumentCleaner.

        :param unicode_normalization: Unicode normalization form to apply to the text.
        :raises ValueError: if the parameters are not valid.
        """
        if unicode_normalization and unicode_normalization not in ["NFC", "NFKC", "NFD", "NFKD"]:
            raise ValueError("unicode_normalization must be one of 'NFC', 'NFKC', 'NFD', 'NFKD'.")

    @component.output_types(documents=list[Document])
    def run(self, documents: list[Document]) -> dict[str, list[Document]]:
        """
        Cleans up the documents.

        :param documents: List of Documents to clean.

        :returns: A dictionary with the following key:
            - `documents`: List of cleaned Documents.

        :raises TypeError: if documents is not a list of Documents.
        """
        if not isinstance(documents, list) or (documents and not isinstance(documents[0], Document)):
            raise TypeError("DocumentCleaner expects a List of Documents as input.")

        cleaned_docs = []
        for doc in documents:
            if doc.content is None:
                logger.warning(
                    "DocumentCleaner only cleans text documents but document.content for document ID"
                    " {document_id} is None.",
                    document_id=doc.id,
                )
                cleaned_docs.append(doc)
                continue
            text = doc.content

            if self.unicode_normalization:
                text = self._normalize_unicode(text, self.unicode_normalization)
            if self.ascii_only:
                text = self._ascii_only(text)
            if self.remove_extra_whitespaces:
                text = self._remove_extra_whitespaces(text)
            if self.remove_empty_lines:
                text = self._remove_empty_lines(text)
            if self.remove_substrings:
                text = self._remove_substrings(text, self.remove_substrings)
            if self.remove_regex:
                text = self._remove_regex(text, self.remove_regex)
            if self.replace_regexes:
                text = self._replace_regexes(text, self.replace_regexes)
            if self.remove_repeated_substrings:
                text = self._remove_repeated_substrings(text)
            if self.strip_whitespaces:
                text = text.strip()

            clean_doc = Document(
                id=doc.id if self.keep_id else "",
                content=text,
                blob=doc.blob,
                meta=deepcopy(doc.meta),
                score=doc.score,
                embedding=doc.embedding,
                sparse_embedding=doc.sparse_embedding,
            )
            cleaned_docs.append(clean_doc)

        return {"documents": cleaned_docs}

    def _normalize_unicode(self, text: str, form: Literal["NFC", "NFKC", "NFD", "NFKD"]) -> str:
        """
        Normalize the unicode of the text.

        :param text: Text to normalize.
        :param form: Unicode normalization form to apply to the text.
            Options: "NFC", "NFKC", "NFD", "NFKD".
        :returns: The normalized text.
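
        Illustrative example: NFKC folds compatibility characters, for example
        the ligature "ﬁ" (U+FB01) becomes the two letters "fi":

        >>> DocumentCleaner()._normalize_unicode("ﬁle", "NFKC")
        'file'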
        """
        return normalize(form, text)

    def _ascii_only(self, text: str) -> str:
        """
        Convert the text to ASCII only.

        Will remove accents from characters and replace them with ASCII characters.
        Other non-ASCII characters will be removed.

        :param text: Text to convert to ASCII only.
        :returns: The text in ASCII only.
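
        Illustrative example: accented characters are reduced to their base
        letters, while characters without an ASCII equivalent are dropped:

        >>> DocumentCleaner()._ascii_only("Café")
        'Cafe'
        >>> DocumentCleaner()._ascii_only("αβγ")
        ''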
        """

        # First normalize the text to NFKD to separate the characters and their diacritics
        # Then encode it to ASCII and ignore any characters that can't be encoded
        return self._normalize_unicode(text, "NFKD").encode("ascii", "ignore").decode("utf-8")

    def _remove_empty_lines(self, text: str) -> str:
        """
        Remove empty lines and lines that contain nothing but whitespaces from text.

        :param text: Text to clean.
        :returns: The text without empty lines.
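
        Illustrative example (the middle lines are empty or whitespace-only):

        >>> DocumentCleaner()._remove_empty_lines("line one\\n\\n  \\nline two")
        'line one\\nline two'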
        """
        pages = text.split("\f")
        cleaned_pages = ["\n".join(line for line in page.split("\n") if line.strip()) for page in pages]
        return "\f".join(cleaned_pages)

    def _remove_extra_whitespaces(self, text: str) -> str:
        """
        Remove extra whitespaces from text.

        :param text: Text to clean.
        :returns: The text without extra whitespaces.
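
        Illustrative example: runs of two or more whitespace characters collapse
        to a single space and each page is stripped:

        >>> DocumentCleaner()._remove_extra_whitespaces("too   many\\t\\tspaces ")
        'too many spaces'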
        """
        pages = text.split("\f")
        cleaned_pages = [re.sub(r"\s\s+", " ", page).strip() for page in pages]
        return "\f".join(cleaned_pages)

    def _remove_regex(self, text: str, regex: str) -> str:
        """
        Remove substrings that match the specified regex from the text.

        :param text: Text to clean.
        :param regex: Regex to match; matching substrings are replaced by "".
        :returns: The text without the substrings that match the regex.
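
        Illustrative example, removing bracketed reference markers (note that
        removal can leave doubled spaces behind, since `remove_extra_whitespaces`
        runs before this step in `run`):

        >>> DocumentCleaner()._remove_regex("Hello [1] world [2]", r"\\[\\d+\\]")
        'Hello  world'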
        """
        pages = text.split("\f")
        cleaned_pages = [re.sub(regex, "", page).strip() for page in pages]
        return "\f".join(cleaned_pages)

    def _replace_regexes(self, text: str, replace_regexes: dict[str, str]) -> str:
        """
        Replace substrings that match the specified regex patterns with custom replacement strings.

        Patterns are applied in the insertion order of the dictionary.

        :param text: Text to clean.
        :param replace_regexes: A dictionary mapping regex patterns to their replacement strings.
        :returns: The text with the regex matches replaced by the specified strings.
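
        Illustrative example, collapsing blank lines and masking digits:

        >>> DocumentCleaner()._replace_regexes("a\\n\\n\\nb 123", {"\\n\\n+": "\\n", r"\\d+": "#"})
        'a\\nb #'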
        """
        pages = text.split("\f")
        cleaned_pages = []
        for page in pages:
            for pattern, replacement in replace_regexes.items():
                page = re.sub(pattern, replacement, page)
            cleaned_pages.append(page)
        return "\f".join(cleaned_pages)

    def _remove_substrings(self, text: str, substrings: list[str]) -> str:
        """
        Remove all specified substrings from the text.

        :param text: Text to clean.
        :param substrings: Substrings to remove.
        :returns: The text without the specified substrings.
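
        Illustrative example (substrings are removed in the order given):

        >>> DocumentCleaner()._remove_substrings("Hello world", ["world", " "])
        'Hello'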
        """
        for substring in substrings:
            text = text.replace(substring, "")
        return text

    def _remove_repeated_substrings(self, text: str) -> str:
        """
        Remove any substrings from the text that occur repeatedly on every page. For example headers or footers.

        Pages in the text need to be separated by form feed character "\\f".

        :param text: Text to clean.
        :returns: The text without the repeated substrings.
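
        Illustrative example, a four-page document with a repeated header
        (detected on the middle pages, then removed from every page):

        >>> text = "\\f".join([
        ...     "ACME Corp Confidential\\nintro",
        ...     "ACME Corp Confidential\\nalpha section",
        ...     "ACME Corp Confidential\\nbeta section",
        ...     "ACME Corp Confidential\\nclosing",
        ... ])
        >>> DocumentCleaner()._remove_repeated_substrings(text).split("\\f")
        ['\\nintro', '\\nalpha section', '\\nbeta section', '\\nclosing']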
        """
        return self._find_and_remove_header_footer(
            text, n_chars=300, n_first_pages_to_ignore=1, n_last_pages_to_ignore=1
        )

    def _find_and_remove_header_footer(
        self, text: str, n_chars: int, n_first_pages_to_ignore: int, n_last_pages_to_ignore: int
    ) -> str:
        """
        Heuristic to find footers and headers across different pages by searching for the longest common string.

        Pages in the text need to be separated by form feed character "\\f".
        For headers, we only search in the first n_chars characters (for footer: last n_chars).
        Note: This heuristic uses exact matches and therefore works well for footers like "Copyright 2019 by XXX",
        but won't detect "Page 3 of 4" or similar.

        :param text: Text to clean, with pages separated by "\\f".
        :param n_chars: The number of first/last characters where the header/footer shall be searched in.
        :param n_first_pages_to_ignore: The number of first pages to ignore
            (e.g. TOCs often don't contain footer/header).
        :param n_last_pages_to_ignore: The number of last pages to ignore.
        :returns: The text without the found headers and footers.
        """

        pages = text.split("\f")

        # header
        start_of_pages = [p[:n_chars] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
        found_header = self._find_longest_common_ngram(start_of_pages)
        if found_header:
            pages = [page.replace(found_header, "") for page in pages]

        # footer
        end_of_pages = [p[-n_chars:] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
        found_footer = self._find_longest_common_ngram(end_of_pages)
        if found_footer:
            pages = [page.replace(found_footer, "") for page in pages]

        logger.debug(
            "Removed header '{header}' and footer '{footer}' in document", header=found_header, footer=found_footer
        )
        return "\f".join(pages)

    def _ngram(self, seq: str, n: int) -> Generator[str, None, None]:
        """
        Return all ngrams of length n from a text sequence. Each ngram consists of n words split by whitespace.

        :param seq: The sequence to generate ngrams from.
        :param n: The length of the ngrams to generate.
        :returns: A Generator generating all ngrams of length n from the given sequence.
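
        Illustrative example: newlines and tabs stay attached to the word that
        follows them, so the original whitespace survives inside each ngram:

        >>> list(DocumentCleaner()._ngram("one two\\nthree four", 2))
        ['one two', 'two\\nthree', '\\nthree four']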
        """

        # In order to maintain the original whitespace, but still consider \n and \t for n-gram tokenization,
        # we add a space here and remove it after creation of the ngrams again (see below)
        seq = seq.replace("\n", " \n")
        seq = seq.replace("\t", " \t")

        words = seq.split(" ")
        return (" ".join(words[i : i + n]).replace(" \n", "\n").replace(" \t", "\t") for i in range(len(words) - n + 1))

    def _allngram(self, seq: str, min_ngram: int, max_ngram: int) -> set[str]:
        """
        Generates all possible ngrams from a given sequence of text.

        Considering all ngram lengths from the minimum up to (but excluding) the maximum length.

        :param seq: The sequence to generate ngrams from.
        :param min_ngram: The minimum length of ngram to consider.
        :param max_ngram: The maximum length of ngram to consider (exclusive).
        :returns: A set of all ngrams from the given sequence.
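
        Illustrative example (`max_ngram` is exclusive, so only 1- and 2-grams
        are generated here):

        >>> sorted(DocumentCleaner()._allngram("a b c", 1, 3))
        ['a', 'a b', 'b', 'b c', 'c']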
        """
        lengths = range(min_ngram, max_ngram) if max_ngram else range(min_ngram, len(seq))
        ngrams = map(partial(self._ngram, seq), lengths)
        return set(chain.from_iterable(ngrams))

    def _find_longest_common_ngram(self, sequences: list[str], min_ngram: int = 3, max_ngram: int = 30) -> str:
        """
        Find the longest common ngram across a list of text sequences (e.g. start of pages).

        Considering all ngram lengths between the minimum and maximum length. Helpful for finding footers, headers etc.
        Empty sequences are ignored.

        :param sequences: The list of strings that shall be searched for common ngrams.
        :param min_ngram: The minimum length of ngram to consider.
        :param max_ngram: The maximum length of ngram to consider.
        :returns: The longest ngram that all sequences have in common.
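
        Illustrative example, two page starts sharing a three-word header:

        >>> DocumentCleaner()._find_longest_common_ngram(
        ...     ["Confidential Report 2024\\nIntro", "Confidential Report 2024\\nMethods"]
        ... )
        'Confidential Report 2024'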
        """
        sequences = [s for s in sequences if s]  # filter empty sequences
        if not sequences:
            return ""
        seqs_ngrams = map(partial(self._allngram, min_ngram=min_ngram, max_ngram=max_ngram), sequences)
        intersection = reduce(set.intersection, seqs_ngrams)

        longest = max(intersection, key=len, default="")
        return longest if longest.strip() else ""