# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import asyncio
import contextvars
from collections.abc import AsyncIterator, Mapping
from typing import Any

from haystack import logging, tracing
from haystack.core.component import Component
from haystack.core.errors import BreakpointException, PipelineRuntimeError
from haystack.core.pipeline.base import (
    _COMPONENT_INPUT,
    _COMPONENT_OUTPUT,
    _COMPONENT_VISITS,
    ComponentPriority,
    PipelineBase,
    _validate_component_output_keys,
)
from haystack.core.pipeline.utils import _deepcopy_with_exceptions
from haystack.dataclasses.breakpoints import Breakpoint
from haystack.telemetry import pipeline_running

logger = logging.getLogger(__name__)


class AsyncPipeline(PipelineBase):
    """
    Asynchronous version of the Pipeline orchestration engine.

    Manages the components of a pipeline and runs them concurrently whenever the execution graph permits.
    This minimizes idle time and maximizes resource utilization.
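
    For example, two components that do not depend on each other can run at the same time. A minimal sketch
    (`component_a`, `component_b`, and their `value` inputs are illustrative placeholders, not real Haystack
    components):

    ```python
    import asyncio

    from haystack import AsyncPipeline

    pipeline = AsyncPipeline()
    pipeline.add_component("a", component_a)
    pipeline.add_component("b", component_b)

    # "a" and "b" are not connected to each other, so AsyncPipeline can execute them concurrently.
    results = asyncio.run(pipeline.run_async({"a": {"value": 1}, "b": {"value": 2}}))
    ```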
    """

    @staticmethod
    async def _run_component_async(
        component_name: str,
        component: dict[str, Any],
        component_inputs: dict[str, Any],
        component_visits: dict[str, int],
        parent_span: tracing.Span | None = None,
        *,
        break_point: Breakpoint | None = None,
    ) -> Mapping[str, Any]:
        """
        Executes a single component asynchronously.

        If the component supports async execution, its `run_async` method is awaited directly;
        otherwise its synchronous `run` method is offloaded to an executor thread.

        The method also updates the component's `visits` count. Distributing the outputs to downstream
        inputs and pruning them for `pipeline_outputs` is handled by the caller.

        :param component_name: The name of the component.
        :param component: The component's graph metadata, including the instance under the "instance" key.
        :param component_inputs: Inputs for the component.
        :param component_visits: Mapping of component names to their visit counts, updated in place.
        :param parent_span: The parent tracing span, if any.
        :param break_point: Optional breakpoint that stops execution before this component runs.
        :returns: Outputs from the component that can be yielded from run_async_generator.
        :raises BreakpointException: If the breakpoint matches this component and its current visit count.
        :raises PipelineRuntimeError: If the component fails or returns output of an unsupported type.
        """
        if (
            isinstance(break_point, Breakpoint)
            and break_point.component_name == component_name
            and break_point.visit_count == component_visits[component_name]
        ):
            raise BreakpointException.from_triggered_breakpoint(break_point=break_point)

        instance: Component = component["instance"]

        with PipelineBase._create_component_span(
            component_name=component_name, instance=instance, inputs=component_inputs, parent_span=parent_span
        ) as span:
            # Deepcopy the inputs before handing them to the tracer so that, even if a tracer mutates them,
            # the component still receives the unmodified values (via the copy)
            component_inputs_copy = _deepcopy_with_exceptions(component_inputs)
            span.set_content_tag(_COMPONENT_INPUT, component_inputs)
            logger.info("Running component {component_name}", component_name=component_name)

            if getattr(instance, "__haystack_supports_async__", False):
                try:
                    outputs = await instance.run_async(**component_inputs_copy)  # type: ignore
                except Exception as error:
                    raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error
            else:
                loop = asyncio.get_running_loop()
                # Important: contextvars (e.g. the active tracing Span) don't propagate to the running loop's
                # ThreadPoolExecutor, so we use ctx.run(...) to preserve context such as the active tracing span
                ctx = contextvars.copy_context()
                try:
                    outputs = await loop.run_in_executor(
                        None, lambda: ctx.run(lambda: instance.run(**component_inputs_copy))
                    )
                except Exception as error:
                    raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error

            component_visits[component_name] += 1

            if not isinstance(outputs, Mapping):
                raise PipelineRuntimeError.from_invalid_output(component_name, instance.__class__, outputs)

            _validate_component_output_keys(component_name, component, outputs)

            span.set_tag(_COMPONENT_VISITS, component_visits[component_name])
            span.set_content_tag(_COMPONENT_OUTPUT, outputs)

            return outputs

    async def run_async_generator(  # noqa: PLR0915,C901
        self, data: dict[str, Any], include_outputs_from: set[str] | None = None, concurrency_limit: int = 4
    ) -> AsyncIterator[dict[str, Any]]:
        """
        Executes the pipeline step by step asynchronously, yielding partial outputs when any component finishes.

        Usage:
        ```python
        import asyncio

        from haystack import AsyncPipeline, Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        # Create and connect pipeline components
        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Prepare input data
        question = "Who lives in Paris?"
        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }


        # Process results as they become available
        async def process_results():
            async for partial_output in rag_pipeline.run_async_generator(
                    data=data,
                    include_outputs_from={"retriever", "llm"}
            ):
                # Each partial_output contains the results from a completed component
                if "retriever" in partial_output:
                    print("Retrieved documents:", len(partial_output["retriever"]["documents"]))
                if "llm" in partial_output:
                    print("Generated answer:", partial_output["llm"]["replies"][0])


        asyncio.run(process_results())
        ```
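
        To bound how many components run at once, pass `concurrency_limit` (the default is 4). For example:

        ```python
        async for partial_output in rag_pipeline.run_async_generator(data=data, concurrency_limit=2):
            ...
        ```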

        :param data: Initial input data to the pipeline.
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that are allowed to run concurrently.
        :return: An async iterator containing partial (and final) outputs.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineMaxComponentRuns:
            If a component exceeds the maximum number of allowed executions within the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running, or if a Component fails or returns output of
            an unsupported type.
        """
        if include_outputs_from is None:
            include_outputs_from = set()

        # 0) Basic pipeline init
        pipeline_running(self)  # telemetry
        self.warm_up()  # optional warm-up (if needed)

        # 1) Prepare ephemeral state
        ready_sem = asyncio.Semaphore(max(1, concurrency_limit))
        inputs_state: dict[str, dict[str, list[dict[str, Any]]]] = {}
        pipeline_outputs: dict[str, Any] = {}
        running_tasks: dict[asyncio.Task, str] = {}

        # A set of component names that have been scheduled but not finished:
        scheduled_components: set[str] = set()

        # 2) Convert input data
        prepared_data = self._prepare_component_input_data(data)

        # raises ValueError if input is malformed in some way
        self.validate_input(prepared_data)
        inputs_state = self._convert_to_internal_format(prepared_data)

        # For quick lookup of downstream receivers
        ordered_names = sorted(self.graph.nodes.keys())
        cached_receivers = {n: self._find_receivers_from(n) for n in ordered_names}
        component_visits = dict.fromkeys(ordered_names, 0)
        cached_topological_sort = None

        # We fill the queue once and raise if all components are BLOCKED
        self.validate_pipeline(self._fill_queue(ordered_names, inputs_state, component_visits))

        # Single parent span for entire pipeline execution
        with tracing.tracer.trace(
            "haystack.async_pipeline.run",
            tags={
                "haystack.pipeline.input_data": prepared_data,
                "haystack.pipeline.output_data": pipeline_outputs,
                "haystack.pipeline.metadata": self.metadata,
                "haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
            },
        ) as parent_span:
            # -------------------------------------------------
            # We define some functions here so that they have access to local runtime state
            # (inputs, tasks, scheduled components) via closures.
            # -------------------------------------------------
            async def _run_highest_in_isolation(component_name: str) -> AsyncIterator[dict[str, Any]]:
                """
                Runs a component with HIGHEST priority in isolation.

                We need to run components with HIGHEST priority (i.e. components with GreedyVariadic input socket)
                by themselves, without any other components running concurrently. Otherwise, downstream components
                could produce additional inputs for the GreedyVariadic socket.

                :param component_name: The name of the component.
                :return: An async iterator of partial outputs.
                """
                # 1) Wait for all in-flight tasks to finish
                while running_tasks:
                    done, _pending = await asyncio.wait(running_tasks.keys(), return_when=asyncio.ALL_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield_dict = {finished_component_name: _deepcopy_with_exceptions(partial_result)}
                            yield yield_dict  # partial outputs

                if component_name in scheduled_components:
                    # If it's already scheduled for some reason, skip
                    return

                # 2) Run the HIGHEST component by itself
                scheduled_components.add(component_name)
                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])

                component_pipeline_outputs = await self._run_component_async(
                    component_name=component_name,
                    component=comp_dict,
                    component_inputs=component_inputs,
                    component_visits=component_visits,
                    parent_span=parent_span,
                )

                # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                pruned = self._write_component_outputs(
                    component_name=component_name,
                    component_outputs=component_pipeline_outputs,
                    inputs=inputs_state,
                    receivers=cached_receivers[component_name],
                    include_outputs_from=include_outputs_from,
                )
                if pruned or component_name in include_outputs_from:
                    pipeline_outputs[component_name] = pruned

                scheduled_components.remove(component_name)
                if pruned or component_name in include_outputs_from:
                    yield {component_name: _deepcopy_with_exceptions(pruned)}

            async def _schedule_task(component_name: str) -> None:
                """
                Schedule a component to run.

                We do NOT wait for it to finish here. This allows us to run other components concurrently.

                :param component_name: The name of the component.
                """

                if component_name in scheduled_components:
                    return  # already scheduled, do nothing

                scheduled_components.add(component_name)

                comp_dict = self._get_component_with_graph_metadata_and_visits(
                    component_name, component_visits[component_name]
                )
                component_inputs = self._consume_component_inputs(component_name, comp_dict, inputs_state)
                component_inputs = self._add_missing_input_defaults(component_inputs, comp_dict["input_sockets"])

                async def _runner() -> Mapping[str, Any]:
                    async with ready_sem:
                        component_pipeline_outputs = await self._run_component_async(
                            component_name=component_name,
                            component=comp_dict,
                            component_inputs=component_inputs,
                            component_visits=component_visits,
                            parent_span=parent_span,
                        )

                    # Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
                    pruned = self._write_component_outputs(
                        component_name=component_name,
                        component_outputs=component_pipeline_outputs,
                        inputs=inputs_state,
                        receivers=cached_receivers[component_name],
                        include_outputs_from=include_outputs_from,
                    )
                    if pruned or component_name in include_outputs_from:
                        pipeline_outputs[component_name] = pruned

                    scheduled_components.remove(component_name)
                    return pruned

                task = asyncio.create_task(_runner())
                running_tasks[task] = component_name

            async def _wait_for_one_task_to_complete() -> AsyncIterator[dict[str, Any]]:
                """
                Wait for exactly one running task to finish, yield partial outputs.

                If no tasks are running, does nothing.
                """
                if running_tasks:
                    done, _ = await asyncio.wait(running_tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield {finished_component_name: _deepcopy_with_exceptions(partial_result)}

            async def _wait_for_all_tasks_to_complete() -> AsyncIterator[dict[str, Any]]:
                """
                Wait for all running tasks to finish, yield partial outputs.
                """
                if running_tasks:
                    done, _ = await asyncio.wait(running_tasks.keys(), return_when=asyncio.ALL_COMPLETED)
                    for finished in done:
                        finished_component_name = running_tasks.pop(finished)
                        partial_result = finished.result()
                        scheduled_components.discard(finished_component_name)
                        if partial_result:
                            yield {finished_component_name: _deepcopy_with_exceptions(partial_result)}

            # -------------------------------------------------
            # MAIN SCHEDULING LOOP
            # -------------------------------------------------
            while True:
                # 2) Build the priority queue of candidates
                priority_queue = self._fill_queue(ordered_names, inputs_state, component_visits)
                candidate = self._get_next_runnable_component(priority_queue, component_visits)

                if (candidate is None or candidate[0] == ComponentPriority.BLOCKED) and running_tasks:
                    # We need to wait for one task to finish to make progress and potentially unblock the priority_queue
                    async for partial_res in _wait_for_one_task_to_complete():
                        yield partial_res
                    continue

                if candidate is None and not running_tasks:
                    # done
                    break

                priority, comp_name, comp = candidate  # type: ignore

                # If the next component is blocked and nothing is running, check whether the pipeline is likely
                # stuck and warn if so.
                if priority == ComponentPriority.BLOCKED and not running_tasks:
                    if self._is_pipeline_possibly_blocked(current_pipeline_outputs=pipeline_outputs):
                        # The pipeline is most likely blocked (usually a configuration issue), so we warn about it.
                        self._find_components_blocking_pipeline(
                            priority_queue=priority_queue, component_visits=component_visits, inputs=inputs_state
                        )
                    # We always exit the loop since we cannot run the next component.
                    break

                if comp_name in scheduled_components:
                    # We need to wait for one task to finish to make progress
                    async for partial_res in _wait_for_one_task_to_complete():
                        yield partial_res
                    continue

                if priority == ComponentPriority.HIGHEST:
                    # 1) run alone
                    async for partial_res in _run_highest_in_isolation(comp_name):
                        yield partial_res
                    # then continue the loop
                    continue

                if priority == ComponentPriority.READY:
                    # 1) schedule this one
                    await _schedule_task(comp_name)

                    # 2) Possibly schedule more READY tasks if concurrency not fully used
                    while len(priority_queue) > 0 and not ready_sem.locked():
                        peek_prio, peek_name = priority_queue.peek()
                        if peek_prio in (ComponentPriority.BLOCKED, ComponentPriority.HIGHEST):
                            # can't run or must run alone => skip
                            break
                        if peek_prio == ComponentPriority.READY:
                            priority_queue.pop()
                            await _schedule_task(peek_name)
                            # keep adding while concurrency is not locked
                            continue

                        # The next is DEFER/DEFER_LAST => we only schedule it if it "becomes READY"
                        # We'll handle it in the next iteration or with incremental waiting
                        break

                # We only schedule components with priority DEFER or DEFER_LAST when no other tasks are running
                elif priority in (ComponentPriority.DEFER, ComponentPriority.DEFER_LAST) and not running_tasks:
                    if len(priority_queue) > 0:
                        comp_name, topological_sort = self._tiebreak_waiting_components(
                            component_name=comp_name,
                            priority=priority,
                            priority_queue=priority_queue,
                            topological_sort=cached_topological_sort,
                        )
                        cached_topological_sort = topological_sort

                    await _schedule_task(comp_name)

                # To make progress, we wait for one task to complete before re-starting the loop
                async for partial_res in _wait_for_one_task_to_complete():
                    yield partial_res

            # End main loop

            # 3) Drain leftover tasks
            async for partial_res in _wait_for_all_tasks_to_complete():
                yield partial_res

            # 4) Yield final pipeline outputs
            yield pipeline_outputs

    async def run_async(
        self, data: dict[str, Any], include_outputs_from: set[str] | None = None, concurrency_limit: int = 4
    ) -> dict[str, Any]:
        """
        Provides an asynchronous interface to run the pipeline with the provided input data.

        This method allows the pipeline to be integrated into an asynchronous workflow, enabling non-blocking
        execution of pipeline components.

        Usage:
        ```python
        import asyncio

        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.core.pipeline import AsyncPipeline
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"

        async def run_inner(data, include_outputs_from):
            return await rag_pipeline.run_async(data=data, include_outputs_from=include_outputs_from)

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }

        results = asyncio.run(run_inner(data, include_outputs_from={"retriever", "llm"}))

        print(results["llm"]["replies"])
        # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
        # _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
        # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75,
        # 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0,
        # audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details':
        # PromptTokensDetails(audio_tokens=0, cached_tokens=0)}})]
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that should be allowed to run concurrently.
        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running, or if a Component fails or returns output of
            an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        """
        final: dict[str, Any] = {}
        async for partial in self.run_async_generator(
            data=data, concurrency_limit=concurrency_limit, include_outputs_from=include_outputs_from
        ):
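            # run_async_generator yields partial outputs as components finish and yields the complete
            # pipeline outputs last, so `final` ends up holding the full result.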
            final = partial
        return final or {}

    def run(
        self, data: dict[str, Any], include_outputs_from: set[str] | None = None, concurrency_limit: int = 4
    ) -> dict[str, Any]:
        """
        Provides a synchronous interface to run the pipeline with the given input data.

        Internally, the pipeline components are executed asynchronously, but the method itself
        blocks until the entire pipeline execution is complete.

        If you need asynchronous execution, use `run_async` or `run_async_generator` instead.

        Usage:
        ```python
        from haystack import Document
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
        from haystack.core.pipeline import AsyncPipeline
        from haystack.dataclasses import ChatMessage
        from haystack.document_stores.in_memory import InMemoryDocumentStore

        # Write documents to InMemoryDocumentStore
        document_store = InMemoryDocumentStore()
        document_store.write_documents([
            Document(content="My name is Jean and I live in Paris."),
            Document(content="My name is Mark and I live in Berlin."),
            Document(content="My name is Giorgio and I live in Rome.")
        ])

        prompt_template = [
            ChatMessage.from_user(
                '''
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                ''')
        ]

        retriever = InMemoryBM25Retriever(document_store=document_store)
        prompt_builder = ChatPromptBuilder(template=prompt_template)
        llm = OpenAIChatGenerator()

        rag_pipeline = AsyncPipeline()
        rag_pipeline.add_component("retriever", retriever)
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", llm)
        rag_pipeline.connect("retriever", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder", "llm")

        # Ask a question
        question = "Who lives in Paris?"

        data = {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }

        results = rag_pipeline.run(data)

        print(results["llm"]["replies"])
        # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
        # _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
        # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75, 'completion_tokens_details':
        # CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0,
        # rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0,
        # cached_tokens=0)}})]
        ```

        :param data:
            A dictionary of inputs for the pipeline's components. Each key is a component name
            and its value is a dictionary of that component's input parameters:
            ```
            data = {
                "comp1": {"input1": 1, "input2": 2},
            }
            ```
            For convenience, this format is also supported when input names are unique:
            ```
            data = {
                "input1": 1, "input2": 2,
            }
            ```
        :param include_outputs_from:
            Set of component names whose individual outputs are to be
            included in the pipeline's output. For components that are
            invoked multiple times (in a loop), only the last-produced
            output is included.
        :param concurrency_limit: The maximum number of components that should be allowed to run concurrently.

        :returns:
            A dictionary where each entry corresponds to a component name
            and its output. If `include_outputs_from` is `None`, this dictionary
            will only contain the outputs of leaf components, i.e., components
            without outgoing connections.

        :raises ValueError:
            If invalid inputs are provided to the pipeline.
        :raises PipelineRuntimeError:
            If the Pipeline contains cycles with unsupported connections that would cause
            it to get stuck and fail running, or if a Component fails or returns output of
            an unsupported type.
        :raises PipelineMaxComponentRuns:
            If a Component reaches the maximum number of times it can be run in this Pipeline.
        :raises RuntimeError:
            If called from within an async context. Use `run_async` instead.
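
        When code is already running inside an event loop (for example, in an async web framework handler),
        calling this method raises a `RuntimeError`; await the asynchronous variant instead:

        ```python
        results = await rag_pipeline.run_async(data)
        ```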
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: safe to use asyncio.run()
            return asyncio.run(
                self.run_async(
                    data=data, include_outputs_from=include_outputs_from, concurrency_limit=concurrency_limit
                )
            )
        else:
            # Running loop present: do not create the coroutine and do not call asyncio.run()
            raise RuntimeError(
                "Cannot call run() from within an async context. Use 'await pipeline.run_async(...)' instead."
            )