---
title: Joiners
id: joiners-api
description: Components that join lists of different objects
slug: "/joiners-api"
---

<a id="answer_joiner"></a>

# Module answer\_joiner

<a id="answer_joiner.JoinMode"></a>

## JoinMode

Enum for AnswerJoiner join modes.

<a id="answer_joiner.JoinMode.from_str"></a>

#### JoinMode.from\_str

```python
@staticmethod
def from_str(string: str) -> "JoinMode"
```

Convert a string to a JoinMode enum.

<a id="answer_joiner.AnswerJoiner"></a>

## AnswerJoiner

Merges multiple lists of `Answer` objects into a single list.

Use this component to combine answers from different Generators into a single list.
Currently, the component supports only one join mode: `CONCATENATE`.
This mode concatenates multiple lists of answers into a single list.

### Usage example

In this example, AnswerJoiner merges answers from two different Generators:

```python
from haystack.components.builders import AnswerBuilder
from haystack.components.joiners import AnswerJoiner

from haystack.core.pipeline import Pipeline

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage


query = "What's Natural Language Processing?"
messages = [ChatMessage.from_system("You are a helpful, respectful and honest assistant. Be super concise."),
            ChatMessage.from_user(query)]

pipe = Pipeline()
pipe.add_component("gpt-4o", OpenAIChatGenerator(model="gpt-4o"))
pipe.add_component("gpt-4o-mini", OpenAIChatGenerator(model="gpt-4o-mini"))
pipe.add_component("aba", AnswerBuilder())
pipe.add_component("abb", AnswerBuilder())
pipe.add_component("joiner", AnswerJoiner())

# Each generator feeds its own AnswerBuilder; both builders feed the joiner
pipe.connect("gpt-4o.replies", "aba")
pipe.connect("gpt-4o-mini.replies", "abb")
pipe.connect("aba.answers", "joiner")
pipe.connect("abb.answers", "joiner")

results = pipe.run(data={"gpt-4o": {"messages": messages},
                         "gpt-4o-mini": {"messages": messages},
                         "aba": {"query": query},
                         "abb": {"query": query}})
```

<a id="answer_joiner.AnswerJoiner.__init__"></a>

#### AnswerJoiner.\_\_init\_\_

```python
def __init__(join_mode: Union[str, JoinMode] = JoinMode.CONCATENATE,
             top_k: Optional[int] = None,
             sort_by_score: bool = False)
```

Creates an AnswerJoiner component.

**Arguments**:

- `join_mode`: Specifies the join mode to use. Available modes:
  - `concatenate`: Concatenates multiple lists of Answers into a single list.
- `top_k`: The maximum number of Answers to return.
- `sort_by_score`: If `True`, sorts the Answers by score in descending order.
If an Answer has no score, it is handled as if its score is -infinity.

<a id="answer_joiner.AnswerJoiner.run"></a>

#### AnswerJoiner.run

```python
@component.output_types(answers=list[AnswerType])
def run(answers: Variadic[list[AnswerType]], top_k: Optional[int] = None)
```

Joins multiple lists of Answers into a single list depending on the `join_mode` parameter.

**Arguments**:

- `answers`: Nested list of Answers to be merged.
- `top_k`: The maximum number of Answers to return. Overrides the instance's `top_k` if provided.

**Returns**:

A dictionary with the following keys:
- `answers`: Merged list of Answers

<a id="answer_joiner.AnswerJoiner.to_dict"></a>

#### AnswerJoiner.to\_dict

```python
def to_dict() -> dict[str, Any]
```

Serializes the component to a dictionary.

**Returns**:

Dictionary with serialized data.

<a id="answer_joiner.AnswerJoiner.from_dict"></a>

#### AnswerJoiner.from\_dict

```python
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "AnswerJoiner"
```

Deserializes the component from a dictionary.

**Arguments**:

- `data`: The dictionary to deserialize from.

**Returns**:

The deserialized component.

<a id="branch"></a>

# Module branch

<a id="branch.BranchJoiner"></a>

## BranchJoiner

A component that merges multiple input branches of a pipeline into a single output stream.

`BranchJoiner` receives multiple inputs of the same data type and forwards the first received value
to its output. This is useful for scenarios where multiple branches need to converge before proceeding.

### Common Use Cases:
- **Loop Handling:** `BranchJoiner` helps close loops in pipelines. For example, if a pipeline component validates
  or modifies incoming data and produces an error-handling branch, `BranchJoiner` can merge both branches and send
  (or resend in the case of a loop) the data to the component that evaluates errors. See "Example Usage" below.

- **Decision-Based Merging:** `BranchJoiner` reconciles branches coming from Router components (such as
  `ConditionalRouter`, `TextLanguageRouter`). Suppose a `TextLanguageRouter` directs user queries to different
  Retrievers based on the detected language. Each Retriever processes its assigned query and passes the results
  to `BranchJoiner`, which consolidates them into a single output before passing them to the next component, such
  as a `PromptBuilder`.

### Example Usage:
```python
import json

from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage

# Define a schema for validation
person_schema = {
    "type": "object",
    "properties": {
        "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
    },
    "required": ["first_name", "last_name", "nationality"]
}

# Initialize a pipeline
pipe = Pipeline()

# Add components to the pipeline
pipe.add_component('joiner', BranchJoiner(list[ChatMessage]))
pipe.add_component('generator', OpenAIChatGenerator(model="gpt-4o-mini"))
pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
pipe.add_component('adapter', OutputAdapter("{{chat_message}}", list[ChatMessage], unsafe=True))

# And connect them
pipe.connect("adapter", "joiner")
pipe.connect("joiner", "generator")
pipe.connect("generator.replies", "validator.messages")
pipe.connect("validator.validation_error", "joiner")

result = pipe.run(
    data={
        "generator": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
        "adapter": {"chat_message": [ChatMessage.from_user("Create json from Peter Parker")]}}
)

print(json.loads(result["validator"]["validated"][0].text))

# Example output:
# {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man',
#  'occupation': 'Superhero', 'age': 23, 'location': 'New York City'}
```

Note that `BranchJoiner` can manage only one data type at a time. In this case, `BranchJoiner` is created for
passing `list[ChatMessage]`. This determines the type of data that `BranchJoiner` will receive from the upstream
connected components and also the type of data that `BranchJoiner` will send through its output.

In the code example, `BranchJoiner` receives a looped-back `list[ChatMessage]` from the `JsonSchemaValidator` and
sends it down to the `OpenAIChatGenerator` for re-generation. A pipeline can have multiple loopback connections.
In this instance, there is only one downstream component (the `OpenAIChatGenerator`), but the pipeline could
have more than one.

<a id="branch.BranchJoiner.__init__"></a>

#### BranchJoiner.\_\_init\_\_

```python
def __init__(type_: type)
```

Creates a `BranchJoiner` component.

**Arguments**:

- `type_`: The expected data type of inputs and outputs.

<a id="branch.BranchJoiner.to_dict"></a>

#### BranchJoiner.to\_dict

```python
def to_dict() -> dict[str, Any]
```

Serializes the component into a dictionary.

**Returns**:

Dictionary with serialized data.

<a id="branch.BranchJoiner.from_dict"></a>

#### BranchJoiner.from\_dict

```python
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "BranchJoiner"
```

Deserializes a `BranchJoiner` instance from a dictionary.

**Arguments**:

- `data`: The dictionary containing serialized component data.

**Returns**:

A deserialized `BranchJoiner` instance.
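
The "forward the first received value" behaviour that `BranchJoiner` is described as having can be pictured with a small plain-Python sketch. This is only an illustration under stated assumptions, not Haystack's implementation: the `forward_first` helper is hypothetical, and a branch that produced nothing is modelled here as `None`.

```python
from typing import Any, Optional


def forward_first(*branches: Optional[Any]) -> dict[str, Any]:
    """Hypothetical helper: return the first branch value that is present."""
    for value in branches:
        if value is not None:
            return {"value": value}
    raise ValueError("no branch produced a value")


# First pass through the loop: only the initial branch carries data.
first_pass = forward_first(["Create json from Peter Parker"], None)

# Loop-back pass: only the error-handling branch carries data.
loop_pass = forward_first(None, ["The JSON was invalid, please fix it"])

print(first_pass)
print(loop_pass)
```

Whichever branch is active, the downstream component sees one value under one key, which is what lets a loop be closed without the downstream component knowing where its input came from.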

<a id="branch.BranchJoiner.run"></a>

#### BranchJoiner.run

```python
def run(**kwargs) -> dict[str, Any]
```

Executes the `BranchJoiner`, selecting the first available input value and passing it downstream.

**Arguments**:

- `**kwargs`: The input data. Must be of the type declared by `type_` during initialization.

**Returns**:

A dictionary with a single key `value`, containing the first input received.

<a id="document_joiner"></a>

# Module document\_joiner

<a id="document_joiner.JoinMode"></a>

## JoinMode

Enum for join mode.

<a id="document_joiner.JoinMode.from_str"></a>

#### JoinMode.from\_str

```python
@staticmethod
def from_str(string: str) -> "JoinMode"
```

Convert a string to a JoinMode enum.

<a id="document_joiner.DocumentJoiner"></a>

## DocumentJoiner

Joins multiple lists of documents into a single list.

It supports different join modes:
- `concatenate`: Keeps the highest-scored document in case of duplicates.
- `merge`: Calculates a weighted sum of scores for duplicates and merges them.
- `reciprocal_rank_fusion`: Merges and assigns scores based on reciprocal rank fusion.
- `distribution_based_rank_fusion`: Merges and assigns scores based on scores distribution in each Retriever.
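
To make the reciprocal rank fusion mode more concrete, here is a minimal plain-Python sketch of the textbook formula, in which each document scores the sum of `1 / (k + rank)` over the lists it appears in. The function name, the use of plain string ids instead of `Document` objects, and the constant `k = 60` are all assumptions made for illustration; Haystack's own implementation may use different constants, weighting, or normalization.

```python
from collections import defaultdict


def reciprocal_rank_fusion(ranked_lists: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Fuse several rankings of document ids into one scored ranking."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in ranked_lists:
        # Ranks start at 1, so the top document of a list contributes 1 / (k + 1).
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first.
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


# Hypothetical rankings from a keyword retriever and an embedding retriever.
bm25_ranking = ["paris", "london", "berlin"]
embedding_ranking = ["berlin", "paris", "london"]

fused = reciprocal_rank_fusion([bm25_ranking, embedding_ranking])
for doc_id, score in fused:
    print(f"{doc_id}: {score:.5f}")
```

Because only ranks enter the formula, the two retrievers do not need comparable score scales, which is the usual reason to pick this mode over a weighted sum of raw scores.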

### Usage example:

```python
from haystack import Pipeline, Document
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack.components.joiners import DocumentJoiner
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.components.retrievers import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Embed the documents and write them to the store
document_store = InMemoryDocumentStore()
docs = [Document(content="Paris"), Document(content="Berlin"), Document(content="London")]
embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
embedder.warm_up()
docs_embeddings = embedder.run(docs)
document_store.write_documents(docs_embeddings['documents'])

# Two retrievers feed the same joiner
p = Pipeline()
p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
p.add_component(
    instance=SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"),
    name="text_embedder",
)
p.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
p.add_component(instance=DocumentJoiner(), name="joiner")
p.connect("bm25_retriever", "joiner")
p.connect("embedding_retriever", "joiner")
p.connect("text_embedder", "embedding_retriever")
query = "What is the capital of France?"
p.run(data={"query": query, "text": query, "top_k": 1})
```

<a id="document_joiner.DocumentJoiner.__init__"></a>

#### DocumentJoiner.\_\_init\_\_

```python
def __init__(join_mode: Union[str, JoinMode] = JoinMode.CONCATENATE,
             weights: Optional[list[float]] = None,
             top_k: Optional[int] = None,
             sort_by_score: bool = True)
```

Creates a DocumentJoiner component.

**Arguments**:

- `join_mode`: Specifies the join mode to use. Available modes:
  - `concatenate`: Keeps the highest-scored document in case of duplicates.
  - `merge`: Calculates a weighted sum of scores for duplicates and merges them.
  - `reciprocal_rank_fusion`: Merges and assigns scores based on reciprocal rank fusion.
  - `distribution_based_rank_fusion`: Merges and assigns scores based on scores
  distribution in each Retriever.
- `weights`: Assigns importance to each list of documents to influence how they're joined.
This parameter is ignored for the
`concatenate` and `distribution_based_rank_fusion` join modes.
The number of weights must match the number of input lists.
- `top_k`: The maximum number of documents to return.
- `sort_by_score`: If `True`, sorts the documents by score in descending order.
If a document has no score, it is handled as if its score is -infinity.

<a id="document_joiner.DocumentJoiner.run"></a>

#### DocumentJoiner.run

```python
@component.output_types(documents=list[Document])
def run(documents: Variadic[list[Document]], top_k: Optional[int] = None)
```

Joins multiple lists of Documents into a single list depending on the `join_mode` parameter.

**Arguments**:

- `documents`: List of lists of Documents to be merged.
- `top_k`: The maximum number of documents to return. Overrides the instance's `top_k` if provided.

**Returns**:

A dictionary with the following keys:
- `documents`: Merged list of Documents

<a id="document_joiner.DocumentJoiner.to_dict"></a>

#### DocumentJoiner.to\_dict

```python
def to_dict() -> dict[str, Any]
```

Serializes the component to a dictionary.

**Returns**:

Dictionary with serialized data.
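
As a rough illustration of how the `concatenate` mode, `sort_by_score`, and `top_k` described for `DocumentJoiner` fit together, the sketch below deduplicates by id, keeps the highest score, treats a missing score as -infinity, and truncates the result. It uses plain `(id, score)` tuples as a stand-in for `Document` objects, and the `concatenate` helper is hypothetical; it is not Haystack's code.

```python
from typing import Optional

# (document id, score): a hypothetical stand-in for a Document object.
Scored = tuple[str, Optional[float]]


def _sort_key(score: Optional[float]) -> float:
    """Treat a missing score as -infinity so unscored items sort last."""
    return float("-inf") if score is None else score


def concatenate(lists: list[list[Scored]], top_k: Optional[int] = None) -> list[Scored]:
    """Merge lists, keep the best score per id, sort descending, apply top_k."""
    best: dict[str, Optional[float]] = {}
    for docs in lists:
        for doc_id, score in docs:
            if doc_id not in best or _sort_key(score) > _sort_key(best[doc_id]):
                best[doc_id] = score
    ordered = sorted(best.items(), key=lambda item: _sort_key(item[1]), reverse=True)
    return ordered if top_k is None else ordered[:top_k]


bm25_docs = [("paris", 0.9), ("london", None)]
embedding_docs = [("paris", 0.7), ("berlin", 0.4)]

joined = concatenate([bm25_docs, embedding_docs])
top_two = concatenate([bm25_docs, embedding_docs], top_k=2)
print(joined)
print(top_two)
```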

<a id="document_joiner.DocumentJoiner.from_dict"></a>

#### DocumentJoiner.from\_dict

```python
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "DocumentJoiner"
```

Deserializes the component from a dictionary.

**Arguments**:

- `data`: The dictionary to deserialize from.

**Returns**:

The deserialized component.

<a id="list_joiner"></a>

# Module list\_joiner

<a id="list_joiner.ListJoiner"></a>

## ListJoiner

A component that joins multiple lists into a single flat list.

The ListJoiner receives multiple lists of the same type and concatenates them into a single flat list.
The output order respects the pipeline's execution sequence, with earlier inputs being added first.

### Usage example

```python
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack import Pipeline
from haystack.components.joiners import ListJoiner


user_message = [ChatMessage.from_user("Give a brief answer to the following question: {{query}}")]

feedback_prompt = """
    You are given a question and an answer.
    Your task is to provide a score and a brief feedback on the answer.
    Question: {{query}}
    Answer: {{response}}
    """
feedback_message = [ChatMessage.from_system(feedback_prompt)]

prompt_builder = ChatPromptBuilder(template=user_message)
feedback_prompt_builder = ChatPromptBuilder(template=feedback_message)
llm = OpenAIChatGenerator(model="gpt-4o-mini")
feedback_llm = OpenAIChatGenerator(model="gpt-4o-mini")

pipe = Pipeline()
pipe.add_component("prompt_builder", prompt_builder)
pipe.add_component("llm", llm)
pipe.add_component("feedback_prompt_builder", feedback_prompt_builder)
pipe.add_component("feedback_llm", feedback_llm)
pipe.add_component("list_joiner", ListJoiner(list[ChatMessage]))

# The joiner collects the prompt, the answer, and the feedback
pipe.connect("prompt_builder.prompt", "llm.messages")
pipe.connect("prompt_builder.prompt", "list_joiner")
pipe.connect("llm.replies", "list_joiner")
pipe.connect("llm.replies", "feedback_prompt_builder.response")
pipe.connect("feedback_prompt_builder.prompt", "feedback_llm.messages")
pipe.connect("feedback_llm.replies", "list_joiner")

query = "What is nuclear physics?"
ans = pipe.run(data={"prompt_builder": {"template_variables": {"query": query}},
                     "feedback_prompt_builder": {"template_variables": {"query": query}}})

print(ans["list_joiner"]["values"])
```

<a id="list_joiner.ListJoiner.__init__"></a>

#### ListJoiner.\_\_init\_\_

```python
def __init__(list_type_: Optional[type] = None)
```

Creates a ListJoiner component.

**Arguments**:

- `list_type_`: The expected type of the lists this component will join (e.g., list[ChatMessage]).
If specified, all input lists must conform to this type. If None, the component defaults to handling
lists of any type, including mixed types.

<a id="list_joiner.ListJoiner.to_dict"></a>

#### ListJoiner.to\_dict

```python
def to_dict() -> dict[str, Any]
```

Serializes the component to a dictionary.

**Returns**:

Dictionary with serialized data.

<a id="list_joiner.ListJoiner.from_dict"></a>

#### ListJoiner.from\_dict

```python
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ListJoiner"
```

Deserializes the component from a dictionary.

**Arguments**:

- `data`: Dictionary to deserialize from.

**Returns**:

Deserialized component.

<a id="list_joiner.ListJoiner.run"></a>

#### ListJoiner.run

```python
def run(values: Variadic[list[Any]]) -> dict[str, list[Any]]
```

Joins multiple lists into a single flat list.

**Arguments**:

- `values`: The lists to be joined.

**Returns**:

Dictionary with a `values` key containing the joined list.

<a id="string_joiner"></a>

# Module string\_joiner

<a id="string_joiner.StringJoiner"></a>

## StringJoiner

Component that joins strings from different components into a list of strings.

### Usage example

```python
from haystack.components.joiners import StringJoiner
from haystack.components.builders import PromptBuilder
from haystack.core.pipeline import Pipeline

string_1 = "What's Natural Language Processing?"
string_2 = "What is life?"

pipeline = Pipeline()
pipeline.add_component("prompt_builder_1", PromptBuilder("Builder 1: {{query}}"))
pipeline.add_component("prompt_builder_2", PromptBuilder("Builder 2: {{query}}"))
pipeline.add_component("string_joiner", StringJoiner())

pipeline.connect("prompt_builder_1.prompt", "string_joiner.strings")
pipeline.connect("prompt_builder_2.prompt", "string_joiner.strings")

print(pipeline.run(data={"prompt_builder_1": {"query": string_1}, "prompt_builder_2": {"query": string_2}}))

# Example output:
# {"string_joiner": {"strings": ["Builder 1: What's Natural Language Processing?", "Builder 2: What is life?"]}}
```

<a id="string_joiner.StringJoiner.run"></a>

#### StringJoiner.run

```python
@component.output_types(strings=list[str])
def run(strings: Variadic[str])
```

Joins strings into a list of strings.

**Arguments**:

- `strings`: Strings from different components.

**Returns**:

A dictionary with the following keys:
- `strings`: Merged list of strings
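
To contrast the output shapes of `ListJoiner` and `StringJoiner` as documented here, the following plain-Python sketch mimics them with two hypothetical helpers, `join_lists` and `join_strings`. It only reflects the behaviour stated in this reference (flattening lists in arrival order versus collecting strings into a list) and is not taken from Haystack's source.

```python
from itertools import chain
from typing import Any


def join_lists(*lists: list[Any]) -> dict[str, list[Any]]:
    """Flatten several lists into one, earlier inputs first."""
    return {"values": list(chain.from_iterable(lists))}


def join_strings(*strings: str) -> dict[str, list[str]]:
    """Collect individual strings into a single list."""
    return {"strings": list(strings)}


flat = join_lists(["prompt"], ["answer"], ["feedback"])
collected = join_strings("Builder 1: What is life?", "Builder 2: What is life?")
print(flat)
print(collected)
```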