---
title: "BranchJoiner"
id: branchjoiner
slug: "/branchjoiner"
description: "Use this component to join different branches of a pipeline into a single output."
---

import ClickableImage from "@site/src/components/ClickableImage";

# BranchJoiner

Use this component to join different branches of a pipeline into a single output.

|                                        |                                                                                                                                                         |
| :------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------ |
| **Most common position in a pipeline** | Flexible: Can appear at the beginning of a pipeline or at the start of loops.                                                                           |
| **Mandatory init variables**           | "type_": The type of data expected from preceding components                                                                                            |
| **Mandatory run variables**            | "\*\*kwargs": Any input data type defined at the initialization. This input is variadic, meaning you can connect a variable number of components to it. |
| **Output variables**                   | "value": The first value received from the connected components.                                                                                        |
| **API reference**                      | [Joiners](/reference/joiners-api)                                                                                                                       |
| **GitHub link**                        | https://github.com/deepset-ai/haystack/blob/main/haystack/components/joiners/branch.py                                                                  |

## Overview

`BranchJoiner` joins multiple branches in a pipeline, allowing their outputs to be reconciled into a single branch. This is especially useful in pipelines with multiple branches that need to be unified before moving to the single component that comes next.

`BranchJoiner` receives multiple data connections of the same type from other components and passes the first value it receives to its single output. This makes it essential for closing loops in pipelines or reconciling multiple branches from a decision component.

`BranchJoiner` can handle only one input of one data type, declared in the `__init__` function. It ensures that the data type remains consistent across the pipeline branches. If more than one value is received for the input when `run` is invoked, the component will raise an error:

```python
from haystack.components.joiners import BranchJoiner

bj = BranchJoiner(int)
# Three values are passed for a single input, so this call raises an error
bj.run(value=[3, 4, 5])
```

## Usage

### On its own

Although only one input value is allowed per run, `BranchJoiner` still expects a list because its input is variadic. For example:

```python
from haystack.components.joiners import BranchJoiner

# An example where input and output are strings
bj = BranchJoiner(str)
bj.run(value=["hello"])

# An example where input and output are integers
bj = BranchJoiner(int)
bj.run(value=[3])
```

### In a pipeline

#### Enabling loops

Below is an example where `BranchJoiner` is used for closing a loop. In this example, `BranchJoiner` receives a looped-back list of `ChatMessage` objects from the `JsonSchemaValidator` and sends it down to the `OpenAIChatGenerator` for re-generation.

```python
import json
from typing import List

from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage

person_schema = {
    "type": "object",
    "properties": {
        "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "nationality": {
            "type": "string",
            "enum": ["Italian", "Portuguese", "American"],
        },
    },
    "required": ["first_name", "last_name", "nationality"],
}

# Initialize a pipeline
pipe = Pipeline()

# Add components to the pipeline
pipe.add_component("joiner", BranchJoiner(List[ChatMessage]))
pipe.add_component("fc_llm", OpenAIChatGenerator(model="gpt-4o-mini"))
pipe.add_component("validator", JsonSchemaValidator(json_schema=person_schema))
pipe.add_component(
    "adapter",
    OutputAdapter("{{chat_message}}", List[ChatMessage], unsafe=True),
)

# Connect components
pipe.connect("adapter", "joiner")
pipe.connect("joiner", "fc_llm")
pipe.connect("fc_llm.replies", "validator.messages")
# Close the loop: failed validations go back to the joiner
pipe.connect("validator.validation_error", "joiner")

result = pipe.run(
    data={
        "fc_llm": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
        "adapter": {
            "chat_message": [
                ChatMessage.from_user("Create json object from Peter Parker"),
            ],
        },
    },
)

print(json.loads(result["validator"]["validated"][0].text))

# Output:
# {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
# 'Superhero', 'age': 23, 'location': 'New York City'}
```

<details>

<summary>Expand to see the pipeline graph</summary>

<ClickableImage src="/img/9dc767d-loop_chart.png" alt="Pipeline flowchart showing a validation loop with adapter, joiner, language model, and validator components forming a cycle until validation succeeds" size="large" />

</details>

#### Reconciling branches

In this example, the `TextLanguageRouter` component directs the query to one of three language-specific Retrievers. The next component would be a `PromptBuilder`, but we cannot connect multiple Retrievers to a single `PromptBuilder` directly. Instead, we connect all the Retrievers to the `BranchJoiner` component. The `BranchJoiner` then takes the output from the Retriever that was actually called and passes it as a single list of documents to the `PromptBuilder`. The `BranchJoiner` ensures that the pipeline can handle multiple languages seamlessly by consolidating different outputs from the Retrievers into a unified connection for further processing.

```python
from haystack import Document, Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.joiners import BranchJoiner
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.routers import TextLanguageRouter

prompt_template = """
Answer the question based on the given reviews.
Reviews:
  {% for doc in documents %}
    {{ doc.content }}
  {% endfor %}
Question: {{ query }}
Answer:
"""

documents = [
    Document(
        content="Super appartement. Juste au dessus de plusieurs bars qui ferment très tard. A savoir à l'avance. (Bouchons d'oreilles fournis !)",
    ),
    Document(
        content="El apartamento estaba genial y muy céntrico, todo a mano. Al lado de la librería Lello y De la Torre de los clérigos. Está situado en una zona de marcha, así que si vais en fin de semana , habrá ruido, aunque a nosotros no nos molestaba para dormir",
    ),
    Document(
        content="The keypad with a code is convenient and the location is convenient. Basically everything else, very noisy, wi-fi didn't work, check-in person didn't explain anything about facilities, shower head was broken, there's no cleaning and everything else one may need is charged.",
    ),
    Document(
        content="It is very central and appartement has a nice appearance (even though a lot IKEA stuff), *W A R N I N G** the appartement presents itself as a elegant and as a place to relax, very wrong place to relax - you cannot sleep in this appartement, even the beds are vibrating from the bass of the clubs in the same building - you get ear plugs from the hotel.",
    ),
    Document(
        content="Céntrico. Muy cómodo para moverse y ver Oporto. Edificio con terraza propia en la última planta. Todo reformado y nuevo. Te traen un estupendo desayuno todas las mañanas al apartamento. Solo que se puede escuchar algo de ruido de la calle a primeras horas de la noche. Es un zona de ocio nocturno. Pero respetan los horarios.",
    ),
]

en_document_store = InMemoryDocumentStore()
fr_document_store = InMemoryDocumentStore()
es_document_store = InMemoryDocumentStore()

# Index each review in the store that matches its language,
# otherwise the Retrievers have nothing to return
fr_document_store.write_documents([documents[0]])
es_document_store.write_documents([documents[1], documents[4]])
en_document_store.write_documents([documents[2], documents[3]])

rag_pipeline = Pipeline()
rag_pipeline.add_component(
    instance=TextLanguageRouter(["en", "fr", "es"]),
    name="router",
)
rag_pipeline.add_component(
    instance=InMemoryBM25Retriever(document_store=en_document_store),
    name="en_retriever",
)
rag_pipeline.add_component(
    instance=InMemoryBM25Retriever(document_store=fr_document_store),
    name="fr_retriever",
)
rag_pipeline.add_component(
    instance=InMemoryBM25Retriever(document_store=es_document_store),
    name="es_retriever",
)
rag_pipeline.add_component(instance=BranchJoiner(type_=list[Document]), name="joiner")
rag_pipeline.add_component(
    instance=PromptBuilder(template=prompt_template),
    name="prompt_builder",
)
rag_pipeline.add_component(instance=OpenAIGenerator(), name="llm")

# Route the query to the Retriever for the detected language
rag_pipeline.connect("router.en", "en_retriever.query")
rag_pipeline.connect("router.fr", "fr_retriever.query")
rag_pipeline.connect("router.es", "es_retriever.query")
# All Retrievers feed the same variadic joiner input
rag_pipeline.connect("en_retriever", "joiner")
rag_pipeline.connect("fr_retriever", "joiner")
rag_pipeline.connect("es_retriever", "joiner")
rag_pipeline.connect("joiner", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

en_question = "Does this apartment have a noise problem?"

result = rag_pipeline.run(
    {"router": {"text": en_question}, "prompt_builder": {"query": en_question}},
)

print(result["llm"]["replies"][0])
```

<details>

<summary>Expand to see the pipeline graph</summary>

<ClickableImage src="/img/6da5ddd-join_chart.png" alt="Pipeline flowchart demonstrating BranchJoiner reconciling outputs from three language-specific retrievers into a single prompt builder" />

</details>
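The reconciling behavior described in this section can also be pictured without Haystack or an API key. The following is a minimal plain-Python sketch, not the library's implementation: `route_by_language`, `join_branches`, and the toy `reviews` data are hypothetical names introduced only for illustration, and the language is passed in explicitly instead of being detected. It shows that only the branch that actually ran contributes a value, and that the joiner forwards exactly that one value.

```python
from typing import Optional

# Hypothetical toy data: one "retriever" result list per language.
reviews = {
    "en": ["Very noisy, wi-fi didn't work."],
    "fr": ["Super appartement."],
    "es": ["El apartamento estaba genial."],
}


def route_by_language(language: str) -> dict[str, Optional[list[str]]]:
    """Run only the branch for `language`; the other branches produce nothing."""
    return {lang: (docs if lang == language else None) for lang, docs in reviews.items()}


def join_branches(branch_outputs: dict[str, Optional[list[str]]]) -> list[str]:
    """Forward the single value produced by the branches, mimicking a joiner."""
    received = [out for out in branch_outputs.values() if out is not None]
    if len(received) != 1:
        raise ValueError(f"Expected exactly one input, but {len(received)} were received.")
    return received[0]


joined = join_branches(route_by_language("fr"))
print(joined)  # ['Super appartement.']
```

In this sketch, passing the outputs of all three branches at once raises a `ValueError`, which mirrors the one-value-per-run rule from the Overview.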