joiners_api.md
  1  ---
  2  title: Joiners
  3  id: joiners-api
  4  description: Components that join list of different objects
  5  slug: "/joiners-api"
  6  ---
  7  
  8  <a id="answer_joiner"></a>
  9  
 10  # Module answer\_joiner
 11  
 12  <a id="answer_joiner.JoinMode"></a>
 13  
 14  ## JoinMode
 15  
 16  Enum for AnswerJoiner join modes.
 17  
 18  <a id="answer_joiner.JoinMode.from_str"></a>
 19  
 20  #### JoinMode.from\_str
 21  
 22  ```python
 23  @staticmethod
 24  def from_str(string: str) -> "JoinMode"
 25  ```
 26  
 27  Convert a string to a JoinMode enum.
 28  
 29  <a id="answer_joiner.AnswerJoiner"></a>
 30  
 31  ## AnswerJoiner
 32  
 33  Merges multiple lists of `Answer` objects into a single list.
 34  
 35  Use this component to combine answers from different Generators into a single list.
 36  Currently, the component supports only one join mode: `CONCATENATE`.
 37  This mode concatenates multiple lists of answers into a single list.
 38  
 39  ### Usage example
 40  
 41  In this example, AnswerJoiner merges answers from two different Generators:
 42  
 43  ```python
 44  from haystack.components.builders import AnswerBuilder
 45  from haystack.components.joiners import AnswerJoiner
 46  
 47  from haystack.core.pipeline import Pipeline
 48  
 49  from haystack.components.generators.chat import OpenAIChatGenerator
 50  from haystack.dataclasses import ChatMessage
 51  
 52  
 53  query = "What's Natural Language Processing?"
 54  messages = [ChatMessage.from_system("You are a helpful, respectful and honest assistant. Be super concise."),
 55              ChatMessage.from_user(query)]
 56  
 57  pipe = Pipeline()
 58  pipe.add_component("gpt-4o", OpenAIChatGenerator(model="gpt-4o"))
 59  pipe.add_component("gpt-4o-mini", OpenAIChatGenerator(model="gpt-4o-mini"))
 60  pipe.add_component("aba", AnswerBuilder())
 61  pipe.add_component("abb", AnswerBuilder())
 62  pipe.add_component("joiner", AnswerJoiner())
 63  
 64  pipe.connect("gpt-4o.replies", "aba")
 65  pipe.connect("gpt-4o-mini.replies", "abb")
 66  pipe.connect("aba.answers", "joiner")
 67  pipe.connect("abb.answers", "joiner")
 68  
 69  results = pipe.run(data={"gpt-4o": {"messages": messages},
 70                              "gpt-4o-mini": {"messages": messages},
 71                              "aba": {"query": query},
 72                              "abb": {"query": query}})
 73  ```
 74  
 75  <a id="answer_joiner.AnswerJoiner.__init__"></a>
 76  
 77  #### AnswerJoiner.\_\_init\_\_
 78  
 79  ```python
 80  def __init__(join_mode: Union[str, JoinMode] = JoinMode.CONCATENATE,
 81               top_k: Optional[int] = None,
 82               sort_by_score: bool = False)
 83  ```
 84  
 85  Creates an AnswerJoiner component.
 86  
 87  **Arguments**:
 88  
 89  - `join_mode`: Specifies the join mode to use. Available modes:
 90  - `concatenate`: Concatenates multiple lists of Answers into a single list.
 91  - `top_k`: The maximum number of Answers to return.
 92  - `sort_by_score`: If `True`, sorts the documents by score in descending order.
 93  If a document has no score, it is handled as if its score is -infinity.
 94  
 95  <a id="answer_joiner.AnswerJoiner.run"></a>
 96  
 97  #### AnswerJoiner.run
 98  
 99  ```python
100  @component.output_types(answers=list[AnswerType])
101  def run(answers: Variadic[list[AnswerType]], top_k: Optional[int] = None)
102  ```
103  
104  Joins multiple lists of Answers into a single list depending on the `join_mode` parameter.
105  
106  **Arguments**:
107  
108  - `answers`: Nested list of Answers to be merged.
109  - `top_k`: The maximum number of Answers to return. Overrides the instance's `top_k` if provided.
110  
111  **Returns**:
112  
113  A dictionary with the following keys:
114  - `answers`: Merged list of Answers
115  
116  <a id="answer_joiner.AnswerJoiner.to_dict"></a>
117  
118  #### AnswerJoiner.to\_dict
119  
120  ```python
121  def to_dict() -> dict[str, Any]
122  ```
123  
124  Serializes the component to a dictionary.
125  
126  **Returns**:
127  
128  Dictionary with serialized data.
129  
130  <a id="answer_joiner.AnswerJoiner.from_dict"></a>
131  
132  #### AnswerJoiner.from\_dict
133  
134  ```python
135  @classmethod
136  def from_dict(cls, data: dict[str, Any]) -> "AnswerJoiner"
137  ```
138  
139  Deserializes the component from a dictionary.
140  
141  **Arguments**:
142  
143  - `data`: The dictionary to deserialize from.
144  
145  **Returns**:
146  
147  The deserialized component.
148  
149  <a id="branch"></a>
150  
151  # Module branch
152  
153  <a id="branch.BranchJoiner"></a>
154  
155  ## BranchJoiner
156  
157  A component that merges multiple input branches of a pipeline into a single output stream.
158  
159  `BranchJoiner` receives multiple inputs of the same data type and forwards the first received value
160  to its output. This is useful for scenarios where multiple branches need to converge before proceeding.
161  
162  ### Common Use Cases:
163  - **Loop Handling:** `BranchJoiner` helps close loops in pipelines. For example, if a pipeline component validates
164    or modifies incoming data and produces an error-handling branch, `BranchJoiner` can merge both branches and send
165    (or resend in the case of a loop) the data to the component that evaluates errors. See "Usage example" below.
166  
167  - **Decision-Based Merging:** `BranchJoiner` reconciles branches coming from Router components (such as
168    `ConditionalRouter`, `TextLanguageRouter`). Suppose a `TextLanguageRouter` directs user queries to different
169    Retrievers based on the detected language. Each Retriever processes its assigned query and passes the results
170    to `BranchJoiner`, which consolidates them into a single output before passing them to the next component, such
171    as a `PromptBuilder`.
172  
173  ### Example Usage:
174  ```python
175  import json
176  
177  from haystack import Pipeline
178  from haystack.components.converters import OutputAdapter
179  from haystack.components.generators.chat import OpenAIChatGenerator
180  from haystack.components.joiners import BranchJoiner
181  from haystack.components.validators import JsonSchemaValidator
182  from haystack.dataclasses import ChatMessage
183  
184  # Define a schema for validation
185  person_schema = {
186      "type": "object",
187      "properties": {
188          "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
189          "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
190          "nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
191      },
192      "required": ["first_name", "last_name", "nationality"]
193  }
194  
195  # Initialize a pipeline
196  pipe = Pipeline()
197  
198  # Add components to the pipeline
199  pipe.add_component('joiner', BranchJoiner(list[ChatMessage]))
200  pipe.add_component('generator', OpenAIChatGenerator(model="gpt-4o-mini"))
201  pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
202  pipe.add_component('adapter', OutputAdapter("{{chat_message}}", list[ChatMessage], unsafe=True))
203  
204  # And connect them
205  pipe.connect("adapter", "joiner")
206  pipe.connect("joiner", "generator")
207  pipe.connect("generator.replies", "validator.messages")
208  pipe.connect("validator.validation_error", "joiner")
209  
210  result = pipe.run(
211      data={
212      "generator": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
213      "adapter": {"chat_message": [ChatMessage.from_user("Create json from Peter Parker")]}}
214  )
215  
216  print(json.loads(result["validator"]["validated"][0].text))
217  
218  
219  >> {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
220  >> 'Superhero', 'age': 23, 'location': 'New York City'}
221  ```
222  
223  Note that `BranchJoiner` can manage only one data type at a time. In this case, `BranchJoiner` is created for
224  passing `list[ChatMessage]`. This determines the type of data that `BranchJoiner` will receive from the upstream
225  connected components and also the type of data that `BranchJoiner` will send through its output.
226  
227  In the code example, `BranchJoiner` receives a looped back `list[ChatMessage]` from the `JsonSchemaValidator` and
228  sends it down to the `OpenAIChatGenerator` for re-generation. We can have multiple loopback connections in the
229  pipeline. In this instance, the downstream component is only one (the `OpenAIChatGenerator`), but the pipeline could
230  have more than one downstream component.
231  
232  <a id="branch.BranchJoiner.__init__"></a>
233  
234  #### BranchJoiner.\_\_init\_\_
235  
236  ```python
237  def __init__(type_: type)
238  ```
239  
240  Creates a `BranchJoiner` component.
241  
242  **Arguments**:
243  
244  - `type_`: The expected data type of inputs and outputs.
245  
246  <a id="branch.BranchJoiner.to_dict"></a>
247  
248  #### BranchJoiner.to\_dict
249  
250  ```python
251  def to_dict() -> dict[str, Any]
252  ```
253  
254  Serializes the component into a dictionary.
255  
256  **Returns**:
257  
258  Dictionary with serialized data.
259  
260  <a id="branch.BranchJoiner.from_dict"></a>
261  
262  #### BranchJoiner.from\_dict
263  
264  ```python
265  @classmethod
266  def from_dict(cls, data: dict[str, Any]) -> "BranchJoiner"
267  ```
268  
269  Deserializes a `BranchJoiner` instance from a dictionary.
270  
271  **Arguments**:
272  
273  - `data`: The dictionary containing serialized component data.
274  
275  **Returns**:
276  
277  A deserialized `BranchJoiner` instance.
278  
279  <a id="branch.BranchJoiner.run"></a>
280  
281  #### BranchJoiner.run
282  
283  ```python
284  def run(**kwargs) -> dict[str, Any]
285  ```
286  
287  Executes the `BranchJoiner`, selecting the first available input value and passing it downstream.
288  
289  **Arguments**:
290  
291  - `**kwargs`: The input data. Must be of the type declared by `type_` during initialization.
292  
293  **Returns**:
294  
295  A dictionary with a single key `value`, containing the first input received.
296  
297  <a id="document_joiner"></a>
298  
299  # Module document\_joiner
300  
301  <a id="document_joiner.JoinMode"></a>
302  
303  ## JoinMode
304  
305  Enum for join mode.
306  
307  <a id="document_joiner.JoinMode.from_str"></a>
308  
309  #### JoinMode.from\_str
310  
311  ```python
312  @staticmethod
313  def from_str(string: str) -> "JoinMode"
314  ```
315  
316  Convert a string to a JoinMode enum.
317  
318  <a id="document_joiner.DocumentJoiner"></a>
319  
320  ## DocumentJoiner
321  
322  Joins multiple lists of documents into a single list.
323  
324  It supports different join modes:
325  - concatenate: Keeps the highest-scored document in case of duplicates.
326  - merge: Calculates a weighted sum of scores for duplicates and merges them.
327  - reciprocal_rank_fusion: Merges and assigns scores based on reciprocal rank fusion.
328  - distribution_based_rank_fusion: Merges and assigns scores based on scores distribution in each Retriever.
329  
330  ### Usage example:
331  
332  ```python
333  from haystack import Pipeline, Document
334  from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
335  from haystack.components.joiners import DocumentJoiner
336  from haystack.components.retrievers import InMemoryBM25Retriever
337  from haystack.components.retrievers import InMemoryEmbeddingRetriever
338  from haystack.document_stores.in_memory import InMemoryDocumentStore
339  
340  document_store = InMemoryDocumentStore()
341  docs = [Document(content="Paris"), Document(content="Berlin"), Document(content="London")]
342  embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
343  embedder.warm_up()
344  docs_embeddings = embedder.run(docs)
345  document_store.write_documents(docs_embeddings['documents'])
346  
347  p = Pipeline()
348  p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
349  p.add_component(
350          instance=SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"),
351          name="text_embedder",
352      )
353  p.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
354  p.add_component(instance=DocumentJoiner(), name="joiner")
355  p.connect("bm25_retriever", "joiner")
356  p.connect("embedding_retriever", "joiner")
357  p.connect("text_embedder", "embedding_retriever")
358  query = "What is the capital of France?"
359  p.run(data={"query": query, "text": query, "top_k": 1})
360  ```
361  
362  <a id="document_joiner.DocumentJoiner.__init__"></a>
363  
364  #### DocumentJoiner.\_\_init\_\_
365  
366  ```python
367  def __init__(join_mode: Union[str, JoinMode] = JoinMode.CONCATENATE,
368               weights: Optional[list[float]] = None,
369               top_k: Optional[int] = None,
370               sort_by_score: bool = True)
371  ```
372  
373  Creates a DocumentJoiner component.
374  
375  **Arguments**:
376  
377  - `join_mode`: Specifies the join mode to use. Available modes:
378  - `concatenate`: Keeps the highest-scored document in case of duplicates.
379  - `merge`: Calculates a weighted sum of scores for duplicates and merges them.
380  - `reciprocal_rank_fusion`: Merges and assigns scores based on reciprocal rank fusion.
381  - `distribution_based_rank_fusion`: Merges and assigns scores based on scores
382  distribution in each Retriever.
383  - `weights`: Assign importance to each list of documents to influence how they're joined.
384  This parameter is ignored for
385  `concatenate` or `distribution_based_rank_fusion` join modes.
386  Weight for each list of documents must match the number of inputs.
387  - `top_k`: The maximum number of documents to return.
388  - `sort_by_score`: If `True`, sorts the documents by score in descending order.
389  If a document has no score, it is handled as if its score is -infinity.
390  
391  <a id="document_joiner.DocumentJoiner.run"></a>
392  
393  #### DocumentJoiner.run
394  
395  ```python
396  @component.output_types(documents=list[Document])
397  def run(documents: Variadic[list[Document]], top_k: Optional[int] = None)
398  ```
399  
400  Joins multiple lists of Documents into a single list depending on the `join_mode` parameter.
401  
402  **Arguments**:
403  
404  - `documents`: List of list of documents to be merged.
405  - `top_k`: The maximum number of documents to return. Overrides the instance's `top_k` if provided.
406  
407  **Returns**:
408  
409  A dictionary with the following keys:
410  - `documents`: Merged list of Documents
411  
412  <a id="document_joiner.DocumentJoiner.to_dict"></a>
413  
414  #### DocumentJoiner.to\_dict
415  
416  ```python
417  def to_dict() -> dict[str, Any]
418  ```
419  
420  Serializes the component to a dictionary.
421  
422  **Returns**:
423  
424  Dictionary with serialized data.
425  
426  <a id="document_joiner.DocumentJoiner.from_dict"></a>
427  
428  #### DocumentJoiner.from\_dict
429  
430  ```python
431  @classmethod
432  def from_dict(cls, data: dict[str, Any]) -> "DocumentJoiner"
433  ```
434  
435  Deserializes the component from a dictionary.
436  
437  **Arguments**:
438  
439  - `data`: The dictionary to deserialize from.
440  
441  **Returns**:
442  
443  The deserialized component.
444  
445  <a id="list_joiner"></a>
446  
447  # Module list\_joiner
448  
449  <a id="list_joiner.ListJoiner"></a>
450  
451  ## ListJoiner
452  
453  A component that joins multiple lists into a single flat list.
454  
455  The ListJoiner receives multiple lists of the same type and concatenates them into a single flat list.
456  The output order respects the pipeline's execution sequence, with earlier inputs being added first.
457  
458  Usage example:
459  ```python
460  from haystack.components.builders import ChatPromptBuilder
461  from haystack.components.generators.chat import OpenAIChatGenerator
462  from haystack.dataclasses import ChatMessage
463  from haystack import Pipeline
464  from haystack.components.joiners import ListJoiner
465  
466  
467  user_message = [ChatMessage.from_user("Give a brief answer the following question: {{query}}")]
468  
469  feedback_prompt = """
470      You are given a question and an answer.
471      Your task is to provide a score and a brief feedback on the answer.
472      Question: {{query}}
473      Answer: {{response}}
474      """
475  feedback_message = [ChatMessage.from_system(feedback_prompt)]
476  
477  prompt_builder = ChatPromptBuilder(template=user_message)
478  feedback_prompt_builder = ChatPromptBuilder(template=feedback_message)
479  llm = OpenAIChatGenerator(model="gpt-4o-mini")
480  feedback_llm = OpenAIChatGenerator(model="gpt-4o-mini")
481  
482  pipe = Pipeline()
483  pipe.add_component("prompt_builder", prompt_builder)
484  pipe.add_component("llm", llm)
485  pipe.add_component("feedback_prompt_builder", feedback_prompt_builder)
486  pipe.add_component("feedback_llm", feedback_llm)
487  pipe.add_component("list_joiner", ListJoiner(list[ChatMessage]))
488  
489  pipe.connect("prompt_builder.prompt", "llm.messages")
490  pipe.connect("prompt_builder.prompt", "list_joiner")
491  pipe.connect("llm.replies", "list_joiner")
492  pipe.connect("llm.replies", "feedback_prompt_builder.response")
493  pipe.connect("feedback_prompt_builder.prompt", "feedback_llm.messages")
494  pipe.connect("feedback_llm.replies", "list_joiner")
495  
496  query = "What is nuclear physics?"
497  ans = pipe.run(data={"prompt_builder": {"template_variables":{"query": query}},
498      "feedback_prompt_builder": {"template_variables":{"query": query}}})
499  
500  print(ans["list_joiner"]["values"])
501  ```
502  
503  <a id="list_joiner.ListJoiner.__init__"></a>
504  
505  #### ListJoiner.\_\_init\_\_
506  
507  ```python
508  def __init__(list_type_: Optional[type] = None)
509  ```
510  
511  Creates a ListJoiner component.
512  
513  **Arguments**:
514  
515  - `list_type_`: The expected type of the lists this component will join (e.g., list[ChatMessage]).
516  If specified, all input lists must conform to this type. If None, the component defaults to handling
517  lists of any type including mixed types.
518  
519  <a id="list_joiner.ListJoiner.to_dict"></a>
520  
521  #### ListJoiner.to\_dict
522  
523  ```python
524  def to_dict() -> dict[str, Any]
525  ```
526  
527  Serializes the component to a dictionary.
528  
529  **Returns**:
530  
531  Dictionary with serialized data.
532  
533  <a id="list_joiner.ListJoiner.from_dict"></a>
534  
535  #### ListJoiner.from\_dict
536  
537  ```python
538  @classmethod
539  def from_dict(cls, data: dict[str, Any]) -> "ListJoiner"
540  ```
541  
542  Deserializes the component from a dictionary.
543  
544  **Arguments**:
545  
546  - `data`: Dictionary to deserialize from.
547  
548  **Returns**:
549  
550  Deserialized component.
551  
552  <a id="list_joiner.ListJoiner.run"></a>
553  
554  #### ListJoiner.run
555  
556  ```python
557  def run(values: Variadic[list[Any]]) -> dict[str, list[Any]]
558  ```
559  
560  Joins multiple lists into a single flat list.
561  
562  **Arguments**:
563  
564  - `values`: The list to be joined.
565  
566  **Returns**:
567  
568  Dictionary with 'values' key containing the joined list.
569  
570  <a id="string_joiner"></a>
571  
572  # Module string\_joiner
573  
574  <a id="string_joiner.StringJoiner"></a>
575  
576  ## StringJoiner
577  
578  Component to join strings from different components to a list of strings.
579  
580  ### Usage example
581  
582  ```python
583  from haystack.components.joiners import StringJoiner
584  from haystack.components.builders import PromptBuilder
585  from haystack.core.pipeline import Pipeline
586  
587  from haystack.components.generators.chat import OpenAIChatGenerator
588  from haystack.dataclasses import ChatMessage
589  
590  string_1 = "What's Natural Language Processing?"
591  string_2 = "What is life?"
592  
593  pipeline = Pipeline()
594  pipeline.add_component("prompt_builder_1", PromptBuilder("Builder 1: {{query}}"))
595  pipeline.add_component("prompt_builder_2", PromptBuilder("Builder 2: {{query}}"))
596  pipeline.add_component("string_joiner", StringJoiner())
597  
598  pipeline.connect("prompt_builder_1.prompt", "string_joiner.strings")
599  pipeline.connect("prompt_builder_2.prompt", "string_joiner.strings")
600  
601  print(pipeline.run(data={"prompt_builder_1": {"query": string_1}, "prompt_builder_2": {"query": string_2}}))
602  
603  >> {"string_joiner": {"strings": ["Builder 1: What's Natural Language Processing?", "Builder 2: What is life?"]}}
604  ```
605  
606  <a id="string_joiner.StringJoiner.run"></a>
607  
608  #### StringJoiner.run
609  
610  ```python
611  @component.output_types(strings=list[str])
612  def run(strings: Variadic[str])
613  ```
614  
615  Joins strings into a list of strings
616  
617  **Arguments**:
618  
619  - `strings`: strings from different components
620  
621  **Returns**:
622  
623  A dictionary with the following keys:
624  - `strings`: Merged list of strings