document_writer.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

from typing import Any

from haystack import Document, component, default_from_dict, default_to_dict
from haystack.document_stores.types import DocumentStore, DuplicatePolicy


@component
class DocumentWriter:
    """
    Writes documents to a DocumentStore.

    ### Usage example
    ```python
    from haystack import Document
    from haystack.components.writers import DocumentWriter
    from haystack.document_stores.in_memory import InMemoryDocumentStore
    docs = [
        Document(content="Python is a popular programming language"),
    ]
    doc_store = InMemoryDocumentStore()
    writer = DocumentWriter(document_store=doc_store)
    writer.run(docs)
    ```
    """

    def __init__(self, document_store: DocumentStore, policy: DuplicatePolicy = DuplicatePolicy.NONE) -> None:
        """
        Create a DocumentWriter component.

        :param document_store:
            The instance of the document store where you want to store your documents.
        :param policy:
            The policy to apply when a Document with the same ID already exists in the DocumentStore.
            - `DuplicatePolicy.NONE`: Default policy, relies on the DocumentStore settings.
            - `DuplicatePolicy.SKIP`: Skips documents with the same ID and doesn't write them to the DocumentStore.
            - `DuplicatePolicy.OVERWRITE`: Overwrites documents with the same ID.
            - `DuplicatePolicy.FAIL`: Raises an error if a Document with the same ID is already in the DocumentStore.
        """
        self.document_store = document_store
        self.policy = policy

    def _get_telemetry_data(self) -> dict[str, Any]:
        """
        Data that is sent to Posthog for usage analytics.

        :returns:
            A dictionary containing only the class name of the configured document store.
        """
        return {"document_store": type(self.document_store).__name__}

    def to_dict(self) -> dict[str, Any]:
        """
        Serializes the component to a dictionary.

        The duplicate policy is stored by its enum member name (e.g. `"NONE"`), which is what
        `from_dict` expects when converting it back.

        :returns:
            Dictionary with serialized data.
        """
        return default_to_dict(self, document_store=self.document_store, policy=self.policy.name)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "DocumentWriter":
        """
        Deserializes the component from a dictionary.

        The input dictionary is not modified. The policy name is converted to a `DuplicatePolicy`
        on shallow copies, so the same serialized data can safely be deserialized more than once.

        :param data:
            The dictionary to deserialize from.
        :returns:
            The deserialized component.

        :raises DeserializationError:
            If the document store is not properly specified in the serialization data or its type cannot be imported.
        :raises KeyError:
            If the stored policy is a string that is not the name of a `DuplicatePolicy` member.
        """
        init_params = data.get("init_parameters", {})
        if "policy" in init_params:
            policy = init_params["policy"]
            # Serialized data stores the member *name*; tolerate an already-converted enum value too.
            if not isinstance(policy, DuplicatePolicy):
                policy = DuplicatePolicy[policy]
            # Build new dicts instead of writing into the caller's `data`: the previous in-place
            # rewrite made a second `from_dict` call on the same dict fail with KeyError.
            data = {**data, "init_parameters": {**init_params, "policy": policy}}
        return default_from_dict(cls, data)

    @component.output_types(documents_written=int)
    def run(self, documents: list[Document], policy: DuplicatePolicy | None = None) -> dict[str, int]:
        """
        Run the DocumentWriter on the given input data.

        :param documents:
            A list of documents to write to the document store.
        :param policy:
            The policy to use when encountering duplicate documents.
            If `None`, the policy passed at initialization is used.
        :returns:
            A dictionary with the key `documents_written`, holding the number of documents
            the document store reports as written.

        Any exception raised by the document store's `write_documents` (for example, under
        `DuplicatePolicy.FAIL` when a duplicate ID exists) propagates to the caller.
        """
        if policy is None:
            policy = self.policy

        documents_written = self.document_store.write_documents(documents=documents, policy=policy)
        return {"documents_written": documents_written}

    @component.output_types(documents_written=int)
    async def run_async(self, documents: list[Document], policy: DuplicatePolicy | None = None) -> dict[str, int]:
        """
        Asynchronously run the DocumentWriter on the given input data.

        This is the asynchronous version of the `run` method. It has the same parameters and return values
        but can be used with `await` in async code.

        :param documents:
            A list of documents to write to the document store.
        :param policy:
            The policy to use when encountering duplicate documents.
            If `None`, the policy passed at initialization is used.
        :returns:
            A dictionary with the key `documents_written`, holding the number of documents
            the document store reports as written.

        :raises TypeError:
            If the specified document store does not implement `write_documents_async`.

        Any exception raised by the document store's `write_documents_async` propagates to the caller.
        """
        if policy is None:
            policy = self.policy

        # Not every DocumentStore implementation offers an async write path; fail loudly instead of
        # silently falling back to a blocking call inside the event loop.
        if not hasattr(self.document_store, "write_documents_async"):
            raise TypeError(f"Document store {type(self.document_store).__name__} does not provide async support.")

        documents_written = await self.document_store.write_documents_async(documents=documents, policy=policy)
        return {"documents_written": documents_written}