document_splitter.py
1 # SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai> 2 # 3 # SPDX-License-Identifier: Apache-2.0 4 5 from collections.abc import Callable 6 from copy import deepcopy 7 from typing import Any, Literal 8 9 from more_itertools import windowed 10 11 from haystack import Document, component, logging 12 from haystack.components.preprocessors.sentence_tokenizer import Language, SentenceSplitter, nltk_imports 13 from haystack.core.serialization import default_from_dict, default_to_dict 14 from haystack.utils import deserialize_callable, serialize_callable 15 16 logger = logging.getLogger(__name__) 17 18 # mapping of split by character, 'function' and 'sentence' don't split by character 19 _CHARACTER_SPLIT_BY_MAPPING = {"page": "\f", "passage": "\n\n", "period": ".", "word": " ", "line": "\n"} 20 21 22 @component 23 class DocumentSplitter: 24 """ 25 Splits long documents into smaller chunks. 26 27 This is a common preprocessing step during indexing. It helps Embedders create meaningful semantic representations 28 and prevents exceeding language model context limits. 29 30 The DocumentSplitter is compatible with the following DocumentStores: 31 - [Astra](https://docs.haystack.deepset.ai/docs/astradocumentstore) 32 - [Chroma](https://docs.haystack.deepset.ai/docs/chromadocumentstore) limited support, overlapping information is 33 not stored 34 - [Elasticsearch](https://docs.haystack.deepset.ai/docs/elasticsearch-document-store) 35 - [OpenSearch](https://docs.haystack.deepset.ai/docs/opensearch-document-store) 36 - [Pgvector](https://docs.haystack.deepset.ai/docs/pgvectordocumentstore) 37 - [Pinecone](https://docs.haystack.deepset.ai/docs/pinecone-document-store) limited support, overlapping 38 information is not stored 39 - [Qdrant](https://docs.haystack.deepset.ai/docs/qdrant-document-store) 40 - [Weaviate](https://docs.haystack.deepset.ai/docs/weaviatedocumentstore) 41 42 ### Usage example 43 44 ```python 45 from haystack import Document 46 from haystack.components.preprocessors import DocumentSplitter 47 48 doc = Document(content="Moonlight shimmered softly, wolves howled nearby, night enveloped everything.") 49 50 splitter = DocumentSplitter(split_by="word", split_length=3, split_overlap=0) 51 result = splitter.run(documents=[doc]) 52 ``` 53 """ 54 55 def __init__( 56 self, 57 split_by: Literal["function", "page", "passage", "period", "word", "line", "sentence"] = "word", 58 split_length: int = 200, 59 split_overlap: int = 0, 60 split_threshold: int = 0, 61 splitting_function: Callable[[str], list[str]] | None = None, 62 respect_sentence_boundary: bool = False, 63 language: Language = "en", 64 use_split_rules: bool = True, 65 extend_abbreviations: bool = True, 66 *, 67 skip_empty_documents: bool = True, 68 ) -> None: 69 """ 70 Initialize DocumentSplitter. 71 72 :param split_by: The unit for splitting your documents. Choose from: 73 - `word` for splitting by spaces (" ") 74 - `period` for splitting by periods (".") 75 - `page` for splitting by form feed ("\\f") 76 - `passage` for splitting by double line breaks ("\\n\\n") 77 - `line` for splitting each line ("\\n") 78 - `sentence` for splitting by NLTK sentence tokenizer 79 80 :param split_length: The maximum number of units in each split. 81 :param split_overlap: The number of overlapping units for each split. 82 :param split_threshold: The minimum number of units per split. If a split has fewer units 83 than the threshold, it's attached to the previous split. 84 :param splitting_function: Necessary when `split_by` is set to "function". 85 This is a function which must accept a single `str` as input and return a `list` of `str` as output, 86 representing the chunks after splitting. 87 :param respect_sentence_boundary: Choose whether to respect sentence boundaries when splitting by "word". 88 If True, uses NLTK to detect sentence boundaries, ensuring splits occur only between sentences. 89 :param language: Choose the language for the NLTK tokenizer. The default is English ("en"). 90 :param use_split_rules: Choose whether to use additional split rules when splitting by `sentence`. 91 :param extend_abbreviations: Choose whether to extend NLTK's PunktTokenizer abbreviations with a list 92 of curated abbreviations, if available. This is currently supported for English ("en") and German ("de"). 93 :param skip_empty_documents: Choose whether to skip documents with empty content. Default is True. 94 Set to False when downstream components in the Pipeline (like LLMDocumentContentExtractor) can extract text 95 from non-textual documents. 96 """ 97 98 self.split_by = split_by 99 self.split_length = split_length 100 self.split_overlap = split_overlap 101 self.split_threshold = split_threshold 102 self.splitting_function = splitting_function 103 self.respect_sentence_boundary = respect_sentence_boundary 104 self.language = language 105 self.use_split_rules = use_split_rules 106 self.extend_abbreviations = extend_abbreviations 107 self.skip_empty_documents = skip_empty_documents 108 109 self._init_checks( 110 split_by=split_by, 111 split_length=split_length, 112 split_overlap=split_overlap, 113 splitting_function=splitting_function, 114 respect_sentence_boundary=respect_sentence_boundary, 115 ) 116 self._use_sentence_splitter = split_by == "sentence" or (respect_sentence_boundary and split_by == "word") 117 if self._use_sentence_splitter: 118 nltk_imports.check() 119 self.sentence_splitter: SentenceSplitter | None = None 120 121 def _init_checks( 122 self, 123 *, 124 split_by: str, 125 split_length: int, 126 split_overlap: int, 127 splitting_function: Callable | None, 128 respect_sentence_boundary: bool, 129 ) -> None: 130 """ 131 Validates initialization parameters for DocumentSplitter. 132 133 :param split_by: The unit for splitting documents 134 :param split_length: The maximum number of units in each split 135 :param split_overlap: The number of overlapping units for each split 136 :param splitting_function: Custom function for splitting when split_by="function" 137 :param respect_sentence_boundary: Whether to respect sentence boundaries when splitting 138 :raises ValueError: If any parameter is invalid 139 """ 140 valid_split_by = ["function", "page", "passage", "period", "word", "line", "sentence"] 141 if split_by not in valid_split_by: 142 raise ValueError(f"split_by must be one of {', '.join(valid_split_by)}.") 143 144 if split_by == "function" and splitting_function is None: 145 raise ValueError("When 'split_by' is set to 'function', a valid 'splitting_function' must be provided.") 146 147 if split_length <= 0: 148 raise ValueError("split_length must be greater than 0.") 149 150 if split_overlap < 0: 151 raise ValueError("split_overlap must be greater than or equal to 0.") 152 153 if respect_sentence_boundary and split_by != "word": 154 logger.warning( 155 "The 'respect_sentence_boundary' option is only supported for `split_by='word'`. " 156 "The option `respect_sentence_boundary` will be set to `False`." 157 ) 158 self.respect_sentence_boundary = False 159 160 def warm_up(self) -> None: 161 """ 162 Warm up the DocumentSplitter by loading the sentence tokenizer. 163 """ 164 if self._use_sentence_splitter and self.sentence_splitter is None: 165 self.sentence_splitter = SentenceSplitter( 166 language=self.language, 167 use_split_rules=self.use_split_rules, 168 extend_abbreviations=self.extend_abbreviations, 169 keep_white_spaces=True, 170 ) 171 172 @component.output_types(documents=list[Document]) 173 def run(self, documents: list[Document]) -> dict[str, list[Document]]: 174 """ 175 Split documents into smaller parts. 176 177 Splits documents by the unit expressed in `split_by`, with a length of `split_length` 178 and an overlap of `split_overlap`. 179 180 :param documents: The documents to split. 181 :returns: A dictionary with the following key: 182 - `documents`: List of documents with the split texts. Each document includes: 183 - A metadata field `source_id` to track the original document. 184 - A metadata field `page_number` to track the original page number. 185 - All other metadata copied from the original document. 186 187 :raises TypeError: if the input is not a list of Documents. 188 :raises ValueError: if the content of a document is None. 189 """ 190 if self._use_sentence_splitter and self.sentence_splitter is None: 191 self.warm_up() 192 193 if not isinstance(documents, list) or (documents and not isinstance(documents[0], Document)): 194 raise TypeError("DocumentSplitter expects a List of Documents as input.") 195 196 split_docs: list[Document] = [] 197 for doc in documents: 198 if doc.content is None: 199 raise ValueError( 200 f"DocumentSplitter only works with text documents but content for document ID {doc.id} is None." 201 ) 202 if doc.content == "" and self.skip_empty_documents: 203 logger.warning("Document ID {doc_id} has an empty content. Skipping this document.", doc_id=doc.id) 204 continue 205 206 split_docs += self._split_document(doc) 207 return {"documents": split_docs} 208 209 def _split_document(self, doc: Document) -> list[Document]: 210 if self.split_by == "sentence" or self.respect_sentence_boundary: 211 return self._split_by_nltk_sentence(doc) 212 213 if self.split_by == "function" and self.splitting_function is not None: 214 return self._split_by_function(doc) 215 216 return self._split_by_character(doc) 217 218 def _split_by_nltk_sentence(self, doc: Document) -> list[Document]: 219 split_docs = [] 220 221 result = self.sentence_splitter.split_sentences(doc.content) # type: ignore # None check is done in run() 222 units = [sentence["sentence"] for sentence in result] 223 224 if self.respect_sentence_boundary: 225 text_splits, splits_pages, splits_start_idxs = self._concatenate_sentences_based_on_word_amount( 226 sentences=units, split_length=self.split_length, split_overlap=self.split_overlap 227 ) 228 else: 229 text_splits, splits_pages, splits_start_idxs = self._concatenate_units( 230 elements=units, 231 split_length=self.split_length, 232 split_overlap=self.split_overlap, 233 split_threshold=self.split_threshold, 234 ) 235 metadata = deepcopy(doc.meta) 236 metadata["source_id"] = doc.id 237 split_docs += self._create_docs_from_splits( 238 text_splits=text_splits, splits_pages=splits_pages, splits_start_idxs=splits_start_idxs, meta=metadata 239 ) 240 241 return split_docs 242 243 def _split_by_character(self, doc: Document) -> list[Document]: 244 split_at = _CHARACTER_SPLIT_BY_MAPPING[self.split_by] 245 units = doc.content.split(split_at) # type: ignore[union-attr] 246 # Add the delimiter back to all units except the last one 247 for i in range(len(units) - 1): 248 units[i] += split_at 249 text_splits, splits_pages, splits_start_idxs = self._concatenate_units( 250 units, self.split_length, self.split_overlap, self.split_threshold 251 ) 252 metadata = deepcopy(doc.meta) 253 metadata["source_id"] = doc.id 254 return self._create_docs_from_splits( 255 text_splits=text_splits, splits_pages=splits_pages, splits_start_idxs=splits_start_idxs, meta=metadata 256 ) 257 258 def _split_by_function(self, doc: Document) -> list[Document]: 259 # the check for None is done already in the run method 260 splits = self.splitting_function(doc.content) # type: ignore 261 docs: list[Document] = [] 262 for s in splits: 263 meta = deepcopy(doc.meta) 264 meta["source_id"] = doc.id 265 docs.append(Document(content=s, meta=meta)) 266 return docs 267 268 def _concatenate_units( 269 self, elements: list[str], split_length: int, split_overlap: int, split_threshold: int 270 ) -> tuple[list[str], list[int], list[int]]: 271 """ 272 Concatenates the elements into parts of split_length units. 273 274 Keeps track of the original page number that each element belongs. If the length of the current units is less 275 than the pre-defined `split_threshold`, it does not create a new split. Instead, it concatenates the current 276 units with the last split, preventing the creation of excessively small splits. 277 """ 278 279 text_splits: list[str] = [] 280 splits_pages: list[int] = [] 281 splits_start_idxs: list[int] = [] 282 cur_start_idx = 0 283 cur_page = 1 284 segments = windowed(elements, n=split_length, step=split_length - split_overlap) 285 286 for seg in segments: 287 current_units = [unit for unit in seg if unit is not None] 288 txt = "".join(current_units) 289 290 # check if length of current units is below split_threshold 291 if len(current_units) < split_threshold and len(text_splits) > 0: 292 # concatenate the last split with the current one 293 text_splits[-1] += txt 294 295 # NOTE: If skip_empty_documents is True, this line skips documents that have content="" 296 elif not self.skip_empty_documents or len(txt) > 0: 297 text_splits.append(txt) 298 splits_pages.append(cur_page) 299 splits_start_idxs.append(cur_start_idx) 300 301 processed_units = current_units[: split_length - split_overlap] 302 cur_start_idx += len("".join(processed_units)) 303 304 if self.split_by == "page": 305 num_page_breaks = len(processed_units) 306 else: 307 num_page_breaks = sum(processed_unit.count("\f") for processed_unit in processed_units) 308 309 cur_page += num_page_breaks 310 311 return text_splits, splits_pages, splits_start_idxs 312 313 def _create_docs_from_splits( 314 self, text_splits: list[str], splits_pages: list[int], splits_start_idxs: list[int], meta: dict[str, Any] 315 ) -> list[Document]: 316 """ 317 Creates Document objects from splits enriching them with page number and the metadata of the original document. 318 """ 319 documents: list[Document] = [] 320 321 for i, (txt, split_idx) in enumerate(zip(text_splits, splits_start_idxs, strict=True)): 322 copied_meta = deepcopy(meta) 323 copied_meta["page_number"] = splits_pages[i] 324 copied_meta["split_id"] = i 325 copied_meta["split_idx_start"] = split_idx 326 doc = Document(content=txt, meta=copied_meta) 327 documents.append(doc) 328 329 if self.split_overlap <= 0: 330 continue 331 332 doc.meta["_split_overlap"] = [] 333 334 if i == 0: 335 continue 336 337 doc_start_idx = splits_start_idxs[i] 338 previous_doc = documents[i - 1] 339 previous_doc_start_idx = splits_start_idxs[i - 1] 340 self._add_split_overlap_information(doc, doc_start_idx, previous_doc, previous_doc_start_idx) 341 342 return documents 343 344 @staticmethod 345 def _add_split_overlap_information( 346 current_doc: Document, current_doc_start_idx: int, previous_doc: Document, previous_doc_start_idx: int 347 ) -> None: 348 """ 349 Adds split overlap information to the current and previous Document's meta. 350 351 :param current_doc: The Document that is being split. 352 :param current_doc_start_idx: The starting index of the current Document. 353 :param previous_doc: The Document that was split before the current Document. 354 :param previous_doc_start_idx: The starting index of the previous Document. 355 """ 356 overlapping_range = (current_doc_start_idx - previous_doc_start_idx, len(previous_doc.content)) # type: ignore 357 358 if overlapping_range[0] < overlapping_range[1]: 359 overlapping_str = previous_doc.content[overlapping_range[0] : overlapping_range[1]] # type: ignore 360 361 if current_doc.content.startswith(overlapping_str): # type: ignore 362 # add split overlap information to this Document regarding the previous Document 363 current_doc.meta["_split_overlap"].append({"doc_id": previous_doc.id, "range": overlapping_range}) 364 365 # add split overlap information to previous Document regarding this Document 366 overlapping_range = (0, overlapping_range[1] - overlapping_range[0]) 367 previous_doc.meta["_split_overlap"].append({"doc_id": current_doc.id, "range": overlapping_range}) 368 369 def to_dict(self) -> dict[str, Any]: 370 """ 371 Serializes the component to a dictionary. 372 """ 373 serialized = default_to_dict( 374 self, 375 split_by=self.split_by, 376 split_length=self.split_length, 377 split_overlap=self.split_overlap, 378 split_threshold=self.split_threshold, 379 respect_sentence_boundary=self.respect_sentence_boundary, 380 language=self.language, 381 use_split_rules=self.use_split_rules, 382 extend_abbreviations=self.extend_abbreviations, 383 skip_empty_documents=self.skip_empty_documents, 384 ) 385 if self.splitting_function: 386 serialized["init_parameters"]["splitting_function"] = serialize_callable(self.splitting_function) 387 return serialized 388 389 @classmethod 390 def from_dict(cls, data: dict[str, Any]) -> "DocumentSplitter": 391 """ 392 Deserializes the component from a dictionary. 393 """ 394 init_params = data.get("init_parameters", {}) 395 396 splitting_function = init_params.get("splitting_function", None) 397 if splitting_function: 398 init_params["splitting_function"] = deserialize_callable(splitting_function) 399 400 return default_from_dict(cls, data) 401 402 @staticmethod 403 def _concatenate_sentences_based_on_word_amount( 404 sentences: list[str], split_length: int, split_overlap: int 405 ) -> tuple[list[str], list[int], list[int]]: 406 """ 407 Groups the sentences into chunks of `split_length` words while respecting sentence boundaries. 408 409 This function is only used when splitting by `word` and `respect_sentence_boundary` is set to `True`, i.e.: 410 with NLTK sentence tokenizer. 411 412 :param sentences: The list of sentences to split. 413 :param split_length: The maximum number of words in each split. 414 :param split_overlap: The number of overlapping words in each split. 415 :returns: A tuple containing the concatenated sentences, the start page numbers, and the start indices. 416 """ 417 # chunk information 418 chunk_word_count = 0 419 chunk_starting_page_number = 1 420 chunk_start_idx = 0 421 current_chunk: list[str] = [] 422 # output lists 423 split_start_page_numbers = [] 424 list_of_splits: list[list[str]] = [] 425 split_start_indices = [] 426 427 for sentence_idx, sentence in enumerate(sentences): 428 current_chunk.append(sentence) 429 chunk_word_count += len(sentence.split()) 430 next_sentence_word_count = ( 431 len(sentences[sentence_idx + 1].split()) if sentence_idx < len(sentences) - 1 else 0 432 ) 433 434 # Number of words in the current chunk plus the next sentence is larger than the split_length, 435 # or we reached the last sentence 436 if (chunk_word_count + next_sentence_word_count) > split_length or sentence_idx == len(sentences) - 1: 437 # Save current chunk and start a new one 438 list_of_splits.append(current_chunk) 439 split_start_page_numbers.append(chunk_starting_page_number) 440 split_start_indices.append(chunk_start_idx) 441 442 # Get the number of sentences that overlap with the next chunk 443 num_sentences_to_keep = DocumentSplitter._number_of_sentences_to_keep( 444 sentences=current_chunk, split_length=split_length, split_overlap=split_overlap 445 ) 446 # Set up information for the new chunk 447 if num_sentences_to_keep > 0: 448 # Processed sentences are the ones that are not overlapping with the next chunk 449 processed_sentences = current_chunk[:-num_sentences_to_keep] 450 chunk_starting_page_number += sum(sent.count("\f") for sent in processed_sentences) 451 chunk_start_idx += len("".join(processed_sentences)) 452 # Next chunk starts with the sentences that were overlapping with the previous chunk 453 current_chunk = current_chunk[-num_sentences_to_keep:] 454 chunk_word_count = sum(len(s.split()) for s in current_chunk) 455 else: 456 # Here processed_sentences is the same as current_chunk since there is no overlap 457 chunk_starting_page_number += sum(sent.count("\f") for sent in current_chunk) 458 chunk_start_idx += len("".join(current_chunk)) 459 current_chunk = [] 460 chunk_word_count = 0 461 462 # Concatenate the sentences together within each split 463 text_splits = [] 464 for split in list_of_splits: 465 text = "".join(split) 466 if len(text) > 0: 467 text_splits.append(text) 468 469 return text_splits, split_start_page_numbers, split_start_indices 470 471 @staticmethod 472 def _number_of_sentences_to_keep(sentences: list[str], split_length: int, split_overlap: int) -> int: 473 """ 474 Returns the number of sentences to keep in the next chunk based on the `split_overlap` and `split_length`. 475 476 :param sentences: The list of sentences to split. 477 :param split_length: The maximum number of words in each split. 478 :param split_overlap: The number of overlapping words in each split. 479 :returns: The number of sentences to keep in the next chunk. 480 """ 481 # If the split_overlap is 0, we don't need to keep any sentences 482 if split_overlap == 0: 483 return 0 484 485 num_sentences_to_keep = 0 486 num_words = 0 487 # Next overlapping Document should not start exactly the same as the previous one, so we skip the first sentence 488 for sent in reversed(sentences[1:]): 489 num_words += len(sent.split()) 490 # If the number of words is larger than the split_length then don't add any more sentences 491 if num_words > split_length: 492 break 493 num_sentences_to_keep += 1 494 if num_words > split_overlap: 495 break 496 return num_sentences_to_keep