/ haystack / components / writers / document_writer.py
document_writer.py
  1  # SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
  2  #
  3  # SPDX-License-Identifier: Apache-2.0
  4  
  5  from typing import Any
  6  
  7  from haystack import Document, component, default_from_dict, default_to_dict
  8  from haystack.document_stores.types import DocumentStore, DuplicatePolicy
  9  
 10  
 11  @component
 12  class DocumentWriter:
 13      """
 14      Writes documents to a DocumentStore.
 15  
 16      ### Usage example
 17      ```python
 18      from haystack import Document
 19      from haystack.components.writers import DocumentWriter
 20      from haystack.document_stores.in_memory import InMemoryDocumentStore
 21      docs = [
 22          Document(content="Python is a popular programming language"),
 23      ]
 24      doc_store = InMemoryDocumentStore()
 25      writer = DocumentWriter(document_store=doc_store)
 26      writer.run(docs)
 27      ```
 28      """
 29  
 30      def __init__(self, document_store: DocumentStore, policy: DuplicatePolicy = DuplicatePolicy.NONE) -> None:
 31          """
 32          Create a DocumentWriter component.
 33  
 34          :param document_store:
 35              The instance of the document store where you want to store your documents.
 36          :param policy:
 37              The policy to apply when a Document with the same ID already exists in the DocumentStore.
 38              - `DuplicatePolicy.NONE`: Default policy, relies on the DocumentStore settings.
 39              - `DuplicatePolicy.SKIP`: Skips documents with the same ID and doesn't write them to the DocumentStore.
 40              - `DuplicatePolicy.OVERWRITE`: Overwrites documents with the same ID.
 41              - `DuplicatePolicy.FAIL`: Raises an error if a Document with the same ID is already in the DocumentStore.
 42          """
 43          self.document_store = document_store
 44          self.policy = policy
 45  
 46      def _get_telemetry_data(self) -> dict[str, Any]:
 47          """
 48          Data that is sent to Posthog for usage analytics.
 49          """
 50          return {"document_store": type(self.document_store).__name__}
 51  
 52      def to_dict(self) -> dict[str, Any]:
 53          """
 54          Serializes the component to a dictionary.
 55  
 56          :returns:
 57              Dictionary with serialized data.
 58          """
 59          return default_to_dict(self, document_store=self.document_store, policy=self.policy.name)
 60  
 61      @classmethod
 62      def from_dict(cls, data: dict[str, Any]) -> "DocumentWriter":
 63          """
 64          Deserializes the component from a dictionary.
 65  
 66          :param data:
 67              The dictionary to deserialize from.
 68          :returns:
 69              The deserialized component.
 70  
 71          :raises DeserializationError:
 72              If the document store is not properly specified in the serialization data or its type cannot be imported.
 73          """
 74          init_params = data.get("init_parameters", {})
 75          if "policy" in init_params:
 76              init_params["policy"] = DuplicatePolicy[init_params["policy"]]
 77          return default_from_dict(cls, data)
 78  
 79      @component.output_types(documents_written=int)
 80      def run(self, documents: list[Document], policy: DuplicatePolicy | None = None) -> dict[str, int]:
 81          """
 82          Run the DocumentWriter on the given input data.
 83  
 84          :param documents:
 85              A list of documents to write to the document store.
 86          :param policy:
 87              The policy to use when encountering duplicate documents.
 88          :returns:
 89              Number of documents written to the document store.
 90  
 91          :raises ValueError:
 92              If the specified document store is not found.
 93          """
 94          if policy is None:
 95              policy = self.policy
 96  
 97          documents_written = self.document_store.write_documents(documents=documents, policy=policy)
 98          return {"documents_written": documents_written}
 99  
100      @component.output_types(documents_written=int)
101      async def run_async(self, documents: list[Document], policy: DuplicatePolicy | None = None) -> dict[str, int]:
102          """
103          Asynchronously run the DocumentWriter on the given input data.
104  
105          This is the asynchronous version of the `run` method. It has the same parameters and return values
106          but can be used with `await` in async code.
107  
108          :param documents:
109              A list of documents to write to the document store.
110          :param policy:
111              The policy to use when encountering duplicate documents.
112          :returns:
113              Number of documents written to the document store.
114  
115          :raises ValueError:
116              If the specified document store is not found.
117          :raises TypeError:
118              If the specified document store does not implement `write_documents_async`.
119          """
120          if policy is None:
121              policy = self.policy
122  
123          if not hasattr(self.document_store, "write_documents_async"):
124              raise TypeError(f"Document store {type(self.document_store).__name__} does not provide async support.")
125  
126          documents_written = await self.document_store.write_documents_async(documents=documents, policy=policy)
127          return {"documents_written": documents_written}