---
title: "BranchJoiner"
id: branchjoiner
slug: "/branchjoiner"
description: "Use this component to join different branches of a pipeline into a single output."
---

import ClickableImage from "@site/src/components/ClickableImage";

# BranchJoiner

Use this component to join different branches of a pipeline into a single output.

<div className="key-value-table">

|  |  |
| --- | --- |
| **Most common position in a pipeline** | Flexible: Can appear at the beginning of a pipeline or at the start of loops. |
| **Mandatory init variables** | `type_`: The type of data expected from preceding components |
| **Mandatory run variables** | `**kwargs`: Any input data type defined at the initialization. This input is variadic, meaning you can connect a variable number of components to it. |
| **Output variables** | `value`: The first value received from the connected components. |
| **API reference** | [Joiners](/reference/joiners-api) |
| **GitHub link** | https://github.com/deepset-ai/haystack/blob/main/haystack/components/joiners/branch.py |

</div>

## Overview

`BranchJoiner` joins multiple branches in a pipeline, allowing their outputs to be reconciled into a single branch. This is especially useful in pipelines with multiple branches that need to be unified before moving to the single component that comes next.

`BranchJoiner` receives multiple data connections of the same type from other components and passes the first value it receives to its single output. This makes it essential for closing loops in pipelines or reconciling multiple branches from a decision component.

`BranchJoiner` can handle only one input of one data type, declared in the `__init__` function. It ensures that the data type remains consistent across the pipeline branches.
If more than one value is received for the input when `run` is invoked, the component will raise an error:

```python
from haystack.components.joiners import BranchJoiner

bj = BranchJoiner(int)
bj.run(value=[3, 4, 5])

# ValueError: BranchJoiner expects only one input, but 3 were received.
```

## Usage

### On its own

Although only one input value is allowed at every run, due to its variadic nature `BranchJoiner` still expects a list. As an example:

```python
from haystack.components.joiners import BranchJoiner

# an example where input and output are strings
bj = BranchJoiner(str)
bj.run(value=["hello"])
# {"value": "hello"}

# an example where input and output are integers
bj = BranchJoiner(int)
bj.run(value=[3])
# {"value": 3}
```

### In a pipeline

#### Enabling loops

Below is an example where `BranchJoiner` is used for closing a loop. In this example, `BranchJoiner` receives a looped-back list of `ChatMessage` objects from the `JsonSchemaValidator` and sends it down to the `OpenAIChatGenerator` for re-generation.

```python
import json
from typing import List

from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage

person_schema = {
    "type": "object",
    "properties": {
        "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "nationality": {
            "type": "string",
            "enum": ["Italian", "Portuguese", "American"],
        },
    },
    "required": ["first_name", "last_name", "nationality"],
}

# Initialize a pipeline
pipe = Pipeline()

# Add components to the pipeline
pipe.add_component("joiner", BranchJoiner(List[ChatMessage]))
pipe.add_component("fc_llm", OpenAIChatGenerator(model="gpt-4o-mini"))
pipe.add_component("validator", JsonSchemaValidator(json_schema=person_schema))
pipe.add_component(
    "adapter",
    OutputAdapter("{{chat_message}}", List[ChatMessage], unsafe=True),
)

# Connect components
pipe.connect("adapter", "joiner")
pipe.connect("joiner", "fc_llm")
pipe.connect("fc_llm.replies", "validator.messages")
pipe.connect("validator.validation_error", "joiner")

result = pipe.run(
    data={
        "fc_llm": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
        "adapter": {
            "chat_message": [
                ChatMessage.from_user("Create json object from Peter Parker"),
            ],
        },
    },
)

print(json.loads(result["validator"]["validated"][0].text))

# Output:
# {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
# 'Superhero', 'age': 23, 'location': 'New York City'}
```

<details>

<summary>Expand to see the pipeline graph</summary>
<ClickableImage src="/img/9dc767d-loop_chart.png" alt="Pipeline flowchart showing a validation loop with adapter, joiner, language model, and validator components forming a cycle until validation succeeds" size="large" />

</details>

#### Reconciling branches

In this example, the `TextLanguageRouter` component directs the query to one of three language-specific Retrievers. The next component would be a `PromptBuilder`, but we cannot connect multiple Retrievers to a single `PromptBuilder` directly. Instead, we connect all the Retrievers to the `BranchJoiner` component. The `BranchJoiner` then takes the output from the Retriever that was actually called and passes it as a single list of documents to the `PromptBuilder`. The `BranchJoiner` ensures that the pipeline can handle multiple languages seamlessly by consolidating different outputs from the Retrievers into a unified connection for further processing.

```python
from haystack import Document, Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.joiners import BranchJoiner
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.routers import TextLanguageRouter

prompt_template = """
Answer the question based on the given reviews.
Reviews:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{ query }}
Answer:
"""

documents = [
    Document(
        content="Super appartement. Juste au dessus de plusieurs bars qui ferment très tard. A savoir à l'avance. (Bouchons d'oreilles fournis !)",
    ),
    Document(
        content="El apartamento estaba genial y muy céntrico, todo a mano. Al lado de la librería Lello y De la Torre de los clérigos. Está situado en una zona de marcha, así que si vais en fin de semana , habrá ruido, aunque a nosotros no nos molestaba para dormir",
    ),
    Document(
        content="The keypad with a code is convenient and the location is convenient. Basically everything else, very noisy, wi-fi didn't work, check-in person didn't explain anything about facilities, shower head was broken, there's no cleaning and everything else one may need is charged.",
    ),
    Document(
        content="It is very central and appartement has a nice appearance (even though a lot IKEA stuff), *W A R N I N G** the appartement presents itself as a elegant and as a place to relax, very wrong place to relax - you cannot sleep in this appartement, even the beds are vibrating from the bass of the clubs in the same building - you get ear plugs from the hotel.",
    ),
    Document(
        content="Céntrico. Muy cómodo para moverse y ver Oporto. Edificio con terraza propia en la última planta. Todo reformado y nuevo. The staff brings a great breakfast every morning to the apartment. Solo que se puede escuchar algo de ruido de la street a primeras horas de la noche. Es un zona de ocio nocturno. Pero respetan los horarios.",
    ),
]

en_document_store = InMemoryDocumentStore()
fr_document_store = InMemoryDocumentStore()
es_document_store = InMemoryDocumentStore()

# Index each review in the store that matches its language (assigned by hand)
fr_document_store.write_documents([documents[0]])
es_document_store.write_documents([documents[1], documents[4]])
en_document_store.write_documents([documents[2], documents[3]])

rag_pipeline = Pipeline()
rag_pipeline.add_component(
    instance=TextLanguageRouter(["en", "fr", "es"]),
    name="router",
)
rag_pipeline.add_component(
    instance=InMemoryBM25Retriever(document_store=en_document_store),
    name="en_retriever",
)
rag_pipeline.add_component(
    instance=InMemoryBM25Retriever(document_store=fr_document_store),
    name="fr_retriever",
)
rag_pipeline.add_component(
    instance=InMemoryBM25Retriever(document_store=es_document_store),
    name="es_retriever",
)
rag_pipeline.add_component(instance=BranchJoiner(type_=list[Document]), name="joiner")
rag_pipeline.add_component(
    instance=PromptBuilder(template=prompt_template),
    name="prompt_builder",
)
rag_pipeline.add_component(instance=OpenAIGenerator(), name="llm")

# Route the query to one retriever, then merge all retriever outputs in the joiner
rag_pipeline.connect("router.en", "en_retriever.query")
rag_pipeline.connect("router.fr", "fr_retriever.query")
rag_pipeline.connect("router.es", "es_retriever.query")
rag_pipeline.connect("en_retriever", "joiner")
rag_pipeline.connect("fr_retriever", "joiner")
rag_pipeline.connect("es_retriever", "joiner")
rag_pipeline.connect("joiner", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

en_question = "Does this apartment have a noise problem?"

result = rag_pipeline.run(
    {"router": {"text": en_question}, "prompt_builder": {"query": en_question}},
)

print(result["llm"]["replies"][0])
```

<details>

<summary>Expand to see the pipeline graph</summary>
<ClickableImage src="/img/6da5ddd-join_chart.png" alt="Pipeline flowchart demonstrating BranchJoiner reconciling outputs from three language-specific retrievers into a single prompt builder" />

</details>
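
The reconciliation behavior can also be explored without an API key. The following is a minimal plain-Python sketch, not Haystack's implementation: `ToyBranchJoiner` and `route_and_join` are hypothetical names, and the per-language lists are placeholder data. It assumes, as described above, that only the branch selected by the router delivers a value, so the joiner receives a one-element list and forwards that element.

```python
from typing import Any, Dict, List


class ToyBranchJoiner:
    """Simplified stand-in for BranchJoiner (hypothetical, for illustration only)."""

    def __init__(self, type_: type) -> None:
        # Stored only to mirror the constructor shape; this sketch does no type checking.
        self.type_ = type_

    def run(self, value: List[Any]) -> Dict[str, Any]:
        # The variadic input arrives as a list; exactly one element is allowed per run.
        if len(value) != 1:
            raise ValueError(
                f"BranchJoiner expects only one input, but {len(value)} were received."
            )
        return {"value": value[0]}


def route_and_join(active_language: str) -> Dict[str, Any]:
    # Placeholder retriever outputs, one list of "documents" per language branch.
    branch_outputs = {
        "en": ["very noisy at night"],
        "fr": ["bars qui ferment très tard"],
        "es": ["habrá ruido"],
    }
    # Only the branch chosen by the router produces output in a given run.
    received = [
        docs for lang, docs in branch_outputs.items() if lang == active_language
    ]
    return ToyBranchJoiner(list).run(value=received)


print(route_and_join("fr"))
```

Because the other two branches stay silent, the joiner sees a single list and passes it on unchanged. If two branches delivered values in the same run, this sketch would raise the same kind of `ValueError` shown in the Overview.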