---
title: "Pipeline"
id: pipeline-api
description: "Arranges components and integrations in a flow."
slug: "/pipeline-api"
---

## async_pipeline

### AsyncPipeline

Bases: <code>PipelineBase</code>

Asynchronous version of the Pipeline orchestration engine.

Manages components in a pipeline, allowing for concurrent processing when the pipeline's execution graph permits. This enables efficient processing of components by minimizing idle time and maximizing resource utilization.

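When two components have no data dependency on each other, `AsyncPipeline` can schedule them at the same time. Below is a minimal sketch of such a graph, reusing the component classes from the examples on this page; the branch names (`retriever_a`, `retriever_b`, `docs_a`, `docs_b`) are illustrative, not part of the API:

```python
from haystack import AsyncPipeline, Document
from haystack.components.builders import ChatPromptBuilder
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()
document_store.write_documents([Document(content="My name is Jean and I live in Paris.")])

# The two retrievers feed separate template variables and do not depend on
# each other, so the execution graph permits running them concurrently.
template = [ChatMessage.from_user(
    "Context A: {{ docs_a }}\nContext B: {{ docs_b }}\nQuestion: {{ question }}"
)]

pipe = AsyncPipeline()
pipe.add_component("retriever_a", InMemoryBM25Retriever(document_store=document_store))
pipe.add_component("retriever_b", InMemoryBM25Retriever(document_store=document_store))
pipe.add_component("prompt_builder", ChatPromptBuilder(template=template))
pipe.connect("retriever_a.documents", "prompt_builder.docs_a")
pipe.connect("retriever_b.documents", "prompt_builder.docs_b")

question = "Who lives in Paris?"
results = pipe.run({
    "retriever_a": {"query": question},
    "retriever_b": {"query": question},
    "prompt_builder": {"question": question},
})
print(results["prompt_builder"]["prompt"])
```
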
#### run_async_generator

```python
run_async_generator(
    data: dict[str, Any],
    include_outputs_from: set[str] | None = None,
    concurrency_limit: int = 4,
) -> AsyncIterator[dict[str, Any]]
```

Executes the pipeline step by step asynchronously, yielding partial outputs when any component finishes.

Usage:

```python
import asyncio

from haystack import AsyncPipeline, Document
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

prompt_template = [
    ChatMessage.from_user(
        '''
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        ''')
]

# Create and connect pipeline components
retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = ChatPromptBuilder(template=prompt_template)
llm = OpenAIChatGenerator()

rag_pipeline = AsyncPipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Prepare input data
question = "Who lives in Paris?"
data = {
    "retriever": {"query": question},
    "prompt_builder": {"question": question},
}


# Process results as they become available
async def process_results():
    async for partial_output in rag_pipeline.run_async_generator(
        data=data,
        include_outputs_from={"retriever", "llm"}
    ):
        # Each partial_output contains the results from a completed component
        if "retriever" in partial_output:
            print("Retrieved documents:", len(partial_output["retriever"]["documents"]))
        if "llm" in partial_output:
            print("Generated answer:", partial_output["llm"]["replies"][0])


asyncio.run(process_results())
```

**Parameters:**

- **data** (<code>dict\[str, Any\]</code>) – Initial input data to the pipeline.
- **concurrency_limit** (<code>int</code>) – The maximum number of components that are allowed to run concurrently.
- **include_outputs_from** (<code>set\[str\] | None</code>) – Set of component names whose individual outputs are to be included in the pipeline's output. For components that are invoked multiple times (in a loop), only the last-produced output is included.

**Returns:**

- <code>AsyncIterator\[dict\[str, Any\]\]</code> – An async iterator containing partial (and final) outputs.

**Raises:**

- <code>ValueError</code> – If invalid inputs are provided to the pipeline.
- <code>PipelineMaxComponentRuns</code> – If a component exceeds the maximum number of allowed executions within the pipeline.
- <code>PipelineRuntimeError</code> – If the Pipeline contains cycles with unsupported connections that would cause it to get stuck and fail running, or if a component fails or returns output of an unsupported type.

#### run_async

```python
run_async(
    data: dict[str, Any],
    include_outputs_from: set[str] | None = None,
    concurrency_limit: int = 4,
) -> dict[str, Any]
```

Provides an asynchronous interface to run the pipeline with provided input data.

This method allows the pipeline to be integrated into an asynchronous workflow, enabling non-blocking execution of pipeline components.

Usage:

```python
import asyncio

from haystack import Document
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.core.pipeline import AsyncPipeline
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

prompt_template = [
    ChatMessage.from_user(
        '''
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        ''')
]

retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = ChatPromptBuilder(template=prompt_template)
llm = OpenAIChatGenerator()

rag_pipeline = AsyncPipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Ask a question
question = "Who lives in Paris?"

async def run_inner(data, include_outputs_from):
    return await rag_pipeline.run_async(data=data, include_outputs_from=include_outputs_from)


data = {
    "retriever": {"query": question},
    "prompt_builder": {"question": question},
}

results = asyncio.run(run_inner(data, include_outputs_from={"retriever", "llm"}))

print(results["llm"]["replies"])
# [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
# _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
# {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75,
# 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0,
# audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details':
# PromptTokensDetails(audio_tokens=0, cached_tokens=0)}})]
```

**Parameters:**

- **data** (<code>dict\[str, Any\]</code>) – A dictionary of inputs for the pipeline's components. Each key is a component name and its value is a dictionary of that component's input parameters:

  ```
  data = {
      "comp1": {"input1": 1, "input2": 2},
  }
  ```

  For convenience, this format is also supported when input names are unique:

  ```
  data = {
      "input1": 1, "input2": 2,
  }
  ```

- **include_outputs_from** (<code>set\[str\] | None</code>) – Set of component names whose individual outputs are to be included in the pipeline's output. For components that are invoked multiple times (in a loop), only the last-produced output is included.
- **concurrency_limit** (<code>int</code>) – The maximum number of components that are allowed to run concurrently.

**Returns:**

- <code>dict\[str, Any\]</code> – A dictionary where each entry corresponds to a component name and its output. If `include_outputs_from` is `None`, this dictionary will only contain the outputs of leaf components, i.e., components without outgoing connections.

**Raises:**

- <code>ValueError</code> – If invalid inputs are provided to the pipeline.
- <code>PipelineRuntimeError</code> – If the Pipeline contains cycles with unsupported connections that would cause it to get stuck and fail running, or if a component fails or returns output of an unsupported type.
- <code>PipelineMaxComponentRuns</code> – If a component reaches the maximum number of times it can be run in this Pipeline.

#### run

```python
run(
    data: dict[str, Any],
    include_outputs_from: set[str] | None = None,
    concurrency_limit: int = 4,
) -> dict[str, Any]
```

Provides a synchronous interface to run the pipeline with given input data.

Internally, the pipeline components are executed asynchronously, but the method itself blocks until the entire pipeline execution is complete.

If you need asynchronous methods, consider using `run_async` or `run_async_generator`.

Usage:

```python
from haystack import Document
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.core.pipeline import AsyncPipeline
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

prompt_template = [
    ChatMessage.from_user(
        '''
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        ''')
]

retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = ChatPromptBuilder(template=prompt_template)
llm = OpenAIChatGenerator()

rag_pipeline = AsyncPipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Ask a question
question = "Who lives in Paris?"

data = {
    "retriever": {"query": question},
    "prompt_builder": {"question": question},
}

results = rag_pipeline.run(data)

print(results["llm"]["replies"])
# [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
# _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
# {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75, 'completion_tokens_details':
# CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0,
# rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0,
# cached_tokens=0)}})]
```

**Parameters:**

- **data** (<code>dict\[str, Any\]</code>) – A dictionary of inputs for the pipeline's components. Each key is a component name and its value is a dictionary of that component's input parameters:

  ```
  data = {
      "comp1": {"input1": 1, "input2": 2},
  }
  ```

  For convenience, this format is also supported when input names are unique:

  ```
  data = {
      "input1": 1, "input2": 2,
  }
  ```

- **include_outputs_from** (<code>set\[str\] | None</code>) – Set of component names whose individual outputs are to be included in the pipeline's output. For components that are invoked multiple times (in a loop), only the last-produced output is included.
- **concurrency_limit** (<code>int</code>) – The maximum number of components that are allowed to run concurrently.

**Returns:**

- <code>dict\[str, Any\]</code> – A dictionary where each entry corresponds to a component name and its output. If `include_outputs_from` is `None`, this dictionary will only contain the outputs of leaf components, i.e., components without outgoing connections.

**Raises:**

- <code>ValueError</code> – If invalid inputs are provided to the pipeline.
- <code>PipelineRuntimeError</code> – If the Pipeline contains cycles with unsupported connections that would cause it to get stuck and fail running, or if a component fails or returns output of an unsupported type.
- <code>PipelineMaxComponentRuns</code> – If a component reaches the maximum number of times it can be run in this Pipeline.
- <code>RuntimeError</code> – If called from within an async context. Use `run_async` instead.

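Because `run` blocks and raises `RuntimeError` when called from a running event loop, choose the entry point based on the calling context. A minimal sketch, assuming the `rag_pipeline` and `data` built in the example above:

```python
import asyncio

# Synchronous caller: `run` drives the pipeline and blocks until it finishes.
results = rag_pipeline.run(data)


# Asynchronous caller: inside a coroutine (an already-running event loop),
# calling `run` would raise RuntimeError, so await `run_async` instead.
async def handle_request():
    return await rag_pipeline.run_async(data)


results_from_coroutine = asyncio.run(handle_request())
```
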
## pipeline

### Pipeline

Bases: <code>PipelineBase</code>

Synchronous version of the orchestration engine.

Orchestrates component execution according to the execution graph, running one component after the other.

#### run

```python
run(
    data: dict[str, Any],
    include_outputs_from: set[str] | None = None,
    *,
    break_point: Breakpoint | AgentBreakpoint | None = None,
    pipeline_snapshot: PipelineSnapshot | None = None,
    snapshot_callback: SnapshotCallback | None = None
) -> dict[str, Any]
```

Runs the Pipeline with given input data.

Usage:

```python
from haystack import Pipeline, Document
from haystack.utils import Secret
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.prompt_builder import PromptBuilder

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

prompt_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
    {{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
"""

retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = PromptBuilder(template=prompt_template)
api_key = "YOUR_OPENAI_API_KEY"  # replace with your OpenAI API key
llm = OpenAIGenerator(api_key=Secret.from_token(api_key))

rag_pipeline = Pipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Ask a question
question = "Who lives in Paris?"
results = rag_pipeline.run(
    {
        "retriever": {"query": question},
        "prompt_builder": {"question": question},
    }
)

print(results["llm"]["replies"])
# Jean lives in Paris
```

**Parameters:**

- **data** (<code>dict\[str, Any\]</code>) – A dictionary of inputs for the pipeline's components. Each key is a component name and its value is a dictionary of that component's input parameters:

  ```
  data = {
      "comp1": {"input1": 1, "input2": 2},
  }
  ```

  For convenience, this format is also supported when input names are unique:

  ```
  data = {
      "input1": 1, "input2": 2,
  }
  ```

- **include_outputs_from** (<code>set\[str\] | None</code>) – Set of component names whose individual outputs are to be included in the pipeline's output. For components that are invoked multiple times (in a loop), only the last-produced output is included.
- **break_point** (<code>Breakpoint | AgentBreakpoint | None</code>) – A set of breakpoints that can be used to debug the pipeline execution.
- **pipeline_snapshot** (<code>PipelineSnapshot | None</code>) – A snapshot of a previously saved pipeline execution.
- **snapshot_callback** (<code>SnapshotCallback | None</code>) – Optional callback function that is invoked when a pipeline snapshot is created. The callback receives a `PipelineSnapshot` object and can return an optional string (e.g., a file path or identifier). If provided, the callback is used instead of the default file-saving behavior, allowing custom handling of snapshots (e.g., saving to a database or sending to a remote service). If not provided, the default behavior saves snapshots to a JSON file.

**Returns:**

- <code>dict\[str, Any\]</code> – A dictionary where each entry corresponds to a component name and its output. If `include_outputs_from` is `None`, this dictionary will only contain the outputs of leaf components, i.e., components without outgoing connections.

**Raises:**

- <code>ValueError</code> – If invalid inputs are provided to the pipeline.
- <code>PipelineRuntimeError</code> – If the Pipeline contains cycles with unsupported connections that would cause it to get stuck and fail running, or if a component fails or returns output of an unsupported type.
- <code>PipelineMaxComponentRuns</code> – If a component reaches the maximum number of times it can be run in this Pipeline.
- <code>PipelineBreakpointException</code> – When a breakpoint is triggered. Contains the component name, state, and partial results.

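As a sketch of the `snapshot_callback` hook, the callable below keeps snapshots in memory instead of relying on the default JSON-file persistence. The callback body and returned identifier are illustrative, `rag_pipeline` and `question` come from the example above, and the sketch assumes snapshot creation is actually triggered during the run (for example, by a breakpoint):

```python
snapshots = []


def store_snapshot(snapshot) -> str | None:
    # Receives a PipelineSnapshot; keep it in memory and return an identifier
    # instead of letting the default behavior write a JSON file.
    snapshots.append(snapshot)
    return f"in-memory-snapshot-{len(snapshots)}"


results = rag_pipeline.run(
    {
        "retriever": {"query": question},
        "prompt_builder": {"question": question},
    },
    snapshot_callback=store_snapshot,
)
```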