branchjoiner.mdx
1 --- 2 title: "BranchJoiner" 3 id: branchjoiner 4 slug: "/branchjoiner" 5 description: "Use this component to join different branches of a pipeline into a single output." 6 --- 7 8 import ClickableImage from "@site/src/components/ClickableImage"; 9 10 # BranchJoiner 11 12 Use this component to join different branches of a pipeline into a single output. 13 14 <div className="key-value-table"> 15 16 | | | 17 | --- | --- | 18 | **Most common position in a pipeline** | Flexible: Can appear at the beginning of a pipeline or at the start of loops. | 19 | **Mandatory init variables** | `type`: The type of data expected from preceding components | 20 | **Mandatory run variables** | `**kwargs`: Any input data type defined at the initialization. This input is variadic, meaning you can connect a variable number of components to it. | 21 | **Output variables** | `value`: The first value received from the connected components. | 22 | **API reference** | [Joiners](/reference/joiners-api) | 23 | **GitHub link** | https://github.com/deepset-ai/haystack/blob/main/haystack/components/joiners/branch.py | 24 25 </div> 26 27 ## Overview 28 29 `BranchJoiner` joins multiple branches in a pipeline, allowing their outputs to be reconciled into a single branch. This is especially useful in pipelines with multiple branches that need to be unified before moving to the single component that comes next. 30 31 `BranchJoiner` receives multiple data connections of the same type from other components and passes the first value it receives to its single output. This makes it essential for closing loops in pipelines or reconciling multiple branches from a decision component. 32 33 `BranchJoiner` can handle only one input of one data type, declared in the `__init__` function. It ensures that the data type remains consistent across the pipeline branches. If more than one value is received for the input when `run` is invoked, the component will raise an error: 34 35 ```python 36 from haystack.components.joiners import BranchJoiner 37 38 bj = BranchJoiner(int) 39 bj.run(value=[3, 4, 5]) 40 41 >>> ValueError: BranchJoiner expects only one input, but 3 were received. 42 43 ``` 44 45 ## Usage 46 47 ### On its own 48 49 Although only one input value is allowed at every run, due to its variadic nature `BranchJoiner` still expects a list. As an example: 50 51 ```python 52 from haystack.components.joiners import BranchJoiner 53 54 ## an example where input and output are strings 55 bj = BranchJoiner(str) 56 bj.run(value=["hello"]) 57 >>> {"value" : "hello"} 58 59 ## an example where input and output are integers 60 bj = BranchJoiner(int) 61 bj.run(value=[3]) 62 >>> {"value": 3} 63 ``` 64 65 ### In a pipeline 66 67 #### Enabling loops 68 69 Below is an example where `BranchJoiner` is used for closing a loop. In this example, `BranchJoiner` receives a looped-back list of `ChatMessage` objects from the `JsonSchemaValidator` and sends it down to the `OpenAIChatGenerator` for re-generation. 70 71 ```python 72 import json 73 74 from haystack import Pipeline 75 from haystack.components.generators.chat import OpenAIChatGenerator 76 from haystack.components.joiners import BranchJoiner 77 from haystack.components.validators import JsonSchemaValidator 78 from haystack.dataclasses import ChatMessage 79 80 person_schema = { 81 "type": "object", 82 "properties": { 83 "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"}, 84 "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"}, 85 "nationality": { 86 "type": "string", 87 "enum": ["Italian", "Portuguese", "American"], 88 }, 89 }, 90 "required": ["first_name", "last_name", "nationality"], 91 } 92 93 ## Initialize a pipeline 94 pipe = Pipeline() 95 96 ## Add components to the pipeline 97 pipe.add_component("joiner", BranchJoiner(list[ChatMessage])) 98 pipe.add_component("fc_llm", OpenAIChatGenerator(model="gpt-4.1-mini")) 99 pipe.add_component("validator", JsonSchemaValidator(json_schema=person_schema)) 100 101 ## Connect components 102 pipe.connect("joiner", "fc_llm") 103 pipe.connect("fc_llm.replies", "validator.messages") 104 pipe.connect("validator.validation_error", "joiner") 105 106 result = pipe.run( 107 data={ 108 "fc_llm": {"generation_kwargs": {"response_format": {"type": "json_object"}}}, 109 "joiner": { 110 "value": [ChatMessage.from_user("Create json object from Peter Parker")], 111 }, 112 }, 113 ) 114 115 print(json.loads(result["validator"]["validated"][0].text)) 116 117 ## Output: 118 ## {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation': 119 ## 'Superhero', 'age': 23, 'location': 'New York City'} 120 ``` 121 122 <details> 123 124 <summary>Expand to see the pipeline graph</summary> 125 <ClickableImage src="/img/9dc767d-loop_chart.png" alt="Pipeline flowchart showing a validation loop with joiner, language model, and validator components forming a cycle until validation succeeds" size="large" /> 126 127 </details> 128 129 #### Reconciling branches 130 131 In this example, the `TextLanguageRouter` component directs the query to one of three language-specific Retrievers. The next component would be a `PromptBuilder`, but we cannot connect multiple Retrievers to a single `PromptBuilder` directly. Instead, we connect all the Retrievers to the `BranchJoiner` component. The `BranchJoiner` then takes the output from the Retriever that was actually called and passes it as a single list of documents to the `PromptBuilder`. The `BranchJoiner` ensures that the pipeline can handle multiple languages seamlessly by consolidating different outputs from the Retrievers into a unified connection for further processing. 132 133 ```python 134 from haystack import Document, Pipeline 135 from haystack.document_stores.in_memory import InMemoryDocumentStore 136 from haystack.components.retrievers.in_memory import InMemoryBM25Retriever 137 from haystack.components.joiners import BranchJoiner 138 from haystack.components.builders import PromptBuilder 139 from haystack.components.generators import OpenAIGenerator 140 from haystack.components.routers import TextLanguageRouter 141 142 prompt_template = """ 143 Answer the question based on the given reviews. 144 Reviews: 145 {% for doc in documents %} 146 {{ doc.content }} 147 {% endfor %} 148 Question: {{ query}} 149 Answer: 150 """ 151 152 documents = [ 153 Document( 154 content="Super appartement. Juste au dessus de plusieurs bars qui ferment très tard. A savoir à l'avance. (Bouchons d'oreilles fournis !)", 155 ), 156 Document( 157 content="El apartamento estaba genial y muy céntrico, todo a mano. Al lado de la librería Lello y De la Torre de los clérigos. Está situado en una zona de marcha, así que si vais en fin de semana , habrá ruido, aunque a nosotros no nos molestaba para dormir", 158 ), 159 Document( 160 content="The keypad with a code is convenient and the location is convenient. Basically everything else, very noisy, wi-fi didn't work, check-in person didn't explain anything about facilities, shower head was broken, there's no cleaning and everything else one may need is charged.", 161 ), 162 Document( 163 content="It is very central and appartement has a nice appearance (even though a lot IKEA stuff), *W A R N I N G** the appartement presents itself as a elegant and as a place to relax, very wrong place to relax - you cannot sleep in this appartement, even the beds are vibrating from the bass of the clubs in the same building - you get ear plugs from the hotel.", 164 ), 165 Document( 166 content="Céntrico. Muy cómodo para moverse y ver Oporto. Edificio con terraza propia en la última planta. Todo reformado y nuevo. The staff brings a great breakfast every morning to the apartment. Solo que se puede escuchar algo de ruido de la street a primeras horas de la noche. Es un zona de ocio nocturno. Pero respetan los horarios.", 167 ), 168 ] 169 170 en_document_store = InMemoryDocumentStore() 171 fr_document_store = InMemoryDocumentStore() 172 es_document_store = InMemoryDocumentStore() 173 174 rag_pipeline = Pipeline() 175 rag_pipeline.add_component( 176 instance=TextLanguageRouter(["en", "fr", "es"]), 177 name="router", 178 ) 179 rag_pipeline.add_component( 180 instance=InMemoryBM25Retriever(document_store=en_document_store), 181 name="en_retriever", 182 ) 183 rag_pipeline.add_component( 184 instance=InMemoryBM25Retriever(document_store=fr_document_store), 185 name="fr_retriever", 186 ) 187 rag_pipeline.add_component( 188 instance=InMemoryBM25Retriever(document_store=es_document_store), 189 name="es_retriever", 190 ) 191 rag_pipeline.add_component(instance=BranchJoiner(type_=list[Document]), name="joiner") 192 rag_pipeline.add_component( 193 instance=PromptBuilder(template=prompt_template), 194 name="prompt_builder", 195 ) 196 rag_pipeline.add_component(instance=OpenAIGenerator(), name="llm") 197 198 rag_pipeline.connect("router.en", "en_retriever.query") 199 rag_pipeline.connect("router.fr", "fr_retriever.query") 200 rag_pipeline.connect("router.es", "es_retriever.query") 201 rag_pipeline.connect("en_retriever", "joiner") 202 rag_pipeline.connect("fr_retriever", "joiner") 203 rag_pipeline.connect("es_retriever", "joiner") 204 rag_pipeline.connect("joiner", "prompt_builder.documents") 205 rag_pipeline.connect("prompt_builder", "llm") 206 207 en_question = "Does this apartment has a noise problem?" 208 209 result = rag_pipeline.run( 210 {"router": {"text": en_question}, "prompt_builder": {"query": en_question}}, 211 ) 212 213 print(result["llm"]["replies"][0]) 214 ``` 215 216 <details> 217 218 <summary>Expand to see the pipeline graph</summary> 219 <ClickableImage src="/img/6da5ddd-join_chart.png" alt="Pipeline flowchart demonstrating BranchJoiner reconciling outputs from three language-specific retrievers into a single prompt builder" /> 220 221 </details>