async_pipeline.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import asyncio
import contextvars
from collections.abc import AsyncIterator, Mapping
from typing import Any

from haystack import logging, tracing
from haystack.core.component import Component
from haystack.core.errors import BreakpointException, PipelineRuntimeError
from haystack.core.pipeline.base import (
    _COMPONENT_INPUT,
    _COMPONENT_OUTPUT,
    _COMPONENT_VISITS,
    ComponentPriority,
    PipelineBase,
    _validate_component_output_keys,
)
from haystack.core.pipeline.utils import _deepcopy_with_exceptions
from haystack.dataclasses.breakpoints import Breakpoint
from haystack.telemetry import pipeline_running

logger = logging.getLogger(__name__)


class AsyncPipeline(PipelineBase):
    """
    Asynchronous version of the Pipeline orchestration engine.

    Manages components in a pipeline allowing for concurrent processing when the pipeline's execution graph permits.
    This enables efficient processing of components by minimizing idle time and maximizing resource utilization.
    """

    @staticmethod
    async def _run_component_async(
        component_name: str,
        component: dict[str, Any],
        component_inputs: dict[str, Any],
        component_visits: dict[str, int],
        parent_span: tracing.Span | None = None,
        *,
        break_point: Breakpoint | None = None,
    ) -> Mapping[str, Any]:
        """
        Executes a single component asynchronously.

        If the component supports async execution, it is awaited directly as it will run async;
        otherwise the component is offloaded to executor.

        The method also updates the `visits` count of the component, writes outputs to `inputs_state`,
        and returns pruned outputs that get stored in `pipeline_outputs`.

        :param component_name: The name of the component.
        :param component: The component's graph metadata dictionary; must contain the "instance" key.
        :param component_inputs: Inputs for the component.
        :param component_visits: Mapping of component names to visit counts; this component's count is
            incremented in place after a successful run.
        :param parent_span: Optional parent tracing span the component span is attached to.
        :param break_point: Optional breakpoint; if it targets this component at its current visit count,
            a `BreakpointException` is raised before the component runs.
        :returns: Outputs from the component that can be yielded from run_async_generator.
        :raises BreakpointException: If `break_point` matches this component and its current visit count.
        :raises PipelineRuntimeError: If the component raises, or returns a non-mapping output, or its
            output keys are invalid.
        """
        if (
            isinstance(break_point, Breakpoint)
            and break_point.component_name == component_name
            and break_point.visit_count == component_visits[component_name]
        ):
            raise BreakpointException.from_triggered_breakpoint(break_point=break_point)

        instance: Component = component["instance"]

        with PipelineBase._create_component_span(
            component_name=component_name, instance=instance, inputs=component_inputs, parent_span=parent_span
        ) as span:
            # deepcopy inputs before passing to the tracer so that even if a tracer mutates them
            # the component always receives the original unmodified values
            component_inputs_copy = _deepcopy_with_exceptions(component_inputs)
            span.set_content_tag(_COMPONENT_INPUT, component_inputs)
            logger.info("Running component {component_name}", component_name=component_name)

            if getattr(instance, "__haystack_supports_async__", False):
                try:
                    outputs = await instance.run_async(**component_inputs_copy)  # type: ignore
                except Exception as error:
                    raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error
            else:
                loop = asyncio.get_running_loop()
                # Important: contextvars (e.g. active tracing Span) don't propagate to running loop's
                # ThreadPoolExecutor. We use ctx.run(...) to preserve context like the active tracing span
                ctx = contextvars.copy_context()
                try:
                    outputs = await loop.run_in_executor(
                        None, lambda: ctx.run(lambda: instance.run(**component_inputs_copy))
                    )
                except Exception as error:
                    raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error

            component_visits[component_name] += 1

            if not isinstance(outputs, Mapping):
                raise PipelineRuntimeError.from_invalid_output(component_name, instance.__class__, outputs)

            _validate_component_output_keys(component_name, component, outputs)

            span.set_tag(_COMPONENT_VISITS, component_visits[component_name])
            span.set_content_tag(_COMPONENT_OUTPUT, outputs)

            return outputs

    async def run_async_generator(  # noqa: PLR0915,C901
        self, data: dict[str, Any], include_outputs_from: set[str] | None = None, concurrency_limit: int = 4
    ) -> AsyncIterator[dict[str, Any]]:
        """
        Executes the pipeline step by step asynchronously, yielding partial outputs when any component finishes.

        Usage:
        ```python
        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.dataclasses import ChatMessage
        from haystack.utils import Secret
        from haystack.document_stores.in_memory import InMemoryDocumentStore
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.builders.prompt_builder import PromptBuilder
        from haystack import AsyncPipeline
        import asyncio

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        # Create and connect pipeline components
        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Prepare input data
        question = "Who lives in Paris?"
        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }


        # Process results as they become available
        async def process_results():
            async for partial_output in rag_pipeline.run_async_generator(
                data=data,
                include_outputs_from={"retriever", "llm"}
            ):
                # Each partial_output contains the results from a completed component
                if "retriever" in partial_output:
                    print("Retrieved documents:", len(partial_output["retriever"]["documents"]))
                if "llm" in partial_output:
                    print("Generated answer:", partial_output["llm"]["replies"][0])


        asyncio.run(process_results())
        ```

        :param data: Initial input data to the pipeline.
        :param concurrency_limit: The maximum number of components that are allowed to run concurrently.
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :return: An async iterator containing partial (and final) outputs.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineMaxComponentRuns:
            If a component exceeds the maximum number of allowed executions within the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running.
            Or if a Component fails or returns output in an unsupported type.
        """
        if include_outputs_from is None:
            include_outputs_from = set()

        # 0) Basic pipeline init
        pipeline_running(self)  # telemetry
        self.warm_up()  # optional warm-up (if needed)

        # 1) Prepare ephemeral state
        ready_sem = asyncio.Semaphore(max(1, concurrency_limit))
        inputs_state: dict[str, dict[str, list[dict[str, Any]]]] = {}
        pipeline_outputs: dict[str, Any] = {}
        running_tasks: dict[asyncio.Task, str] = {}

        # A set of component names that have been scheduled but not finished:
        scheduled_components: set[str] = set()

        # 2) Convert input data
        prepared_data = self._prepare_component_input_data(data)

        # raises ValueError if input is malformed in some way
        self.validate_input(prepared_data)
        inputs_state = self._convert_to_internal_format(prepared_data)

        # For quick lookup of downstream receivers
        ordered_names = sorted(self.graph.nodes.keys())
        cached_receivers = {n: self._find_receivers_from(n) for n in ordered_names}
        component_visits = dict.fromkeys(ordered_names, 0)
        cached_topological_sort = None

        # We fill the queue once and raise if all components are BLOCKED
        self.validate_pipeline(self._fill_queue(ordered_names, inputs_state, component_visits))

        # Single parent span for entire pipeline execution
        with tracing.tracer.trace(
            "haystack.async_pipeline.run",
            tags={
                "haystack.pipeline.input_data": prepared_data,
                "haystack.pipeline.output_data": pipeline_outputs,
                "haystack.pipeline.metadata": self.metadata,
                "haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
            },
        ) as parent_span:
            # -------------------------------------------------
            # We define some functions here so that they have access to local runtime state
            # (inputs, tasks, scheduled components) via closures.
            # -------------------------------------------------
            async def _run_highest_in_isolation(component_name: str) -> AsyncIterator[dict[str, Any]]:
                """
                Runs a component with HIGHEST priority in isolation.

                We need to run components with HIGHEST priority (i.e. components with GreedyVariadic input socket)
                by themselves, without any other components running concurrently. Otherwise, downstream components
                could produce additional inputs for the GreedyVariadic socket.

                :param component_name: The name of the component.
                :return: An async iterator of partial outputs.
                """
                # 1) Wait for all in-flight tasks to finish
                while running_tasks:
                    done, _pending = await asyncio.wait(running_tasks.keys(), return_when=asyncio.ALL_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield_dict = {finished_component_name: _deepcopy_with_exceptions(partial_result)}
                            yield yield_dict  # partial outputs

                if component_name in scheduled_components:
                    # If it's already scheduled for some reason, skip
                    return

                # 2) Run the HIGHEST component by itself
                scheduled_components.add(component_name)
                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])

                component_pipeline_outputs = await self._run_component_async(
                    component_name=component_name,
                    component=comp_dict,
                    component_inputs=component_inputs,
                    component_visits=component_visits,
                    parent_span=parent_span,
                )

                # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                pruned = self._write_component_outputs(
                    component_name=component_name,
                    component_outputs=component_pipeline_outputs,
                    inputs=inputs_state,
                    receivers=cached_receivers[component_name],
                    include_outputs_from=include_outputs_from,
                )
                if pruned or component_name in include_outputs_from:
                    pipeline_outputs[component_name] = pruned

                scheduled_components.remove(component_name)
                if pruned or component_name in include_outputs_from:
                    yield {component_name: _deepcopy_with_exceptions(pruned)}

            async def _schedule_task(component_name: str) -> None:
                """
                Schedule a component to run.

                We do NOT wait for it to finish here. This allows us to run other components concurrently.

                :param component_name: The name of the component.
                """

                if component_name in scheduled_components:
                    return  # already scheduled, do nothing

                scheduled_components.add(component_name)

                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])

                async def _runner() -> Mapping[str, Any]:
                    async with ready_sem:
                        component_pipeline_outputs = await self._run_component_async(
                            component_name=component_name,
                            component=comp_dict,
                            component_inputs=component_inputs,
                            component_visits=component_visits,
                            parent_span=parent_span,
                        )

                        # Distribute outputs to downstream inputs; also prune outputs based on
                        # `include_outputs_from`
                        pruned = self._write_component_outputs(
                            component_name=component_name,
                            component_outputs=component_pipeline_outputs,
                            inputs=inputs_state,
                            receivers=cached_receivers[component_name],
                            include_outputs_from=include_outputs_from,
                        )
                        if pruned or component_name in include_outputs_from:
                            pipeline_outputs[component_name] = pruned

                    scheduled_components.remove(component_name)
                    return pruned

                task = asyncio.create_task(_runner())
                running_tasks[task] = component_name

            async def _wait_for_one_task_to_complete() -> AsyncIterator[dict[str, Any]]:
                """
                Wait for exactly one running task to finish, yield partial outputs.

                If no tasks are running, does nothing.
                """
                if running_tasks:
                    done, _ = await asyncio.wait(running_tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield {finished_component_name: _deepcopy_with_exceptions(partial_result)}

            async def _wait_for_all_tasks_to_complete() -> AsyncIterator[dict[str, Any]]:
                """
                Wait for all running tasks to finish, yield partial outputs.
                """
                if running_tasks:
                    done, _ = await asyncio.wait(running_tasks.keys(), return_when=asyncio.ALL_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield {finished_component_name: _deepcopy_with_exceptions(partial_result)}

            # -------------------------------------------------
            # MAIN SCHEDULING LOOP
            # -------------------------------------------------
            while True:
                # 2) Build the priority queue of candidates
                priority_queue = self._fill_queue(ordered_names, inputs_state, component_visits)
                candidate = self._get_next_runnable_component(priority_queue, component_visits)

                if (candidate is None or candidate[0] == ComponentPriority.BLOCKED) and running_tasks:
                    # We need to wait for one task to finish to make progress and potentially unblock
                    # the priority_queue
                    async for partial_res in _wait_for_one_task_to_complete():
                        yield partial_res
                    continue

                if candidate is None and not running_tasks:
                    # done
                    break

                priority, comp_name, _comp = candidate  # type: ignore

                # If the next component is blocked, we do a check to see if the pipeline is possibly blocked and raise
                # a warning if it is.
                if priority == ComponentPriority.BLOCKED and not running_tasks:
                    if self._is_pipeline_possibly_blocked(current_pipeline_outputs=pipeline_outputs):
                        # Pipeline is most likely blocked (most likely a configuration issue) so we raise a warning.
                        self._find_components_blocking_pipeline(
                            priority_queue=priority_queue, component_visits=component_visits, inputs=inputs_state
                        )
                    # We always exit the loop since we cannot run the next component.
                    break

                if comp_name in scheduled_components:
                    # We need to wait for one task to finish to make progress
                    async for partial_res in _wait_for_one_task_to_complete():
                        yield partial_res
                    continue

                if priority == ComponentPriority.HIGHEST:
                    # 1) run alone
                    async for partial_res in _run_highest_in_isolation(comp_name):
                        yield partial_res
                    # then continue the loop
                    continue

                if priority == ComponentPriority.READY:
                    # 1) schedule this one
                    await _schedule_task(comp_name)

                    # 2) Possibly schedule more READY tasks if concurrency not fully used
                    while len(priority_queue) > 0 and not ready_sem.locked():
                        peek_prio, peek_name = priority_queue.peek()
                        if peek_prio in (ComponentPriority.BLOCKED, ComponentPriority.HIGHEST):
                            # can't run or must run alone => skip
                            break
                        if peek_prio == ComponentPriority.READY:
                            priority_queue.pop()
                            await _schedule_task(peek_name)
                            # keep adding while concurrency is not locked
                            continue

                        # The next is DEFER/DEFER_LAST => we only schedule it if it "becomes READY"
                        # We'll handle it in the next iteration or with incremental waiting
                        break

                # We only schedule components with priority DEFER or DEFER_LAST when no other tasks are running
                elif priority in (ComponentPriority.DEFER, ComponentPriority.DEFER_LAST) and not running_tasks:
                    if len(priority_queue) > 0:
                        comp_name, topological_sort = self._tiebreak_waiting_components(
                            component_name=comp_name,
                            priority=priority,
                            priority_queue=priority_queue,
                            topological_sort=cached_topological_sort,
                        )
                        cached_topological_sort = topological_sort

                    await _schedule_task(comp_name)

                # To make progress, we wait for one task to complete before re-starting the loop
                async for partial_res in _wait_for_one_task_to_complete():
                    yield partial_res

            # End main loop

            # 3) Drain leftover tasks
            async for partial_res in _wait_for_all_tasks_to_complete():
                yield partial_res

            # 4) Yield final pipeline outputs
            yield pipeline_outputs

    async def run_async(
        self, data: dict[str, Any], include_outputs_from: set[str] | None = None, concurrency_limit: int = 4
    ) -> dict[str, Any]:
        """
        Provides an asynchronous interface to run the pipeline with provided input data.

        This method allows the pipeline to be integrated into an asynchronous workflow, enabling non-blocking
        execution of pipeline components.

        Usage:
        ```python
        import asyncio

        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.core.pipeline import AsyncPipeline
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"

        async def run_inner(data, include_outputs_from):
            return await rag_pipeline.run_async(data=data, include_outputs_from=include_outputs_from)

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }

        results = asyncio.run(run_inner(data, include_outputs_from={"retriever", "llm"}))

        print(results["llm"]["replies"])
        # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
        # _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
        # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75,
        # 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0,
        # audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details':
        # PromptTokensDetails(audio_tokens=0, cached_tokens=0)}})]
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that should be allowed to run concurrently.
        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running.
            Or if a Component fails or returns output in an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        """
        final: dict[str, Any] = {}
        # The generator yields partial outputs as components finish; the last yielded item
        # is the aggregated pipeline output, so we simply keep the most recent one.
        async for partial in self.run_async_generator(
            data=data, concurrency_limit=concurrency_limit, include_outputs_from=include_outputs_from
        ):
            final = partial
        return final or {}

    def run(
        self, data: dict[str, Any], include_outputs_from: set[str] | None = None, concurrency_limit: int = 4
    ) -> dict[str, Any]:
        """
        Provides a synchronous interface to run the pipeline with given input data.

        Internally, the pipeline components are executed asynchronously, but the method itself
        will block until the entire pipeline execution is complete.

        In case you need asynchronous methods, consider using `run_async` or `run_async_generator`.

        Usage:
        ```python
        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.core.pipeline import AsyncPipeline
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]


        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }

        results = rag_pipeline.run(data)

        print(results["llm"]["replies"])
        # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
        # _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
        # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75, 'completion_tokens_details':
        # CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0,
        # rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0,
        # cached_tokens=0)}})]
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that should be allowed to run concurrently.

        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running.
            Or if a Component fails or returns output in an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        :raises RuntimeError:
            If called from within an async context. Use `run_async` instead.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: safe to use asyncio.run()
            return asyncio.run(
                self.run_async(
                    data=data, include_outputs_from=include_outputs_from, concurrency_limit=concurrency_limit
                )
            )
        else:
            # Running loop present: do not create the coroutine and do not call asyncio.run()
            raise RuntimeError(
                "Cannot call run() from within an async context. Use 'await pipeline.run_async(...)' instead."
            )