---
title: "Pipeline"
id: pipeline-api
description: "Arranges components and integrations in a flow."
slug: "/pipeline-api"
---

## async_pipeline

### AsyncPipeline

Bases: <code>PipelineBase</code>

Asynchronous version of the Pipeline orchestration engine.

Manages components in a pipeline, allowing for concurrent processing when the pipeline's execution graph permits.
This enables efficient processing of components by minimizing idle time and maximizing resource utilization.
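
For instance, when the execution graph contains independent branches, `AsyncPipeline` can schedule them at the same time. Below is a minimal sketch under that assumption: the two unconnected retriever branches and all component names are illustrative, not part of the API documented here.

```python
import asyncio

from haystack import AsyncPipeline, Document
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

# One store shared by two independent retriever branches
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
])

pipeline = AsyncPipeline()
# The retrievers are not connected to each other, so the execution graph
# allows them to run concurrently (up to concurrency_limit components at once).
pipeline.add_component("retriever_a", InMemoryBM25Retriever(document_store=document_store))
pipeline.add_component("retriever_b", InMemoryBM25Retriever(document_store=document_store))

data = {
    "retriever_a": {"query": "Who lives in Paris?"},
    "retriever_b": {"query": "Who lives in Berlin?"},
}

results = asyncio.run(pipeline.run_async(data))
print(len(results["retriever_a"]["documents"]), len(results["retriever_b"]["documents"]))
```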

#### run_async_generator

```python
run_async_generator(
    data: dict[str, Any],
    include_outputs_from: set[str] | None = None,
    concurrency_limit: int = 4,
) -> AsyncIterator[dict[str, Any]]
```

Executes the pipeline step by step asynchronously, yielding partial outputs when any component finishes.

Usage:

```python
import asyncio

from haystack import AsyncPipeline, Document
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

prompt_template = [
    ChatMessage.from_user(
        '''
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        ''')
]

# Create and connect pipeline components
retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = ChatPromptBuilder(template=prompt_template)
llm = OpenAIChatGenerator()

rag_pipeline = AsyncPipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Prepare input data
question = "Who lives in Paris?"
data = {
    "retriever": {"query": question},
    "prompt_builder": {"question": question},
}


# Process results as they become available
async def process_results():
    async for partial_output in rag_pipeline.run_async_generator(
            data=data,
            include_outputs_from={"retriever", "llm"}
    ):
        # Each partial_output contains the results from a completed component
        if "retriever" in partial_output:
            print("Retrieved documents:", len(partial_output["retriever"]["documents"]))
        if "llm" in partial_output:
            print("Generated answer:", partial_output["llm"]["replies"][0])


asyncio.run(process_results())
```

**Parameters:**

- **data** (<code>dict\[str, Any\]</code>) – Initial input data to the pipeline.
- **include_outputs_from** (<code>set\[str\] | None</code>) – Set of component names whose individual outputs are to be
  included in the pipeline's output. For components that are
  invoked multiple times (in a loop), only the last-produced
  output is included.
- **concurrency_limit** (<code>int</code>) – The maximum number of components that are allowed to run concurrently.

**Returns:**

- <code>AsyncIterator\[dict\[str, Any\]\]</code> – An async iterator containing partial (and final) outputs.

**Raises:**

- <code>ValueError</code> – If invalid inputs are provided to the pipeline.
- <code>PipelineMaxComponentRuns</code> – If a component exceeds the maximum number of allowed executions within the pipeline.
- <code>PipelineRuntimeError</code> – If the Pipeline contains cycles with unsupported connections that would cause
  it to get stuck and fail running, or if a Component fails or returns output of an unsupported type.

#### run_async

```python
run_async(
    data: dict[str, Any],
    include_outputs_from: set[str] | None = None,
    concurrency_limit: int = 4,
) -> dict[str, Any]
```

Provides an asynchronous interface to run the pipeline with the provided input data.

This method allows the pipeline to be integrated into an asynchronous workflow, enabling non-blocking
execution of pipeline components.

Usage:

```python
import asyncio

from haystack import Document
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.core.pipeline import AsyncPipeline
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

prompt_template = [
    ChatMessage.from_user(
        '''
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        ''')
]

retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = ChatPromptBuilder(template=prompt_template)
llm = OpenAIChatGenerator()

rag_pipeline = AsyncPipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Ask a question
question = "Who lives in Paris?"

async def run_inner(data, include_outputs_from):
    return await rag_pipeline.run_async(data=data, include_outputs_from=include_outputs_from)

data = {
    "retriever": {"query": question},
    "prompt_builder": {"question": question},
}

results = asyncio.run(run_inner(data, include_outputs_from={"retriever", "llm"}))

print(results["llm"]["replies"])
# [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
# _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
# {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75,
# 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0,
# audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details':
# PromptTokensDetails(audio_tokens=0, cached_tokens=0)}})]
```

**Parameters:**

- **data** (<code>dict\[str, Any\]</code>) – A dictionary of inputs for the pipeline's components. Each key is a component name
  and its value is a dictionary of that component's input parameters:

```python
data = {
    "comp1": {"input1": 1, "input2": 2},
}
```

For convenience, this format is also supported when input names are unique:

```python
data = {
    "input1": 1, "input2": 2,
}
```

- **include_outputs_from** (<code>set\[str\] | None</code>) – Set of component names whose individual outputs are to be
  included in the pipeline's output. For components that are
  invoked multiple times (in a loop), only the last-produced
  output is included.
- **concurrency_limit** (<code>int</code>) – The maximum number of components that should be allowed to run concurrently.

**Returns:**

- <code>dict\[str, Any\]</code> – A dictionary where each entry corresponds to a component name
  and its output. If `include_outputs_from` is `None`, this dictionary
  will only contain the outputs of leaf components, i.e., components
  without outgoing connections.

**Raises:**

- <code>ValueError</code> – If invalid inputs are provided to the pipeline.
- <code>PipelineRuntimeError</code> – If the Pipeline contains cycles with unsupported connections that would cause
  it to get stuck and fail running, or if a Component fails or returns output of an unsupported type.
- <code>PipelineMaxComponentRuns</code> – If a Component reaches the maximum number of times it can be run in this Pipeline.

#### run

```python
run(
    data: dict[str, Any],
    include_outputs_from: set[str] | None = None,
    concurrency_limit: int = 4,
) -> dict[str, Any]
```

Provides a synchronous interface to run the pipeline with the given input data.

Internally, the pipeline components are executed asynchronously, but the method itself
will block until the entire pipeline execution is complete.

If you need asynchronous methods, consider using `run_async` or `run_async_generator`.

Usage:

```python
from haystack import Document
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.core.pipeline import AsyncPipeline
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

prompt_template = [
    ChatMessage.from_user(
        '''
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        ''')
]

retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = ChatPromptBuilder(template=prompt_template)
llm = OpenAIChatGenerator()

rag_pipeline = AsyncPipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Ask a question
question = "Who lives in Paris?"

data = {
    "retriever": {"query": question},
    "prompt_builder": {"question": question},
}

results = rag_pipeline.run(data)

print(results["llm"]["replies"])
# [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
# _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
# {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75, 'completion_tokens_details':
# CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0,
# rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0,
# cached_tokens=0)}})]
```

**Parameters:**

- **data** (<code>dict\[str, Any\]</code>) – A dictionary of inputs for the pipeline's components. Each key is a component name
  and its value is a dictionary of that component's input parameters:

```python
data = {
    "comp1": {"input1": 1, "input2": 2},
}
```

For convenience, this format is also supported when input names are unique:

```python
data = {
    "input1": 1, "input2": 2,
}
```

- **include_outputs_from** (<code>set\[str\] | None</code>) – Set of component names whose individual outputs are to be
  included in the pipeline's output. For components that are
  invoked multiple times (in a loop), only the last-produced
  output is included.
- **concurrency_limit** (<code>int</code>) – The maximum number of components that should be allowed to run concurrently.

**Returns:**

- <code>dict\[str, Any\]</code> – A dictionary where each entry corresponds to a component name
  and its output. If `include_outputs_from` is `None`, this dictionary
  will only contain the outputs of leaf components, i.e., components
  without outgoing connections.

**Raises:**

- <code>ValueError</code> – If invalid inputs are provided to the pipeline.
- <code>PipelineRuntimeError</code> – If the Pipeline contains cycles with unsupported connections that would cause
  it to get stuck and fail running, or if a Component fails or returns output of an unsupported type.
- <code>PipelineMaxComponentRuns</code> – If a Component reaches the maximum number of times it can be run in this Pipeline.
- <code>RuntimeError</code> – If called from within an async context. Use `run_async` instead (see the sketch below).
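
For example, inside a coroutine (where an event loop is already running), await the asynchronous variant instead of calling `run`. A minimal sketch, reusing the `rag_pipeline` and `data` objects from the usage example above:

```python
import asyncio


async def answer() -> dict:
    # Calling rag_pipeline.run(data) here would raise RuntimeError because an
    # event loop is already running in this coroutine; await run_async instead.
    return await rag_pipeline.run_async(data)


results = asyncio.run(answer())
print(results["llm"]["replies"])
```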

## pipeline

### Pipeline

Bases: <code>PipelineBase</code>

Synchronous version of the orchestration engine.

Orchestrates component execution according to the execution graph, running one component after the other.

#### run

```python
run(
    data: dict[str, Any],
    include_outputs_from: set[str] | None = None,
    *,
    break_point: Breakpoint | AgentBreakpoint | None = None,
    pipeline_snapshot: PipelineSnapshot | None = None,
    snapshot_callback: SnapshotCallback | None = None
) -> dict[str, Any]
```

Runs the Pipeline with the given input data.

Usage:

```python
from haystack import Pipeline, Document
from haystack.utils import Secret
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.prompt_builder import PromptBuilder

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

prompt_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
    {{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
"""

retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = PromptBuilder(template=prompt_template)
# Read the API key from the OPENAI_API_KEY environment variable
llm = OpenAIGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"))

rag_pipeline = Pipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Ask a question
question = "Who lives in Paris?"
results = rag_pipeline.run(
    {
        "retriever": {"query": question},
        "prompt_builder": {"question": question},
    }
)

print(results["llm"]["replies"])
# Jean lives in Paris
```

**Parameters:**

- **data** (<code>dict\[str, Any\]</code>) – A dictionary of inputs for the pipeline's components. Each key is a component name
  and its value is a dictionary of that component's input parameters:

```python
data = {
    "comp1": {"input1": 1, "input2": 2},
}
```

For convenience, this format is also supported when input names are unique:

```python
data = {
    "input1": 1, "input2": 2,
}
```

- **include_outputs_from** (<code>set\[str\] | None</code>) – Set of component names whose individual outputs are to be
  included in the pipeline's output. For components that are
  invoked multiple times (in a loop), only the last-produced
  output is included.
- **break_point** (<code>Breakpoint | AgentBreakpoint | None</code>) – A breakpoint that can be used to debug the pipeline execution.
- **pipeline_snapshot** (<code>PipelineSnapshot | None</code>) – A snapshot of a previously saved pipeline execution.
- **snapshot_callback** (<code>SnapshotCallback | None</code>) – Optional callback function that is invoked when a pipeline snapshot is created.
  The callback receives a `PipelineSnapshot` object and can return an optional string
  (e.g., a file path or identifier).
  If provided, the callback is used instead of the default file-saving behavior,
  allowing custom handling of snapshots (e.g., saving to a database, sending to a remote service).
  If not provided, the default behavior saves snapshots to a JSON file (see the sketch after this list).
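
As an illustration of `snapshot_callback`, the sketch below keeps snapshots in memory instead of letting the default behavior write a JSON file. It reuses the `rag_pipeline` and `question` from the usage example above; the callback name and the in-memory handling are illustrative assumptions, not part of the documented API.

```python
snapshots = []


def keep_in_memory(snapshot) -> str:
    # Illustrative handler: store the PipelineSnapshot object in memory and
    # return an optional identifier for it.
    snapshots.append(snapshot)
    return f"in-memory-snapshot-{len(snapshots)}"


# The callback is only invoked when a snapshot is actually created, for example
# when a break_point triggers (which also raises the documented
# PipelineBreakpointException).
results = rag_pipeline.run(
    {
        "retriever": {"query": question},
        "prompt_builder": {"question": question},
    },
    snapshot_callback=keep_in_memory,
)
```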

**Returns:**

- <code>dict\[str, Any\]</code> – A dictionary where each entry corresponds to a component name
  and its output. If `include_outputs_from` is `None`, this dictionary
  will only contain the outputs of leaf components, i.e., components
  without outgoing connections.

**Raises:**

- <code>ValueError</code> – If invalid inputs are provided to the pipeline.
- <code>PipelineRuntimeError</code> – If the Pipeline contains cycles with unsupported connections that would cause
  it to get stuck and fail running, or if a Component fails or returns output of an unsupported type.
- <code>PipelineMaxComponentRuns</code> – If a Component reaches the maximum number of times it can be run in this Pipeline.
- <code>PipelineBreakpointException</code> – When a `break_point` is triggered. The exception contains the component name, state, and partial results.