# haystack/components/joiners/document_joiner.py
  1  # SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
  2  #
  3  # SPDX-License-Identifier: Apache-2.0
  4  
  5  import itertools
  6  from collections import defaultdict
  7  from dataclasses import replace
  8  from enum import Enum
  9  from math import inf
 10  from typing import Any
 11  
 12  from haystack import Document, component, default_from_dict, default_to_dict, logging
 13  from haystack.core.component.types import Variadic
 14  
 15  logger = logging.getLogger(__name__)
 16  
 17  
 18  class JoinMode(Enum):
 19      """
 20      Enum for join mode.
 21      """
 22  
 23      CONCATENATE = "concatenate"
 24      MERGE = "merge"
 25      RECIPROCAL_RANK_FUSION = "reciprocal_rank_fusion"
 26      DISTRIBUTION_BASED_RANK_FUSION = "distribution_based_rank_fusion"
 27  
 28      def __str__(self) -> str:
 29          return self.value
 30  
 31      @staticmethod
 32      def from_str(string: str) -> "JoinMode":
 33          """
 34          Convert a string to a JoinMode enum.
 35          """
 36          enum_map = {e.value: e for e in JoinMode}
 37          mode = enum_map.get(string)
 38          if mode is None:
 39              msg = f"Unknown join mode '{string}'. Supported modes in DocumentJoiner are: {list(enum_map.keys())}"
 40              raise ValueError(msg)
 41          return mode
 42  
 43  
 44  @component
 45  class DocumentJoiner:
 46      """
 47      Joins multiple lists of documents into a single list.
 48  
 49      It supports different join modes:
 50      - concatenate: Keeps the highest-scored document in case of duplicates.
 51      - merge: Calculates a weighted sum of scores for duplicates and merges them.
 52      - reciprocal_rank_fusion: Merges and assigns scores based on reciprocal rank fusion.
 53      - distribution_based_rank_fusion: Merges and assigns scores based on scores distribution in each Retriever.
 54  
 55      ### Usage example:
 56  
 57      ```python
 58      from haystack import Pipeline, Document
 59      from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
 60      from haystack.components.joiners import DocumentJoiner
 61      from haystack.components.retrievers import InMemoryBM25Retriever
 62      from haystack.components.retrievers import InMemoryEmbeddingRetriever
 63      from haystack.document_stores.in_memory import InMemoryDocumentStore
 64  
 65      document_store = InMemoryDocumentStore()
 66      docs = [Document(content="Paris"), Document(content="Berlin"), Document(content="London")]
 67      embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
 68      docs_embeddings = embedder.run(docs)
 69      document_store.write_documents(docs_embeddings['documents'])
 70  
 71      p = Pipeline()
 72      p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
 73      p.add_component(
 74              instance=SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"),
 75              name="text_embedder",
 76          )
 77      p.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
 78      p.add_component(instance=DocumentJoiner(), name="joiner")
 79      p.connect("bm25_retriever", "joiner")
 80      p.connect("embedding_retriever", "joiner")
 81      p.connect("text_embedder", "embedding_retriever")
 82      query = "What is the capital of France?"
 83      p.run(data={"query": query, "text": query, "top_k": 1})
 84      ```
 85      """
 86  
 87      def __init__(
 88          self,
 89          join_mode: str | JoinMode = JoinMode.CONCATENATE,
 90          weights: list[float] | None = None,
 91          top_k: int | None = None,
 92          sort_by_score: bool = True,
 93      ) -> None:
 94          """
 95          Creates a DocumentJoiner component.
 96  
 97          :param join_mode:
 98              Specifies the join mode to use. Available modes:
 99              - `concatenate`: Keeps the highest-scored document in case of duplicates.
100              - `merge`: Calculates a weighted sum of scores for duplicates and merges them.
101              - `reciprocal_rank_fusion`: Merges and assigns scores based on reciprocal rank fusion.
102              - `distribution_based_rank_fusion`: Merges and assigns scores based on scores
103              distribution in each Retriever.
104          :param weights:
105              Assign importance to each list of documents to influence how they're joined.
106              This parameter is ignored for
107              `concatenate` or `distribution_based_rank_fusion` join modes.
108              Weight for each list of documents must match the number of inputs.
109          :param top_k:
110              The maximum number of documents to return.
111          :param sort_by_score:
112              If `True`, sorts the documents by score in descending order.
113              If a document has no score, it is handled as if its score is -infinity.
114          """
115          if isinstance(join_mode, str):
116              join_mode = JoinMode.from_str(join_mode)
117          join_mode_functions = {
118              JoinMode.CONCATENATE: DocumentJoiner._concatenate,
119              JoinMode.MERGE: self._merge,
120              JoinMode.RECIPROCAL_RANK_FUSION: self._reciprocal_rank_fusion,
121              JoinMode.DISTRIBUTION_BASED_RANK_FUSION: DocumentJoiner._distribution_based_rank_fusion,
122          }
123          self.join_mode_function = join_mode_functions[join_mode]
124          self.join_mode = join_mode
125          self.weights = [float(i) / sum(weights) for i in weights] if weights else None
126          self.top_k = top_k
127          self.sort_by_score = sort_by_score
128  
129      @component.output_types(documents=list[Document])
130      def run(self, documents: Variadic[list[Document]], top_k: int | None = None) -> dict[str, Any]:
131          """
132          Joins multiple lists of Documents into a single list depending on the `join_mode` parameter.
133  
134          :param documents:
135              List of list of documents to be merged.
136          :param top_k:
137              The maximum number of documents to return. Overrides the instance's `top_k` if provided.
138  
139          :returns:
140              A dictionary with the following keys:
141              - `documents`: Merged list of Documents
142          """
143          documents = list(documents)
144          output_documents = self.join_mode_function(documents)
145  
146          if self.sort_by_score:
147              output_documents = sorted(
148                  output_documents, key=lambda doc: doc.score if doc.score is not None else -inf, reverse=True
149              )
150              if any(doc.score is None for doc in output_documents):
151                  logger.info(
152                      "Some of the Documents DocumentJoiner got have score=None. It was configured to sort Documents by "
153                      "score, so those with score=None were sorted as if they had a score of -infinity."
154                  )
155  
156          if top_k:
157              output_documents = output_documents[:top_k]
158          elif self.top_k:
159              output_documents = output_documents[: self.top_k]
160  
161          return {"documents": output_documents}
162  
163      @staticmethod
164      def _concatenate(document_lists: list[list[Document]]) -> list[Document]:
165          """
166          Concatenate multiple lists of Documents and return only the Document with the highest score for duplicates.
167          """
168          output = []
169          docs_per_id = defaultdict(list)
170          for doc in itertools.chain.from_iterable(document_lists):
171              docs_per_id[doc.id].append(doc)
172          for docs in docs_per_id.values():
173              doc_with_best_score = max(docs, key=lambda doc: doc.score if doc.score else -inf)
174              output.append(doc_with_best_score)
175          return output
176  
177      def _merge(self, document_lists: list[list[Document]]) -> list[Document]:
178          """
179          Merge multiple lists of Documents and calculate a weighted sum of the scores of duplicate Documents.
180          """
181          # This check prevents a division by zero when no documents are passed
182          if not document_lists:
183              return []
184  
185          scores_map: dict = defaultdict(int)
186          documents_map = {}
187          weights = self.weights if self.weights else [1 / len(document_lists)] * len(document_lists)
188  
189          for documents, weight in zip(document_lists, weights, strict=True):
190              for doc in documents:
191                  scores_map[doc.id] += (doc.score if doc.score else 0) * weight
192                  documents_map[doc.id] = doc
193  
194          return [replace(doc, score=scores_map[doc.id]) for doc in documents_map.values()]
195  
196      def _reciprocal_rank_fusion(self, document_lists: list[list[Document]]) -> list[Document]:
197          """
198          Merge multiple lists of Documents and assign scores based on reciprocal rank fusion.
199  
200          The constant k is set to 61 (60 was suggested by the original paper,
201          plus 1 as python lists are 0-based and the paper used 1-based ranking).
202          """
203          # This check prevents a division by zero when no documents are passed
204          if not document_lists:
205              return []
206  
207          k = 61
208  
209          scores_map: dict = defaultdict(int)
210          documents_map = {}
211          weights = self.weights if self.weights else [1 / len(document_lists)] * len(document_lists)
212  
213          # Calculate weighted reciprocal rank fusion score
214          for documents, weight in zip(document_lists, weights, strict=True):
215              for rank, doc in enumerate(documents):
216                  scores_map[doc.id] += (weight * len(document_lists)) / (k + rank)
217                  documents_map[doc.id] = doc
218  
219          # Normalize scores. Note: len(results) / k is the maximum possible score,
220          # achieved by being ranked first in all doc lists with non-zero weight.
221          for _id in scores_map:
222              scores_map[_id] /= len(document_lists) / k
223  
224          return [replace(doc, score=scores_map[doc.id]) for doc in documents_map.values()]
225  
226      @staticmethod
227      def _distribution_based_rank_fusion(document_lists: list[list[Document]]) -> list[Document]:
228          """
229          Merge multiple lists of Documents and assign scores based on Distribution-Based Score Fusion.
230  
231          (https://medium.com/plain-simple-software/distribution-based-score-fusion-dbsf-a-new-approach-to-vector-search-ranking-f87c37488b18)
232          If a Document is in more than one retriever, the one with the highest score is used.
233          """
234          rescaled_lists: list[list[Document]] = []
235          for documents in document_lists:
236              if len(documents) == 0:
237                  rescaled_lists.append(documents)
238                  continue
239  
240              scores_list = [doc.score if doc.score is not None else 0 for doc in documents]
241  
242              mean_score = sum(scores_list) / len(scores_list)
243              std_dev = (sum((x - mean_score) ** 2 for x in scores_list) / len(scores_list)) ** 0.5
244              min_score = mean_score - 3 * std_dev
245              max_score = mean_score + 3 * std_dev
246              delta_score = max_score - min_score
247  
248              # if all docs have the same score delta_score is 0, the docs are uninformative for the query
249              rescaled_lists.append(
250                  [
251                      replace(doc, score=(doc.score - min_score) / delta_score if delta_score != 0.0 else 0.0)
252                      for doc in documents
253                  ]
254              )
255  
256          return DocumentJoiner._concatenate(document_lists=rescaled_lists)
257  
258      def to_dict(self) -> dict[str, Any]:
259          """
260          Serializes the component to a dictionary.
261  
262          :returns:
263              Dictionary with serialized data.
264          """
265          return default_to_dict(
266              self,
267              join_mode=str(self.join_mode),
268              weights=self.weights,
269              top_k=self.top_k,
270              sort_by_score=self.sort_by_score,
271          )
272  
273      @classmethod
274      def from_dict(cls, data: dict[str, Any]) -> "DocumentJoiner":
275          """
276          Deserializes the component from a dictionary.
277  
278          :param data:
279              The dictionary to deserialize from.
280          :returns:
281              The deserialized component.
282          """
283          return default_from_dict(cls, data)