---
title: "Pipeline"
id: pipeline-api
description: "Arranges components and integrations in a flow."
slug: "/pipeline-api"
---


## async_pipeline

### AsyncPipeline

Bases: <code>PipelineBase</code>

Asynchronous version of the Pipeline orchestration engine.

Manages components in a pipeline, allowing for concurrent processing when the pipeline's execution graph permits.
This enables efficient processing of components by minimizing idle time and maximizing resource utilization.
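
The concurrency pattern can be illustrated with a plain `asyncio` semaphore: at most a fixed number of tasks run at once while the rest wait. This is a generic sketch of the idea, not `AsyncPipeline`'s actual implementation; `run_component` is a hypothetical stand-in for component work.

```python
import asyncio

async def run_component(name: str, sem: asyncio.Semaphore) -> str:
    # At most `concurrency_limit` coroutines hold the semaphore at once,
    # keeping resource usage bounded while avoiding idle time.
    async with sem:
        await asyncio.sleep(0)  # stand-in for real component work
        return name

async def run_all(names: list[str], concurrency_limit: int = 4) -> list[str]:
    sem = asyncio.Semaphore(concurrency_limit)
    # gather() preserves the order of the supplied coroutines.
    return await asyncio.gather(*(run_component(n, sem) for n in names))

results = asyncio.run(run_all([f"comp{i}" for i in range(6)]))
```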

#### run_async_generator

```python
run_async_generator(
    data: dict[str, Any],
    include_outputs_from: set[str] | None = None,
    concurrency_limit: int = 4,
) -> AsyncIterator[dict[str, Any]]
```

Executes the pipeline step by step asynchronously, yielding partial outputs when any component finishes.
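
The yielding behavior can be sketched with `asyncio.as_completed`: each result is surfaced the moment its task finishes, rather than after the whole batch completes. This is a generic illustration with stubbed components, not the method's internals; `fake_component` is hypothetical.

```python
import asyncio
from typing import Any, AsyncIterator

async def fake_component(name: str, delay: float) -> dict[str, Any]:
    # Stub standing in for a pipeline component's work.
    await asyncio.sleep(delay)
    return {name: {"done": True}}

async def partial_outputs() -> AsyncIterator[dict[str, Any]]:
    # Yield each component's output as soon as it completes.
    tasks = [
        asyncio.create_task(fake_component("retriever", 0.01)),
        asyncio.create_task(fake_component("llm", 0.02)),
    ]
    for finished in asyncio.as_completed(tasks):
        yield await finished

async def collect() -> list[dict[str, Any]]:
    return [out async for out in partial_outputs()]

outputs = asyncio.run(collect())
```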

Usage:

```python
from haystack import Document
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack import AsyncPipeline
import asyncio

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

prompt_template = [
    ChatMessage.from_user(
        '''
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        ''')
]

# Create and connect pipeline components
retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = ChatPromptBuilder(template=prompt_template)
llm = OpenAIChatGenerator()

rag_pipeline = AsyncPipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Prepare input data
question = "Who lives in Paris?"
data = {
    "retriever": {"query": question},
    "prompt_builder": {"question": question},
}


# Process results as they become available
async def process_results():
    async for partial_output in rag_pipeline.run_async_generator(
            data=data,
            include_outputs_from={"retriever", "llm"}
    ):
        # Each partial_output contains the results from a completed component
        if "retriever" in partial_output:
            print("Retrieved documents:", len(partial_output["retriever"]["documents"]))
        if "llm" in partial_output:
            print("Generated answer:", partial_output["llm"]["replies"][0])


asyncio.run(process_results())
```

**Parameters:**

- **data** (<code>dict\[str, Any\]</code>) – Initial input data to the pipeline.
- **include_outputs_from** (<code>set\[str\] | None</code>) – Set of component names whose individual outputs are to be
  included in the pipeline's output. For components that are
  invoked multiple times (in a loop), only the last-produced
  output is included.
- **concurrency_limit** (<code>int</code>) – The maximum number of components that are allowed to run concurrently.
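
The "last-produced output wins" behavior for looping components can be made concrete with a plain dict: each new invocation overwrites the previous entry under the same component name. This is a generic sketch of the semantics, not the engine's internals.

```python
# Simulate a component named "looper" invoked three times in a loop;
# each run overwrites the previous entry, so only the last output survives.
pipeline_outputs: dict[str, dict[str, int]] = {}
for run_index in range(3):
    pipeline_outputs["looper"] = {"iteration": run_index}
```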

**Returns:**

- <code>AsyncIterator\[dict\[str, Any\]\]</code> – An async iterator over partial (and final) outputs.

**Raises:**

- <code>ValueError</code> – If invalid inputs are provided to the pipeline.
- <code>PipelineMaxComponentRuns</code> – If a component exceeds the maximum number of allowed executions within the pipeline.
- <code>PipelineRuntimeError</code> – If the Pipeline contains cycles with unsupported connections that would cause
  it to get stuck and fail running,
  or if a component fails or returns output of an unsupported type.

#### run_async

```python
run_async(
    data: dict[str, Any],
    include_outputs_from: set[str] | None = None,
    concurrency_limit: int = 4,
) -> dict[str, Any]
```

Provides an asynchronous interface to run the pipeline with the provided input data.

This method allows the pipeline to be integrated into an asynchronous workflow, enabling non-blocking
execution of pipeline components.

Usage:

```python
import asyncio

from haystack import Document
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.core.pipeline import AsyncPipeline
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

prompt_template = [
    ChatMessage.from_user(
        '''
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        ''')
]

retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = ChatPromptBuilder(template=prompt_template)
llm = OpenAIChatGenerator()

rag_pipeline = AsyncPipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Ask a question
question = "Who lives in Paris?"

async def run_inner(data, include_outputs_from):
    return await rag_pipeline.run_async(data=data, include_outputs_from=include_outputs_from)

data = {
    "retriever": {"query": question},
    "prompt_builder": {"question": question},
}

results = asyncio.run(run_inner(data, include_outputs_from={"retriever", "llm"}))

print(results["llm"]["replies"])
# [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
# _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
# {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75,
# 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0,
# audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details':
# PromptTokensDetails(audio_tokens=0, cached_tokens=0)}})]
```

**Parameters:**

- **data** (<code>dict\[str, Any\]</code>) – A dictionary of inputs for the pipeline's components. Each key is a component name
  and its value is a dictionary of that component's input parameters:

```
data = {
    "comp1": {"input1": 1, "input2": 2},
}
```

For convenience, this format is also supported when input names are unique:

```
data = {
    "input1": 1, "input2": 2,
}
```

- **include_outputs_from** (<code>set\[str\] | None</code>) – Set of component names whose individual outputs are to be
  included in the pipeline's output. For components that are
  invoked multiple times (in a loop), only the last-produced
  output is included.
- **concurrency_limit** (<code>int</code>) – The maximum number of components that are allowed to run concurrently.
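
How the flat convenience format could map onto the nested one can be sketched as follows. The `resolve_flat_inputs` helper and the component/input names are hypothetical, intended only to show why the flat format requires input names to be unique across components:

```python
from typing import Any

def resolve_flat_inputs(
    data: dict[str, Any], component_inputs: dict[str, set[str]]
) -> dict[str, dict[str, Any]]:
    """Route each uniquely named input to the single component declaring it."""
    resolved: dict[str, dict[str, Any]] = {}
    for input_name, value in data.items():
        owners = [c for c, inputs in component_inputs.items() if input_name in inputs]
        if len(owners) != 1:
            # Ambiguous or unknown input names cannot be routed automatically.
            raise ValueError(f"Input '{input_name}' is not unique across components")
        resolved.setdefault(owners[0], {})[input_name] = value
    return resolved

resolved = resolve_flat_inputs(
    {"query": "Who lives in Paris?", "question": "Who lives in Paris?"},
    {"retriever": {"query"}, "prompt_builder": {"question", "documents"}},
)
```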

**Returns:**

- <code>dict\[str, Any\]</code> – A dictionary where each entry corresponds to a component name
  and its output. If `include_outputs_from` is `None`, this dictionary
  will only contain the outputs of leaf components, i.e., components
  without outgoing connections.
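
The notion of a leaf component can be made concrete: given the pipeline's connections, a leaf is any component that never appears as a sender. The `leaf_components` helper below is a hypothetical sketch, not part of the API:

```python
def leaf_components(
    connections: list[tuple[str, str]], components: set[str]
) -> set[str]:
    # A leaf component has no outgoing connections, so its output is
    # not consumed by any downstream component.
    senders = {src for src, _ in connections}
    return components - senders

# For the RAG pipeline above: retriever -> prompt_builder -> llm
leaves = leaf_components(
    [("retriever", "prompt_builder"), ("prompt_builder", "llm")],
    {"retriever", "prompt_builder", "llm"},
)
```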

**Raises:**

- <code>ValueError</code> – If invalid inputs are provided to the pipeline.
- <code>PipelineRuntimeError</code> – If the Pipeline contains cycles with unsupported connections that would cause
  it to get stuck and fail running,
  or if a component fails or returns output of an unsupported type.
- <code>PipelineMaxComponentRuns</code> – If a component reaches the maximum number of times it can be run in this Pipeline.

#### run

```python
run(
    data: dict[str, Any],
    include_outputs_from: set[str] | None = None,
    concurrency_limit: int = 4,
) -> dict[str, Any]
```

Provides a synchronous interface to run the pipeline with the given input data.

Internally, the pipeline components are executed asynchronously, but the method itself
blocks until the entire pipeline execution is complete.

If you need asynchronous execution, use `run_async` or `run_async_generator` instead.

Usage:

```python
from haystack import Document
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.core.pipeline import AsyncPipeline
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

prompt_template = [
    ChatMessage.from_user(
        '''
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        ''')
]


retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = ChatPromptBuilder(template=prompt_template)
llm = OpenAIChatGenerator()

rag_pipeline = AsyncPipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Ask a question
question = "Who lives in Paris?"

data = {
    "retriever": {"query": question},
    "prompt_builder": {"question": question},
}

results = rag_pipeline.run(data)

print(results["llm"]["replies"])
# [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
# _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
# {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75, 'completion_tokens_details':
# CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0,
# rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0,
# cached_tokens=0)}})]
```

**Parameters:**

- **data** (<code>dict\[str, Any\]</code>) – A dictionary of inputs for the pipeline's components. Each key is a component name
  and its value is a dictionary of that component's input parameters:

```
data = {
    "comp1": {"input1": 1, "input2": 2},
}
```

For convenience, this format is also supported when input names are unique:

```
data = {
    "input1": 1, "input2": 2,
}
```

- **include_outputs_from** (<code>set\[str\] | None</code>) – Set of component names whose individual outputs are to be
  included in the pipeline's output. For components that are
  invoked multiple times (in a loop), only the last-produced
  output is included.
- **concurrency_limit** (<code>int</code>) – The maximum number of components that are allowed to run concurrently.

**Returns:**

- <code>dict\[str, Any\]</code> – A dictionary where each entry corresponds to a component name
  and its output. If `include_outputs_from` is `None`, this dictionary
  will only contain the outputs of leaf components, i.e., components
  without outgoing connections.

**Raises:**

- <code>ValueError</code> – If invalid inputs are provided to the pipeline.
- <code>PipelineRuntimeError</code> – If the Pipeline contains cycles with unsupported connections that would cause
  it to get stuck and fail running,
  or if a component fails or returns output of an unsupported type.
- <code>PipelineMaxComponentRuns</code> – If a component reaches the maximum number of times it can be run in this Pipeline.
- <code>RuntimeError</code> – If called from within an async context. Use `run_async` instead.

## pipeline

### Pipeline

Bases: <code>PipelineBase</code>

Synchronous version of the Pipeline orchestration engine.

Orchestrates component execution according to the execution graph, running components one after the other.

#### run

```python
run(
    data: dict[str, Any],
    include_outputs_from: set[str] | None = None,
    *,
    break_point: Breakpoint | AgentBreakpoint | None = None,
    pipeline_snapshot: PipelineSnapshot | None = None,
    snapshot_callback: SnapshotCallback | None = None
) -> dict[str, Any]
```

Runs the Pipeline with the given input data.

Usage:

```python
from haystack import Pipeline, Document
from haystack.components.builders.chat_prompt_builder import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

retriever = InMemoryBM25Retriever(document_store=document_store)

prompt_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
    {{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
"""

template = [ChatMessage.from_user(prompt_template)]
prompt_builder = ChatPromptBuilder(
    template=template,
    required_variables=["question", "documents"],
    variables=["question", "documents"]
)

llm = OpenAIChatGenerator()
rag_pipeline = Pipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

question = "Who lives in Paris?"
results = rag_pipeline.run(
    {
        "retriever": {"query": question},
        "prompt_builder": {"question": question},
    }
)

print(results["llm"]["replies"][0].text)
# Jean lives in Paris
```

**Parameters:**

- **data** (<code>dict\[str, Any\]</code>) – A dictionary of inputs for the pipeline's components. Each key is a component name
  and its value is a dictionary of that component's input parameters:

```
data = {
    "comp1": {"input1": 1, "input2": 2},
}
```

For convenience, this format is also supported when input names are unique:

```
data = {
    "input1": 1, "input2": 2,
}
```

- **include_outputs_from** (<code>set\[str\] | None</code>) – Set of component names whose individual outputs are to be
  included in the pipeline's output. For components that are
  invoked multiple times (in a loop), only the last-produced
  output is included.
- **break_point** (<code>Breakpoint | AgentBreakpoint | None</code>) – A breakpoint that can be used to debug the pipeline execution.
- **pipeline_snapshot** (<code>PipelineSnapshot | None</code>) – A snapshot of a previously saved pipeline execution.
- **snapshot_callback** (<code>SnapshotCallback | None</code>) – Optional callback function that is invoked when a pipeline snapshot is created.
  The callback receives a `PipelineSnapshot` object and can return an optional string
  (e.g., a file path or identifier).
  If provided, the callback is used instead of the default file-saving behavior,
  allowing custom handling of snapshots (e.g., saving to a database or sending to a remote service).
  If not provided, the default behavior saves snapshots to a JSON file.
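
Assuming only what is stated above (the callback receives a snapshot object and may return an identifier string), a custom handler might keep snapshots in memory instead of writing JSON files. The class below is a hypothetical sketch, with the `PipelineSnapshot` stubbed as a plain object:

```python
from typing import Any

class InMemorySnapshotStore:
    """Hypothetical snapshot callback: keeps snapshots in memory
    instead of the default file-saving behavior."""

    def __init__(self) -> None:
        self.snapshots: dict[str, Any] = {}

    def __call__(self, snapshot: Any) -> str:
        # Derive an identifier and retain the snapshot; the returned
        # string plays the role of the optional path/identifier.
        snapshot_id = f"snapshot-{len(self.snapshots)}"
        self.snapshots[snapshot_id] = snapshot
        return snapshot_id

store = InMemorySnapshotStore()
# A real call would pass a PipelineSnapshot; a dict stands in here.
identifier = store({"component": "retriever", "state": {}})
```

A callable object like this could then be passed as `snapshot_callback=store`, keeping every snapshot inspectable after the run.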

**Returns:**

- <code>dict\[str, Any\]</code> – A dictionary where each entry corresponds to a component name
  and its output. If `include_outputs_from` is `None`, this dictionary
  will only contain the outputs of leaf components, i.e., components
  without outgoing connections.

**Raises:**

- <code>ValueError</code> – If invalid inputs are provided to the pipeline.
- <code>PipelineRuntimeError</code> – If the Pipeline contains cycles with unsupported connections that would cause
  it to get stuck and fail running,
  or if a component fails or returns output of an unsupported type.
- <code>PipelineMaxComponentRuns</code> – If a component reaches the maximum number of times it can be run in this Pipeline.
- <code>PipelineBreakpointException</code> – When a `break_point` is triggered. Contains the component name, state, and partial results.