document_joiner.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import itertools
from collections import defaultdict
from dataclasses import replace
from enum import Enum
from math import inf
from typing import Any

from haystack import Document, component, default_from_dict, default_to_dict, logging
from haystack.core.component.types import Variadic

logger = logging.getLogger(__name__)


class JoinMode(Enum):
    """
    Enum for join mode.
    """

    CONCATENATE = "concatenate"
    MERGE = "merge"
    RECIPROCAL_RANK_FUSION = "reciprocal_rank_fusion"
    DISTRIBUTION_BASED_RANK_FUSION = "distribution_based_rank_fusion"

    def __str__(self) -> str:
        return self.value

    @staticmethod
    def from_str(string: str) -> "JoinMode":
        """
        Convert a string to a JoinMode enum.

        :param string: The string value of one of the JoinMode members.
        :returns: The matching JoinMode member.
        :raises ValueError: If the string does not match any JoinMode value.
        """
        enum_map = {e.value: e for e in JoinMode}
        mode = enum_map.get(string)
        if mode is None:
            msg = f"Unknown join mode '{string}'. Supported modes in DocumentJoiner are: {list(enum_map.keys())}"
            raise ValueError(msg)
        return mode


@component
class DocumentJoiner:
    """
    Joins multiple lists of documents into a single list.

    It supports different join modes:
    - concatenate: Keeps the highest-scored document in case of duplicates.
    - merge: Calculates a weighted sum of scores for duplicates and merges them.
    - reciprocal_rank_fusion: Merges and assigns scores based on reciprocal rank fusion.
    - distribution_based_rank_fusion: Merges and assigns scores based on scores distribution in each Retriever.

    ### Usage example:

    ```python
    from haystack import Pipeline, Document
    from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
    from haystack.components.joiners import DocumentJoiner
    from haystack.components.retrievers import InMemoryBM25Retriever
    from haystack.components.retrievers import InMemoryEmbeddingRetriever
    from haystack.document_stores.in_memory import InMemoryDocumentStore

    document_store = InMemoryDocumentStore()
    docs = [Document(content="Paris"), Document(content="Berlin"), Document(content="London")]
    embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
    docs_embeddings = embedder.run(docs)
    document_store.write_documents(docs_embeddings['documents'])

    p = Pipeline()
    p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
    p.add_component(
        instance=SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"),
        name="text_embedder",
    )
    p.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
    p.add_component(instance=DocumentJoiner(), name="joiner")
    p.connect("bm25_retriever", "joiner")
    p.connect("embedding_retriever", "joiner")
    p.connect("text_embedder", "embedding_retriever")
    query = "What is the capital of France?"
    p.run(data={"query": query, "text": query, "top_k": 1})
    ```
    """

    def __init__(
        self,
        join_mode: str | JoinMode = JoinMode.CONCATENATE,
        weights: list[float] | None = None,
        top_k: int | None = None,
        sort_by_score: bool = True,
    ) -> None:
        """
        Creates a DocumentJoiner component.

        :param join_mode:
            Specifies the join mode to use. Available modes:
            - `concatenate`: Keeps the highest-scored document in case of duplicates.
            - `merge`: Calculates a weighted sum of scores for duplicates and merges them.
            - `reciprocal_rank_fusion`: Merges and assigns scores based on reciprocal rank fusion.
            - `distribution_based_rank_fusion`: Merges and assigns scores based on scores
              distribution in each Retriever.
        :param weights:
            Assign importance to each list of documents to influence how they're joined.
            This parameter is ignored for
            `concatenate` or `distribution_based_rank_fusion` join modes.
            Weight for each list of documents must match the number of inputs.
        :param top_k:
            The maximum number of documents to return.
        :param sort_by_score:
            If `True`, sorts the documents by score in descending order.
            If a document has no score, it is handled as if its score is -infinity.
        """
        if isinstance(join_mode, str):
            join_mode = JoinMode.from_str(join_mode)
        # Dispatch table: resolve the join function once at construction time
        # instead of branching on every run() call.
        join_mode_functions = {
            JoinMode.CONCATENATE: DocumentJoiner._concatenate,
            JoinMode.MERGE: self._merge,
            JoinMode.RECIPROCAL_RANK_FUSION: self._reciprocal_rank_fusion,
            JoinMode.DISTRIBUTION_BASED_RANK_FUSION: DocumentJoiner._distribution_based_rank_fusion,
        }
        self.join_mode_function = join_mode_functions[join_mode]
        self.join_mode = join_mode
        # Normalize the weights so they sum to 1; keep None when not provided.
        self.weights = [float(i) / sum(weights) for i in weights] if weights else None
        self.top_k = top_k
        self.sort_by_score = sort_by_score

    @component.output_types(documents=list[Document])
    def run(self, documents: Variadic[list[Document]], top_k: int | None = None) -> dict[str, Any]:
        """
        Joins multiple lists of Documents into a single list depending on the `join_mode` parameter.

        :param documents:
            List of list of documents to be merged.
        :param top_k:
            The maximum number of documents to return. Overrides the instance's `top_k` if provided.

        :returns:
            A dictionary with the following keys:
            - `documents`: Merged list of Documents
        """
        documents = list(documents)
        output_documents = self.join_mode_function(documents)

        if self.sort_by_score:
            output_documents = sorted(
                output_documents, key=lambda doc: doc.score if doc.score is not None else -inf, reverse=True
            )
            if any(doc.score is None for doc in output_documents):
                logger.info(
                    "Some of the Documents DocumentJoiner got have score=None. It was configured to sort Documents by "
                    "score, so those with score=None were sorted as if they had a score of -infinity."
                )

        # Explicit None checks so a caller can pass top_k=0 and get an empty
        # result instead of silently falling back to the instance's top_k.
        if top_k is not None:
            output_documents = output_documents[:top_k]
        elif self.top_k is not None:
            output_documents = output_documents[: self.top_k]

        return {"documents": output_documents}

    @staticmethod
    def _concatenate(document_lists: list[list[Document]]) -> list[Document]:
        """
        Concatenate multiple lists of Documents and return only the Document with the highest score for duplicates.
        """
        output = []
        docs_per_id = defaultdict(list)
        for doc in itertools.chain.from_iterable(document_lists):
            docs_per_id[doc.id].append(doc)
        for docs in docs_per_id.values():
            # `is not None` (not truthiness): a legitimate score of 0.0 must not
            # be treated as a missing score, or the wrong duplicate could win.
            doc_with_best_score = max(docs, key=lambda doc: doc.score if doc.score is not None else -inf)
            output.append(doc_with_best_score)
        return output

    def _merge(self, document_lists: list[list[Document]]) -> list[Document]:
        """
        Merge multiple lists of Documents and calculate a weighted sum of the scores of duplicate Documents.
        """
        # This check prevents a division by zero when no documents are passed
        if not document_lists:
            return []

        scores_map: dict = defaultdict(int)
        documents_map = {}
        # Fall back to uniform weights when none were configured.
        weights = self.weights if self.weights else [1 / len(document_lists)] * len(document_lists)

        for documents, weight in zip(document_lists, weights, strict=True):
            for doc in documents:
                # Documents without a score contribute 0 to the weighted sum.
                scores_map[doc.id] += (doc.score if doc.score is not None else 0) * weight
                documents_map[doc.id] = doc

        return [replace(doc, score=scores_map[doc.id]) for doc in documents_map.values()]

    def _reciprocal_rank_fusion(self, document_lists: list[list[Document]]) -> list[Document]:
        """
        Merge multiple lists of Documents and assign scores based on reciprocal rank fusion.

        The constant k is set to 61 (60 was suggested by the original paper,
        plus 1 as python lists are 0-based and the paper used 1-based ranking).
        """
        # This check prevents a division by zero when no documents are passed
        if not document_lists:
            return []

        k = 61

        scores_map: dict = defaultdict(int)
        documents_map = {}
        weights = self.weights if self.weights else [1 / len(document_lists)] * len(document_lists)

        # Calculate weighted reciprocal rank fusion score
        for documents, weight in zip(document_lists, weights, strict=True):
            for rank, doc in enumerate(documents):
                scores_map[doc.id] += (weight * len(document_lists)) / (k + rank)
                documents_map[doc.id] = doc

        # Normalize scores. Note: len(results) / k is the maximum possible score,
        # achieved by being ranked first in all doc lists with non-zero weight.
        for _id in scores_map:
            scores_map[_id] /= len(document_lists) / k

        return [replace(doc, score=scores_map[doc.id]) for doc in documents_map.values()]

    @staticmethod
    def _distribution_based_rank_fusion(document_lists: list[list[Document]]) -> list[Document]:
        """
        Merge multiple lists of Documents and assign scores based on Distribution-Based Score Fusion.

        (https://medium.com/plain-simple-software/distribution-based-score-fusion-dbsf-a-new-approach-to-vector-search-ranking-f87c37488b18)
        If a Document is in more than one retriever, the one with the highest score is used.
        """
        rescaled_lists: list[list[Document]] = []
        for documents in document_lists:
            if len(documents) == 0:
                rescaled_lists.append(documents)
                continue

            # Treat a missing score as 0 both here and in the rescale below,
            # otherwise `doc.score - min_score` would raise a TypeError on None.
            scores_list = [doc.score if doc.score is not None else 0 for doc in documents]

            mean_score = sum(scores_list) / len(scores_list)
            std_dev = (sum((x - mean_score) ** 2 for x in scores_list) / len(scores_list)) ** 0.5
            # Clip the rescaling window to three standard deviations around the mean.
            min_score = mean_score - 3 * std_dev
            max_score = mean_score + 3 * std_dev
            delta_score = max_score - min_score

            # if all docs have the same score delta_score is 0, the docs are uninformative for the query
            rescaled_lists.append(
                [
                    replace(
                        doc,
                        score=(score - min_score) / delta_score if delta_score != 0.0 else 0.0,
                    )
                    for doc, score in zip(documents, scores_list, strict=True)
                ]
            )

        return DocumentJoiner._concatenate(document_lists=rescaled_lists)

    def to_dict(self) -> dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        return default_to_dict(
            self,
            join_mode=str(self.join_mode),
            weights=self.weights,
            top_k=self.top_k,
            sort_by_score=self.sort_by_score,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "DocumentJoiner":
        """
        Deserializes the component from a dictionary.

        :param data:
            The dictionary to deserialize from.
        :returns:
            The deserialized component.
        """
        return default_from_dict(cls, data)