# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

from collections.abc import Callable
from copy import deepcopy
from typing import Any, Literal

from more_itertools import windowed

from haystack import Document, component, logging
from haystack.components.preprocessors.sentence_tokenizer import Language, SentenceSplitter, nltk_imports
from haystack.core.serialization import default_from_dict, default_to_dict
from haystack.utils import deserialize_callable, serialize_callable

logger = logging.getLogger(__name__)

# mapping of split_by values to the delimiter character; 'function' and 'sentence' don't split by a character
_CHARACTER_SPLIT_BY_MAPPING = {"page": "\f", "passage": "\n\n", "period": ".", "word": " ", "line": "\n"}


@component
class DocumentSplitter:
    """
    Splits long documents into smaller chunks.

    This is a common preprocessing step during indexing. It helps Embedders create meaningful semantic representations
    and prevents exceeding language model context limits.

    The DocumentSplitter is compatible with the following DocumentStores:
    - [Astra](https://docs.haystack.deepset.ai/docs/astradocumentstore)
    - [Chroma](https://docs.haystack.deepset.ai/docs/chromadocumentstore) (limited support: overlapping information
      is not stored)
    - [Elasticsearch](https://docs.haystack.deepset.ai/docs/elasticsearch-document-store)
    - [OpenSearch](https://docs.haystack.deepset.ai/docs/opensearch-document-store)
    - [Pgvector](https://docs.haystack.deepset.ai/docs/pgvectordocumentstore)
    - [Pinecone](https://docs.haystack.deepset.ai/docs/pinecone-document-store) (limited support: overlapping
      information is not stored)
    - [Qdrant](https://docs.haystack.deepset.ai/docs/qdrant-document-store)
    - [Weaviate](https://docs.haystack.deepset.ai/docs/weaviatedocumentstore)

    ### Usage example

    ```python
    from haystack import Document
    from haystack.components.preprocessors import DocumentSplitter

    doc = Document(content="Moonlight shimmered softly, wolves howled nearby, night enveloped everything.")

    splitter = DocumentSplitter(split_by="word", split_length=3, split_overlap=0)
    result = splitter.run(documents=[doc])
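    # Expect three 3-word chunks, e.g. the first chunk's content is
    # "Moonlight shimmered softly, " (illustrative; exact whitespace handling may vary).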
    ```
    """

    def __init__(
        self,
        split_by: Literal["function", "page", "passage", "period", "word", "line", "sentence"] = "word",
        split_length: int = 200,
        split_overlap: int = 0,
        split_threshold: int = 0,
        splitting_function: Callable[[str], list[str]] | None = None,
        respect_sentence_boundary: bool = False,
        language: Language = "en",
        use_split_rules: bool = True,
        extend_abbreviations: bool = True,
        *,
        skip_empty_documents: bool = True,
    ) -> None:
        """
        Initialize DocumentSplitter.

        :param split_by: The unit for splitting your documents. Choose from:
            - `word` for splitting by spaces (" ")
            - `period` for splitting by periods (".")
            - `page` for splitting by form feed ("\\f")
            - `passage` for splitting by double line breaks ("\\n\\n")
            - `line` for splitting each line ("\\n")
            - `sentence` for splitting by the NLTK sentence tokenizer
            - `function` for splitting with a custom function passed via `splitting_function`

        :param split_length: The maximum number of units in each split.
        :param split_overlap: The number of overlapping units for each split.
        :param split_threshold: The minimum number of units per split. If a split has fewer units
            than the threshold, it's attached to the previous split.
        :param splitting_function: Necessary when `split_by` is set to "function".
            This is a function which must accept a single `str` as input and return a `list` of `str` as output,
            representing the chunks after splitting.
        :param respect_sentence_boundary: Choose whether to respect sentence boundaries when splitting by "word".
            If True, uses NLTK to detect sentence boundaries, ensuring splits occur only between sentences.
        :param language: Choose the language for the NLTK tokenizer. The default is English ("en").
        :param use_split_rules: Choose whether to use additional split rules when splitting by `sentence`.
        :param extend_abbreviations: Choose whether to extend NLTK's PunktTokenizer abbreviations with a list
            of curated abbreviations, if available. This is currently supported for English ("en") and German ("de").
        :param skip_empty_documents: Choose whether to skip documents with empty content. Default is True.
            Set to False when downstream components in the Pipeline (like LLMDocumentContentExtractor) can extract text
            from non-textual documents.
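
        A minimal sketch of a custom `splitting_function` (illustrative; note that a lambda may not
        survive `to_dict`/`from_dict` round-trips, which serialize the callable's import path):

        ```python
        splitter = DocumentSplitter(
            split_by="function",
            splitting_function=lambda text: text.split("---"),
        )
        ```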
        """

        self.split_by = split_by
        self.split_length = split_length
        self.split_overlap = split_overlap
        self.split_threshold = split_threshold
        self.splitting_function = splitting_function
        self.respect_sentence_boundary = respect_sentence_boundary
        self.language = language
        self.use_split_rules = use_split_rules
        self.extend_abbreviations = extend_abbreviations
        self.skip_empty_documents = skip_empty_documents

        self._init_checks(
            split_by=split_by,
            split_length=split_length,
            split_overlap=split_overlap,
            splitting_function=splitting_function,
            respect_sentence_boundary=respect_sentence_boundary,
        )
        self._use_sentence_splitter = split_by == "sentence" or (respect_sentence_boundary and split_by == "word")
        if self._use_sentence_splitter:
            nltk_imports.check()
            self.sentence_splitter: SentenceSplitter | None = None

    def _init_checks(
        self,
        *,
        split_by: str,
        split_length: int,
        split_overlap: int,
        splitting_function: Callable | None,
        respect_sentence_boundary: bool,
    ) -> None:
        """
        Validates initialization parameters for DocumentSplitter.

        :param split_by: The unit for splitting documents
        :param split_length: The maximum number of units in each split
        :param split_overlap: The number of overlapping units for each split
        :param splitting_function: Custom function for splitting when split_by="function"
        :param respect_sentence_boundary: Whether to respect sentence boundaries when splitting
        :raises ValueError: If any parameter is invalid
        """
        valid_split_by = ["function", "page", "passage", "period", "word", "line", "sentence"]
        if split_by not in valid_split_by:
            raise ValueError(f"split_by must be one of {', '.join(valid_split_by)}.")

        if split_by == "function" and splitting_function is None:
            raise ValueError("When 'split_by' is set to 'function', a valid 'splitting_function' must be provided.")

        if split_length <= 0:
            raise ValueError("split_length must be greater than 0.")

        if split_overlap < 0:
            raise ValueError("split_overlap must be greater than or equal to 0.")

        if respect_sentence_boundary and split_by != "word":
            logger.warning(
                "The 'respect_sentence_boundary' option is only supported for `split_by='word'`. "
                "The option `respect_sentence_boundary` will be set to `False`."
            )
            self.respect_sentence_boundary = False

    def warm_up(self) -> None:
        """
        Warm up the DocumentSplitter by loading the sentence tokenizer.
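
        A minimal usage sketch (illustrative; assumes the required NLTK resources are available):

        ```python
        splitter = DocumentSplitter(split_by="sentence", split_length=2)
        splitter.warm_up()  # loads the NLTK tokenizer once, before the first run()
        ```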
        """
        if self._use_sentence_splitter and self.sentence_splitter is None:
            self.sentence_splitter = SentenceSplitter(
                language=self.language,
                use_split_rules=self.use_split_rules,
                extend_abbreviations=self.extend_abbreviations,
                keep_white_spaces=True,
            )

    @component.output_types(documents=list[Document])
    def run(self, documents: list[Document]) -> dict[str, list[Document]]:
        """
        Split documents into smaller parts.

        Splits documents by the unit expressed in `split_by`, with a length of `split_length`
        and an overlap of `split_overlap`.

        :param documents: The documents to split.
        :returns: A dictionary with the following key:
            - `documents`: List of documents with the split texts. Each document includes:
                - A metadata field `source_id` to track the original document.
                - A metadata field `page_number` to track the original page number.
                - All other metadata copied from the original document.

        :raises TypeError: if the input is not a list of Documents.
        :raises ValueError: if the content of a document is None.
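
        A sketch of inspecting the split metadata (illustrative):

        ```python
        result = splitter.run(documents=[doc])
        for chunk in result["documents"]:
            print(chunk.meta["source_id"], chunk.meta["split_id"], chunk.meta["page_number"])
        ```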
        """
        if self._use_sentence_splitter and self.sentence_splitter is None:
            self.warm_up()

        if not isinstance(documents, list) or (documents and not isinstance(documents[0], Document)):
            raise TypeError("DocumentSplitter expects a List of Documents as input.")

        split_docs: list[Document] = []
        for doc in documents:
            if doc.content is None:
                raise ValueError(
                    f"DocumentSplitter only works with text documents but content for document ID {doc.id} is None."
                )
            if doc.content == "" and self.skip_empty_documents:
                logger.warning("Document ID {doc_id} has empty content. Skipping this document.", doc_id=doc.id)
                continue

            split_docs += self._split_document(doc)
        return {"documents": split_docs}

    def _split_document(self, doc: Document) -> list[Document]:
        if self.split_by == "sentence" or self.respect_sentence_boundary:
            return self._split_by_nltk_sentence(doc)

        if self.split_by == "function" and self.splitting_function is not None:
            return self._split_by_function(doc)

        return self._split_by_character(doc)

    def _split_by_nltk_sentence(self, doc: Document) -> list[Document]:
        split_docs = []

        result = self.sentence_splitter.split_sentences(doc.content)  # type: ignore # None check is done in run()
        units = [sentence["sentence"] for sentence in result]

        if self.respect_sentence_boundary:
            text_splits, splits_pages, splits_start_idxs = self._concatenate_sentences_based_on_word_amount(
                sentences=units, split_length=self.split_length, split_overlap=self.split_overlap
            )
        else:
            text_splits, splits_pages, splits_start_idxs = self._concatenate_units(
                elements=units,
                split_length=self.split_length,
                split_overlap=self.split_overlap,
                split_threshold=self.split_threshold,
            )
        metadata = deepcopy(doc.meta)
        metadata["source_id"] = doc.id
        split_docs += self._create_docs_from_splits(
            text_splits=text_splits, splits_pages=splits_pages, splits_start_idxs=splits_start_idxs, meta=metadata
        )

        return split_docs

    def _split_by_character(self, doc: Document) -> list[Document]:
        split_at = _CHARACTER_SPLIT_BY_MAPPING[self.split_by]
        units = doc.content.split(split_at)  # type: ignore[union-attr]
        # Add the delimiter back to all units except the last one
        for i in range(len(units) - 1):
            units[i] += split_at
        text_splits, splits_pages, splits_start_idxs = self._concatenate_units(
            units, self.split_length, self.split_overlap, self.split_threshold
        )
        metadata = deepcopy(doc.meta)
        metadata["source_id"] = doc.id
        return self._create_docs_from_splits(
            text_splits=text_splits, splits_pages=splits_pages, splits_start_idxs=splits_start_idxs, meta=metadata
        )

    def _split_by_function(self, doc: Document) -> list[Document]:
        # the check for None is done already in the run method
        splits = self.splitting_function(doc.content)  # type: ignore
        docs: list[Document] = []
        for s in splits:
            meta = deepcopy(doc.meta)
            meta["source_id"] = doc.id
            docs.append(Document(content=s, meta=meta))
        return docs

    def _concatenate_units(
        self, elements: list[str], split_length: int, split_overlap: int, split_threshold: int
    ) -> tuple[list[str], list[int], list[int]]:
        """
        Concatenates the elements into parts of split_length units.

        Keeps track of the original page number that each element belongs to. If the length of the current units is
        less than the pre-defined `split_threshold`, it does not create a new split. Instead, it concatenates the
        current units with the last split, preventing the creation of excessively small splits.
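
        For example (illustrative), with `split_length=3` and `split_overlap=1`, the elements
        `["a ", "b ", "c ", "d ", "e "]` yield the windows `["a ", "b ", "c "]` and `["c ", "d ", "e "]`,
        which become the splits `"a b c "` and `"c d e "`.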
        """

        text_splits: list[str] = []
        splits_pages: list[int] = []
        splits_start_idxs: list[int] = []
        cur_start_idx = 0
        cur_page = 1
        segments = windowed(elements, n=split_length, step=split_length - split_overlap)

        for seg in segments:
            current_units = [unit for unit in seg if unit is not None]
            txt = "".join(current_units)

            # check if length of current units is below split_threshold
            if len(current_units) < split_threshold and len(text_splits) > 0:
                # concatenate the last split with the current one
                text_splits[-1] += txt

            # NOTE: If skip_empty_documents is True, this skips splits whose joined text is empty
            elif not self.skip_empty_documents or len(txt) > 0:
                text_splits.append(txt)
                splits_pages.append(cur_page)
                splits_start_idxs.append(cur_start_idx)

            processed_units = current_units[: split_length - split_overlap]
            cur_start_idx += len("".join(processed_units))

            if self.split_by == "page":
                num_page_breaks = len(processed_units)
            else:
                num_page_breaks = sum(processed_unit.count("\f") for processed_unit in processed_units)

            cur_page += num_page_breaks

        return text_splits, splits_pages, splits_start_idxs

    def _create_docs_from_splits(
        self, text_splits: list[str], splits_pages: list[int], splits_start_idxs: list[int], meta: dict[str, Any]
    ) -> list[Document]:
        """
        Creates Document objects from splits, enriching them with the page number and the metadata of the original
        document.
        """
        documents: list[Document] = []

        for i, (txt, split_idx) in enumerate(zip(text_splits, splits_start_idxs, strict=True)):
            copied_meta = deepcopy(meta)
            copied_meta["page_number"] = splits_pages[i]
            copied_meta["split_id"] = i
            copied_meta["split_idx_start"] = split_idx
            doc = Document(content=txt, meta=copied_meta)
            documents.append(doc)

            if self.split_overlap <= 0:
                continue

            doc.meta["_split_overlap"] = []

            if i == 0:
                continue

            doc_start_idx = splits_start_idxs[i]
            previous_doc = documents[i - 1]
            previous_doc_start_idx = splits_start_idxs[i - 1]
            self._add_split_overlap_information(doc, doc_start_idx, previous_doc, previous_doc_start_idx)

        return documents

    @staticmethod
    def _add_split_overlap_information(
        current_doc: Document, current_doc_start_idx: int, previous_doc: Document, previous_doc_start_idx: int
    ) -> None:
        """
        Adds split overlap information to the current and previous Document's meta.

        :param current_doc: The Document that is being split.
        :param current_doc_start_idx: The starting index of the current Document.
        :param previous_doc: The Document that was split before the current Document.
        :param previous_doc_start_idx: The starting index of the previous Document.
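
        After this call, each Document's `_split_overlap` meta holds entries of the form
        (illustrative) `{"doc_id": "<other-doc-id>", "range": (start, end)}`, where `range`
        is the character span of the shared text within that Document's own content.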
        """
        overlapping_range = (current_doc_start_idx - previous_doc_start_idx, len(previous_doc.content))  # type: ignore

        if overlapping_range[0] < overlapping_range[1]:
            overlapping_str = previous_doc.content[overlapping_range[0] : overlapping_range[1]]  # type: ignore

            if current_doc.content.startswith(overlapping_str):  # type: ignore
                # add split overlap information to this Document regarding the previous Document
                current_doc.meta["_split_overlap"].append({"doc_id": previous_doc.id, "range": overlapping_range})

                # add split overlap information to previous Document regarding this Document
                overlapping_range = (0, overlapping_range[1] - overlapping_range[0])
                previous_doc.meta["_split_overlap"].append({"doc_id": current_doc.id, "range": overlapping_range})

    def to_dict(self) -> dict[str, Any]:
        """
        Serializes the component to a dictionary.
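
        A round-trip sketch (illustrative):

        ```python
        data = splitter.to_dict()
        restored = DocumentSplitter.from_dict(data)
        ```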
        """
        serialized = default_to_dict(
            self,
            split_by=self.split_by,
            split_length=self.split_length,
            split_overlap=self.split_overlap,
            split_threshold=self.split_threshold,
            respect_sentence_boundary=self.respect_sentence_boundary,
            language=self.language,
            use_split_rules=self.use_split_rules,
            extend_abbreviations=self.extend_abbreviations,
            skip_empty_documents=self.skip_empty_documents,
        )
        if self.splitting_function:
            serialized["init_parameters"]["splitting_function"] = serialize_callable(self.splitting_function)
        return serialized

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "DocumentSplitter":
        """
        Deserializes the component from a dictionary.
        """
        init_params = data.get("init_parameters", {})

        splitting_function = init_params.get("splitting_function", None)
        if splitting_function:
            init_params["splitting_function"] = deserialize_callable(splitting_function)

        return default_from_dict(cls, data)

    @staticmethod
    def _concatenate_sentences_based_on_word_amount(
        sentences: list[str], split_length: int, split_overlap: int
    ) -> tuple[list[str], list[int], list[int]]:
        """
        Groups the sentences into chunks of `split_length` words while respecting sentence boundaries.

        This function is only used when splitting by `word` with `respect_sentence_boundary` set to `True`, i.e.,
        with the NLTK sentence tokenizer.

        :param sentences: The list of sentences to split.
        :param split_length: The maximum number of words in each split.
        :param split_overlap: The number of overlapping words in each split.
        :returns: A tuple containing the concatenated sentences, the start page numbers, and the start indices.
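
        For example (illustrative), with `split_length=10` and `split_overlap=0`, the sentences
        `["One two three. ", "Four five six seven. ", "Eight nine ten eleven twelve. "]` produce two
        splits: the first two sentences (7 words) form the first chunk, because adding the third
        sentence (5 words) would exceed the limit; the third sentence alone forms the second chunk.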
        """
        # chunk information
        chunk_word_count = 0
        chunk_starting_page_number = 1
        chunk_start_idx = 0
        current_chunk: list[str] = []
        # output lists
        split_start_page_numbers = []
        list_of_splits: list[list[str]] = []
        split_start_indices = []

        for sentence_idx, sentence in enumerate(sentences):
            current_chunk.append(sentence)
            chunk_word_count += len(sentence.split())
            next_sentence_word_count = (
                len(sentences[sentence_idx + 1].split()) if sentence_idx < len(sentences) - 1 else 0
            )

            # Number of words in the current chunk plus the next sentence is larger than the split_length,
            # or we reached the last sentence
            if (chunk_word_count + next_sentence_word_count) > split_length or sentence_idx == len(sentences) - 1:
                # Save the current chunk and start a new one
                list_of_splits.append(current_chunk)
                split_start_page_numbers.append(chunk_starting_page_number)
                split_start_indices.append(chunk_start_idx)

                # Get the number of sentences that overlap with the next chunk
                num_sentences_to_keep = DocumentSplitter._number_of_sentences_to_keep(
                    sentences=current_chunk, split_length=split_length, split_overlap=split_overlap
                )
                # Set up information for the new chunk
                if num_sentences_to_keep > 0:
                    # Processed sentences are the ones that are not overlapping with the next chunk
                    processed_sentences = current_chunk[:-num_sentences_to_keep]
                    chunk_starting_page_number += sum(sent.count("\f") for sent in processed_sentences)
                    chunk_start_idx += len("".join(processed_sentences))
                    # Next chunk starts with the sentences that were overlapping with the previous chunk
                    current_chunk = current_chunk[-num_sentences_to_keep:]
                    chunk_word_count = sum(len(s.split()) for s in current_chunk)
                else:
                    # Here processed_sentences is the same as current_chunk since there is no overlap
                    chunk_starting_page_number += sum(sent.count("\f") for sent in current_chunk)
                    chunk_start_idx += len("".join(current_chunk))
                    current_chunk = []
                    chunk_word_count = 0

        # Concatenate the sentences together within each split
        text_splits = []
        for split in list_of_splits:
            text = "".join(split)
            if len(text) > 0:
                text_splits.append(text)

        return text_splits, split_start_page_numbers, split_start_indices

    @staticmethod
    def _number_of_sentences_to_keep(sentences: list[str], split_length: int, split_overlap: int) -> int:
        """
        Returns the number of sentences to keep in the next chunk based on the `split_overlap` and `split_length`.

        :param sentences: The list of sentences to split.
        :param split_length: The maximum number of words in each split.
        :param split_overlap: The number of overlapping words in each split.
        :returns: The number of sentences to keep in the next chunk.
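
        For example (illustrative, assuming `split_length` is large enough not to trigger the cap),
        with `split_overlap=5` and sentences of 5, 6, and 7 words: the first sentence is skipped,
        then, walking backwards, the 7-word sentence already exceeds the 5-word overlap, so a
        single sentence is kept for the next chunk.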
        """
        # If the split_overlap is 0, we don't need to keep any sentences
        if split_overlap == 0:
            return 0

        num_sentences_to_keep = 0
        num_words = 0
        # The next overlapping Document should not start exactly like the previous one, so we skip the first sentence
        for sent in reversed(sentences[1:]):
            num_words += len(sent.split())
            # If the number of words is larger than the split_length, then don't add any more sentences
            if num_words > split_length:
                break
            num_sentences_to_keep += 1
            if num_words > split_overlap:
                break
        return num_sentences_to_keep