pipeline_api.md
   1  ---
   2  title: "Pipeline"
   3  id: pipeline-api
   4  description: "Arranges components and integrations in flow."
   5  slug: "/pipeline-api"
   6  ---
   7  
   8  <a id="async_pipeline"></a>
   9  
  10  ## Module async\_pipeline
  11  
  12  <a id="async_pipeline.AsyncPipeline"></a>
  13  
  14  ### AsyncPipeline
  15  
  16  Asynchronous version of the Pipeline orchestration engine.
  17  
  18  Manages components in a pipeline allowing for concurrent processing when the pipeline's execution graph permits.
  19  This enables efficient processing of components by minimizing idle time and maximizing resource utilization.
  20  
  21  <a id="async_pipeline.AsyncPipeline.run_async_generator"></a>
  22  
  23  #### AsyncPipeline.run\_async\_generator
  24  
  25  ```python
  26  async def run_async_generator(
  27          data: dict[str, Any],
  28          include_outputs_from: Optional[set[str]] = None,
  29          concurrency_limit: int = 4) -> AsyncIterator[dict[str, Any]]
  30  ```
  31  
  32  Executes the pipeline step by step asynchronously, yielding partial outputs when any component finishes.
  33  
  34  Usage:
  35  ```python
  36  from haystack import Document
  37  from haystack.components.builders import ChatPromptBuilder
  38  from haystack.dataclasses import ChatMessage
  39  from haystack.utils import Secret
  40  from haystack.document_stores.in_memory import InMemoryDocumentStore
  41  from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
  42  from haystack.components.generators.chat import OpenAIChatGenerator
  43  from haystack.components.builders.prompt_builder import PromptBuilder
  44  from haystack import AsyncPipeline
  45  import asyncio
  46  
  47  # Write documents to InMemoryDocumentStore
  48  document_store = InMemoryDocumentStore()
  49  document_store.write_documents([
  50      Document(content="My name is Jean and I live in Paris."),
  51      Document(content="My name is Mark and I live in Berlin."),
  52      Document(content="My name is Giorgio and I live in Rome.")
  53  ])
  54  
  55  prompt_template = [
  56      ChatMessage.from_user(
  57          '''
  58          Given these documents, answer the question.
  59          Documents:
  60          {% for doc in documents %}
  61              {{ doc.content }}
  62          {% endfor %}
  63          Question: {{question}}
  64          Answer:
  65          ''')
  66  ]
  67  
  68  # Create and connect pipeline components
  69  retriever = InMemoryBM25Retriever(document_store=document_store)
  70  prompt_builder = ChatPromptBuilder(template=prompt_template)
  71  llm = OpenAIChatGenerator()
  72  
  73  rag_pipeline = AsyncPipeline()
  74  rag_pipeline.add_component("retriever", retriever)
  75  rag_pipeline.add_component("prompt_builder", prompt_builder)
  76  rag_pipeline.add_component("llm", llm)
  77  rag_pipeline.connect("retriever", "prompt_builder.documents")
  78  rag_pipeline.connect("prompt_builder", "llm")
  79  
  80  # Prepare input data
  81  question = "Who lives in Paris?"
  82  data = {
  83      "retriever": {"query": question},
  84      "prompt_builder": {"question": question},
  85  }
  86  
  87  
  88  # Process results as they become available
  89  async def process_results():
  90      async for partial_output in rag_pipeline.run_async_generator(
  91              data=data,
  92              include_outputs_from={"retriever", "llm"}
  93      ):
  94          # Each partial_output contains the results from a completed component
  95          if "retriever" in partial_output:
  96              print("Retrieved documents:", len(partial_output["retriever"]["documents"]))
  97          if "llm" in partial_output:
  98              print("Generated answer:", partial_output["llm"]["replies"][0])
  99  
 100  
 101  asyncio.run(process_results())
 102  ```
 103  
 104  **Arguments**:
 105  
 106  - `data`: Initial input data to the pipeline.
 107  - `concurrency_limit`: The maximum number of components that are allowed to run concurrently.
 108  - `include_outputs_from`: Set of component names whose individual outputs are to be
 109  included in the pipeline's output. For components that are
 110  invoked multiple times (in a loop), only the last-produced
 111  output is included.
 112  
 113  **Raises**:
 114  
 115  - `ValueError`: If invalid inputs are provided to the pipeline.
 116  - `PipelineMaxComponentRuns`: If a component exceeds the maximum number of allowed executions within the pipeline.
 117  - `PipelineRuntimeError`: If the Pipeline contains cycles with unsupported connections that would cause
 118  it to get stuck and fail running.
 119  Or if a Component fails or returns output in an unsupported type.
 120  
 121  **Returns**:
 122  
 123  An async iterator containing partial (and final) outputs.
 124  
 125  <a id="async_pipeline.AsyncPipeline.run_async"></a>
 126  
 127  #### AsyncPipeline.run\_async
 128  
 129  ```python
 130  async def run_async(data: dict[str, Any],
 131                      include_outputs_from: Optional[set[str]] = None,
 132                      concurrency_limit: int = 4) -> dict[str, Any]
 133  ```
 134  
 135  Provides an asynchronous interface to run the pipeline with provided input data.
 136  
 137  This method allows the pipeline to be integrated into an asynchronous workflow, enabling non-blocking
 138  execution of pipeline components.
 139  
 140  Usage:
 141  ```python
 142  import asyncio
 143  
 144  from haystack import Document
 145  from haystack.components.builders import ChatPromptBuilder
 146  from haystack.components.generators.chat import OpenAIChatGenerator
 147  from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
 148  from haystack.core.pipeline import AsyncPipeline
 149  from haystack.dataclasses import ChatMessage
 150  from haystack.document_stores.in_memory import InMemoryDocumentStore
 151  
 152  # Write documents to InMemoryDocumentStore
 153  document_store = InMemoryDocumentStore()
 154  document_store.write_documents([
 155      Document(content="My name is Jean and I live in Paris."),
 156      Document(content="My name is Mark and I live in Berlin."),
 157      Document(content="My name is Giorgio and I live in Rome.")
 158  ])
 159  
 160  prompt_template = [
 161      ChatMessage.from_user(
 162          '''
 163          Given these documents, answer the question.
 164          Documents:
 165          {% for doc in documents %}
 166              {{ doc.content }}
 167          {% endfor %}
 168          Question: {{question}}
 169          Answer:
 170          ''')
 171  ]
 172  
 173  retriever = InMemoryBM25Retriever(document_store=document_store)
 174  prompt_builder = ChatPromptBuilder(template=prompt_template)
 175  llm = OpenAIChatGenerator()
 176  
 177  rag_pipeline = AsyncPipeline()
 178  rag_pipeline.add_component("retriever", retriever)
 179  rag_pipeline.add_component("prompt_builder", prompt_builder)
 180  rag_pipeline.add_component("llm", llm)
 181  rag_pipeline.connect("retriever", "prompt_builder.documents")
 182  rag_pipeline.connect("prompt_builder", "llm")
 183  
 184  # Ask a question
 185  question = "Who lives in Paris?"
 186  
 187  async def run_inner(data, include_outputs_from):
 188      return await rag_pipeline.run_async(data=data, include_outputs_from=include_outputs_from)
 189  
 190  data = {
 191      "retriever": {"query": question},
 192      "prompt_builder": {"question": question},
 193  }
 194  
 195  results = asyncio.run(run_inner(data, include_outputs_from={"retriever", "llm"}))
 196  
 197  print(results["llm"]["replies"])
 198  # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
 199  # _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
 200  # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75,
 201  # 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0,
 202  # audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details':
 203  # PromptTokensDetails(audio_tokens=0, cached_tokens=0)}})]
 204  ```
 205  
 206  **Arguments**:
 207  
 208  - `data`: A dictionary of inputs for the pipeline's components. Each key is a component name
 209  and its value is a dictionary of that component's input parameters:
 210  ```
 211  data = {
 212      "comp1": {"input1": 1, "input2": 2},
 213  }
 214  ```
 215  For convenience, this format is also supported when input names are unique:
 216  ```
 217  data = {
 218      "input1": 1, "input2": 2,
 219  }
 220  ```
 221  - `include_outputs_from`: Set of component names whose individual outputs are to be
 222  included in the pipeline's output. For components that are
 223  invoked multiple times (in a loop), only the last-produced
 224  output is included.
 225  - `concurrency_limit`: The maximum number of components that should be allowed to run concurrently.
 226  
 227  **Raises**:
 228  
 229  - `ValueError`: If invalid inputs are provided to the pipeline.
 230  - `PipelineRuntimeError`: If the Pipeline contains cycles with unsupported connections that would cause
 231  it to get stuck and fail running.
 232  Or if a Component fails or returns output in an unsupported type.
 233  - `PipelineMaxComponentRuns`: If a Component reaches the maximum number of times it can be run in this Pipeline.
 234  
 235  **Returns**:
 236  
 237  A dictionary where each entry corresponds to a component name
 238  and its output. If `include_outputs_from` is `None`, this dictionary
 239  will only contain the outputs of leaf components, i.e., components
 240  without outgoing connections.
 241  
 242  <a id="async_pipeline.AsyncPipeline.run"></a>
 243  
 244  #### AsyncPipeline.run
 245  
 246  ```python
 247  def run(data: dict[str, Any],
 248          include_outputs_from: Optional[set[str]] = None,
 249          concurrency_limit: int = 4) -> dict[str, Any]
 250  ```
 251  
 252  Provides a synchronous interface to run the pipeline with given input data.
 253  
 254  Internally, the pipeline components are executed asynchronously, but the method itself
 255  will block until the entire pipeline execution is complete.
 256  
 257  In case you need asynchronous methods, consider using `run_async` or `run_async_generator`.
 258  
 259  Usage:
 260  ```python
 261  from haystack import Document
 262  from haystack.components.builders import ChatPromptBuilder
 263  from haystack.components.generators.chat import OpenAIChatGenerator
 264  from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
 265  from haystack.core.pipeline import AsyncPipeline
 266  from haystack.dataclasses import ChatMessage
 267  from haystack.document_stores.in_memory import InMemoryDocumentStore
 268  
 269  # Write documents to InMemoryDocumentStore
 270  document_store = InMemoryDocumentStore()
 271  document_store.write_documents([
 272      Document(content="My name is Jean and I live in Paris."),
 273      Document(content="My name is Mark and I live in Berlin."),
 274      Document(content="My name is Giorgio and I live in Rome.")
 275  ])
 276  
 277  prompt_template = [
 278      ChatMessage.from_user(
 279          '''
 280          Given these documents, answer the question.
 281          Documents:
 282          {% for doc in documents %}
 283              {{ doc.content }}
 284          {% endfor %}
 285          Question: {{question}}
 286          Answer:
 287          ''')
 288  ]
 289  
 290  
 291  retriever = InMemoryBM25Retriever(document_store=document_store)
 292  prompt_builder = ChatPromptBuilder(template=prompt_template)
 293  llm = OpenAIChatGenerator()
 294  
 295  rag_pipeline = AsyncPipeline()
 296  rag_pipeline.add_component("retriever", retriever)
 297  rag_pipeline.add_component("prompt_builder", prompt_builder)
 298  rag_pipeline.add_component("llm", llm)
 299  rag_pipeline.connect("retriever", "prompt_builder.documents")
 300  rag_pipeline.connect("prompt_builder", "llm")
 301  
 302  # Ask a question
 303  question = "Who lives in Paris?"
 304  
 305  data = {
 306      "retriever": {"query": question},
 307      "prompt_builder": {"question": question},
 308  }
 309  
 310  results = rag_pipeline.run(data)
 311  
 312  print(results["llm"]["replies"])
 313  # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
 314  # _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
 315  # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75, 'completion_tokens_details':
 316  # CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0,
 317  # rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0,
 318  # cached_tokens=0)}})]
 319  ```
 320  
 321  **Arguments**:
 322  
 323  - `data`: A dictionary of inputs for the pipeline's components. Each key is a component name
 324  and its value is a dictionary of that component's input parameters:
 325  ```
 326  data = {
 327      "comp1": {"input1": 1, "input2": 2},
 328  }
 329  ```
 330  For convenience, this format is also supported when input names are unique:
 331  ```
 332  data = {
 333      "input1": 1, "input2": 2,
 334  }
 335  ```
 336  - `include_outputs_from`: Set of component names whose individual outputs are to be
 337  included in the pipeline's output. For components that are
 338  invoked multiple times (in a loop), only the last-produced
 339  output is included.
 340  - `concurrency_limit`: The maximum number of components that should be allowed to run concurrently.
 341  
 342  **Raises**:
 343  
 344  - `ValueError`: If invalid inputs are provided to the pipeline.
 345  - `PipelineRuntimeError`: If the Pipeline contains cycles with unsupported connections that would cause
 346  it to get stuck and fail running.
 347  Or if a Component fails or returns output in an unsupported type.
 348  - `PipelineMaxComponentRuns`: If a Component reaches the maximum number of times it can be run in this Pipeline.
 349  - `RuntimeError`: If called from within an async context. Use `run_async` instead.
 350  
 351  **Returns**:
 352  
 353  A dictionary where each entry corresponds to a component name
 354  and its output. If `include_outputs_from` is `None`, this dictionary
 355  will only contain the outputs of leaf components, i.e., components
 356  without outgoing connections.
 357  
 358  <a id="async_pipeline.AsyncPipeline.__init__"></a>
 359  
 360  #### AsyncPipeline.\_\_init\_\_
 361  
 362  ```python
 363  def __init__(metadata: Optional[dict[str, Any]] = None,
 364               max_runs_per_component: int = 100,
 365               connection_type_validation: bool = True)
 366  ```
 367  
 368  Creates the Pipeline.
 369  
 370  **Arguments**:
 371  
 372  - `metadata`: Arbitrary dictionary to store metadata about this `Pipeline`. Make sure all the values contained in
 373  this dictionary can be serialized and deserialized if you wish to save this `Pipeline` to file.
 374  - `max_runs_per_component`: How many times the `Pipeline` can run the same Component.
 375  If this limit is reached a `PipelineMaxComponentRuns` exception is raised.
 376  If not set defaults to 100 runs per Component.
 377  - `connection_type_validation`: Whether the pipeline will validate the types of the connections.
 378  Defaults to True.
 379  
 380  <a id="async_pipeline.AsyncPipeline.__eq__"></a>
 381  
 382  #### AsyncPipeline.\_\_eq\_\_
 383  
 384  ```python
 385  def __eq__(other: object) -> bool
 386  ```
 387  
 388  Pipeline equality is defined by their type and the equality of their serialized form.
 389  
 390  Pipelines of the same type share every metadata, node and edge, but they're not required to use
 391  the same node instances: this allows pipeline saved and then loaded back to be equal to themselves.
 392  
 393  <a id="async_pipeline.AsyncPipeline.__repr__"></a>
 394  
 395  #### AsyncPipeline.\_\_repr\_\_
 396  
 397  ```python
 398  def __repr__() -> str
 399  ```
 400  
 401  Returns a text representation of the Pipeline.
 402  
 403  <a id="async_pipeline.AsyncPipeline.to_dict"></a>
 404  
 405  #### AsyncPipeline.to\_dict
 406  
 407  ```python
 408  def to_dict() -> dict[str, Any]
 409  ```
 410  
 411  Serializes the pipeline to a dictionary.
 412  
 413  This is meant to be an intermediate representation but it can be also used to save a pipeline to file.
 414  
 415  **Returns**:
 416  
 417  Dictionary with serialized data.
 418  
 419  <a id="async_pipeline.AsyncPipeline.from_dict"></a>
 420  
 421  #### AsyncPipeline.from\_dict
 422  
 423  ```python
 424  @classmethod
 425  def from_dict(cls: type[T],
 426                data: dict[str, Any],
 427                callbacks: Optional[DeserializationCallbacks] = None,
 428                **kwargs: Any) -> T
 429  ```
 430  
 431  Deserializes the pipeline from a dictionary.
 432  
 433  **Arguments**:
 434  
 435  - `data`: Dictionary to deserialize from.
 436  - `callbacks`: Callbacks to invoke during deserialization.
 437  - `kwargs`: `components`: a dictionary of `{name: instance}` to reuse instances of components instead of creating new
 438  ones.
 439  
 440  **Returns**:
 441  
 442  Deserialized component.
 443  
 444  <a id="async_pipeline.AsyncPipeline.dumps"></a>
 445  
 446  #### AsyncPipeline.dumps
 447  
 448  ```python
 449  def dumps(marshaller: Marshaller = DEFAULT_MARSHALLER) -> str
 450  ```
 451  
 452  Returns the string representation of this pipeline according to the format dictated by the `Marshaller` in use.
 453  
 454  **Arguments**:
 455  
 456  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
 457  
 458  **Returns**:
 459  
 460  A string representing the pipeline.
 461  
 462  <a id="async_pipeline.AsyncPipeline.dump"></a>
 463  
 464  #### AsyncPipeline.dump
 465  
 466  ```python
 467  def dump(fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER) -> None
 468  ```
 469  
 470  Writes the string representation of this pipeline to the file-like object passed in the `fp` argument.
 471  
 472  **Arguments**:
 473  
 474  - `fp`: A file-like object ready to be written to.
 475  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
 476  
 477  <a id="async_pipeline.AsyncPipeline.loads"></a>
 478  
 479  #### AsyncPipeline.loads
 480  
 481  ```python
 482  @classmethod
 483  def loads(cls: type[T],
 484            data: Union[str, bytes, bytearray],
 485            marshaller: Marshaller = DEFAULT_MARSHALLER,
 486            callbacks: Optional[DeserializationCallbacks] = None) -> T
 487  ```
 488  
 489  Creates a `Pipeline` object from the string representation passed in the `data` argument.
 490  
 491  **Arguments**:
 492  
 493  - `data`: The string representation of the pipeline, can be `str`, `bytes` or `bytearray`.
 494  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
 495  - `callbacks`: Callbacks to invoke during deserialization.
 496  
 497  **Raises**:
 498  
 499  - `DeserializationError`: If an error occurs during deserialization.
 500  
 501  **Returns**:
 502  
 503  A `Pipeline` object.
 504  
 505  <a id="async_pipeline.AsyncPipeline.load"></a>
 506  
 507  #### AsyncPipeline.load
 508  
 509  ```python
 510  @classmethod
 511  def load(cls: type[T],
 512           fp: TextIO,
 513           marshaller: Marshaller = DEFAULT_MARSHALLER,
 514           callbacks: Optional[DeserializationCallbacks] = None) -> T
 515  ```
 516  
 517  Creates a `Pipeline` object a string representation.
 518  
 519  The string representation is read from the file-like object passed in the `fp` argument.
 520  
 521  **Arguments**:
 522  
 523  - `fp`: A file-like object ready to be read from.
 524  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
 525  - `callbacks`: Callbacks to invoke during deserialization.
 526  
 527  **Raises**:
 528  
 529  - `DeserializationError`: If an error occurs during deserialization.
 530  
 531  **Returns**:
 532  
 533  A `Pipeline` object.
 534  
 535  <a id="async_pipeline.AsyncPipeline.add_component"></a>
 536  
 537  #### AsyncPipeline.add\_component
 538  
 539  ```python
 540  def add_component(name: str, instance: Component) -> None
 541  ```
 542  
 543  Add the given component to the pipeline.
 544  
 545  Components are not connected to anything by default: use `Pipeline.connect()` to connect components together.
 546  Component names must be unique, but component instances can be reused if needed.
 547  
 548  **Arguments**:
 549  
 550  - `name`: The name of the component to add.
 551  - `instance`: The component instance to add.
 552  
 553  **Raises**:
 554  
 555  - `ValueError`: If a component with the same name already exists.
 556  - `PipelineValidationError`: If the given instance is not a component.
 557  
 558  <a id="async_pipeline.AsyncPipeline.remove_component"></a>
 559  
 560  #### AsyncPipeline.remove\_component
 561  
 562  ```python
 563  def remove_component(name: str) -> Component
 564  ```
 565  
 566  Remove and returns component from the pipeline.
 567  
 568  Remove an existing component from the pipeline by providing its name.
 569  All edges that connect to the component will also be deleted.
 570  
 571  **Arguments**:
 572  
 573  - `name`: The name of the component to remove.
 574  
 575  **Raises**:
 576  
 577  - `ValueError`: If there is no component with that name already in the Pipeline.
 578  
 579  **Returns**:
 580  
 581  The removed Component instance.
 582  
 583  <a id="async_pipeline.AsyncPipeline.connect"></a>
 584  
 585  #### AsyncPipeline.connect
 586  
 587  ```python
 588  def connect(sender: str, receiver: str) -> "PipelineBase"
 589  ```
 590  
 591  Connects two components together.
 592  
 593  All components to connect must exist in the pipeline.
 594  If connecting to a component that has several output connections, specify the inputs and output names as
 595  'component_name.connections_name'.
 596  
 597  **Arguments**:
 598  
 599  - `sender`: The component that delivers the value. This can be either just a component name or can be
 600  in the format `component_name.connection_name` if the component has multiple outputs.
 601  - `receiver`: The component that receives the value. This can be either just a component name or can be
 602  in the format `component_name.connection_name` if the component has multiple inputs.
 603  
 604  **Raises**:
 605  
 606  - `PipelineConnectError`: If the two components cannot be connected (for example if one of the components is
 607  not present in the pipeline, or the connections don't match by type, and so on).
 608  
 609  **Returns**:
 610  
 611  The Pipeline instance.
 612  
 613  <a id="async_pipeline.AsyncPipeline.get_component"></a>
 614  
 615  #### AsyncPipeline.get\_component
 616  
 617  ```python
 618  def get_component(name: str) -> Component
 619  ```
 620  
 621  Get the component with the specified name from the pipeline.
 622  
 623  **Arguments**:
 624  
 625  - `name`: The name of the component.
 626  
 627  **Raises**:
 628  
 629  - `ValueError`: If a component with that name is not present in the pipeline.
 630  
 631  **Returns**:
 632  
 633  The instance of that component.
 634  
 635  <a id="async_pipeline.AsyncPipeline.get_component_name"></a>
 636  
 637  #### AsyncPipeline.get\_component\_name
 638  
 639  ```python
 640  def get_component_name(instance: Component) -> str
 641  ```
 642  
 643  Returns the name of the Component instance if it has been added to this Pipeline or an empty string otherwise.
 644  
 645  **Arguments**:
 646  
 647  - `instance`: The Component instance to look for.
 648  
 649  **Returns**:
 650  
 651  The name of the Component instance.
 652  
 653  <a id="async_pipeline.AsyncPipeline.inputs"></a>
 654  
 655  #### AsyncPipeline.inputs
 656  
 657  ```python
 658  def inputs(
 659      include_components_with_connected_inputs: bool = False
 660  ) -> dict[str, dict[str, Any]]
 661  ```
 662  
 663  Returns a dictionary containing the inputs of a pipeline.
 664  
 665  Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
 666  the input sockets of that component, including their types and whether they are optional.
 667  
 668  **Arguments**:
 669  
 670  - `include_components_with_connected_inputs`: If `False`, only components that have disconnected input edges are
 671  included in the output.
 672  
 673  **Returns**:
 674  
 675  A dictionary where each key is a pipeline component name and each value is a dictionary of
 676  inputs sockets of that component.
 677  
 678  <a id="async_pipeline.AsyncPipeline.outputs"></a>
 679  
 680  #### AsyncPipeline.outputs
 681  
 682  ```python
 683  def outputs(
 684      include_components_with_connected_outputs: bool = False
 685  ) -> dict[str, dict[str, Any]]
 686  ```
 687  
 688  Returns a dictionary containing the outputs of a pipeline.
 689  
 690  Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
 691  the output sockets of that component.
 692  
 693  **Arguments**:
 694  
 695  - `include_components_with_connected_outputs`: If `False`, only components that have disconnected output edges are
 696  included in the output.
 697  
 698  **Returns**:
 699  
 700  A dictionary where each key is a pipeline component name and each value is a dictionary of
 701  output sockets of that component.
 702  
 703  <a id="async_pipeline.AsyncPipeline.show"></a>
 704  
 705  #### AsyncPipeline.show
 706  
 707  ```python
 708  def show(*,
 709           server_url: str = "https://mermaid.ink",
 710           params: Optional[dict] = None,
 711           timeout: int = 30,
 712           super_component_expansion: bool = False) -> None
 713  ```
 714  
 715  Display an image representing this `Pipeline` in a Jupyter notebook.
 716  
 717  This function generates a diagram of the `Pipeline` using a Mermaid server and displays it directly in
 718  the notebook.
 719  
 720  **Arguments**:
 721  
 722  - `server_url`: The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
 723  See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
 724  info on how to set up your own Mermaid server.
 725  - `params`: Dictionary of customization parameters to modify the output. Refer to Mermaid documentation for more details
 726  Supported keys:
 727  - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
 728  - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
 729  - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
 730  - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
 731  - width: Width of the output image (integer).
 732  - height: Height of the output image (integer).
 733  - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
 734  - fit: Whether to fit the diagram size to the page (PDF only, boolean).
 735  - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
 736  - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
 737  - `timeout`: Timeout in seconds for the request to the Mermaid server.
 738  - `super_component_expansion`: If set to True and the pipeline contains SuperComponents the diagram will show the internal structure of
 739  super-components as if they were components part of the pipeline instead of a "black-box".
 740  Otherwise, only the super-component itself will be displayed.
 741  
 742  **Raises**:
 743  
 744  - `PipelineDrawingError`: If the function is called outside of a Jupyter notebook or if there is an issue with rendering.
 745  
 746  <a id="async_pipeline.AsyncPipeline.draw"></a>
 747  
 748  #### AsyncPipeline.draw
 749  
 750  ```python
 751  def draw(*,
 752           path: Path,
 753           server_url: str = "https://mermaid.ink",
 754           params: Optional[dict] = None,
 755           timeout: int = 30,
 756           super_component_expansion: bool = False) -> None
 757  ```
 758  
 759  Save an image representing this `Pipeline` to the specified file path.
 760  
 761  This function generates a diagram of the `Pipeline` using the Mermaid server and saves it to the provided path.
 762  
 763  **Arguments**:
 764  
 765  - `path`: The file path where the generated image will be saved.
 766  - `server_url`: The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
 767  See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
 768  info on how to set up your own Mermaid server.
 769  - `params`: Dictionary of customization parameters to modify the output. Refer to Mermaid documentation for more details
 770  Supported keys:
 771  - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
 772  - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
 773  - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
 774  - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
 775  - width: Width of the output image (integer).
 776  - height: Height of the output image (integer).
 777  - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
 778  - fit: Whether to fit the diagram size to the page (PDF only, boolean).
 779  - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
 780  - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
 781  - `timeout`: Timeout in seconds for the request to the Mermaid server.
 782  - `super_component_expansion`: If set to True and the pipeline contains SuperComponents the diagram will show the internal structure of
 783  super-components as if they were components part of the pipeline instead of a "black-box".
 784  Otherwise, only the super-component itself will be displayed.
 785  
 786  **Raises**:
 787  
 788  - `PipelineDrawingError`: If there is an issue with rendering or saving the image.
 789  
 790  <a id="async_pipeline.AsyncPipeline.walk"></a>
 791  
 792  #### AsyncPipeline.walk
 793  
 794  ```python
 795  def walk() -> Iterator[tuple[str, Component]]
 796  ```
 797  
 798  Visits each component in the pipeline exactly once and yields its name and instance.
 799  
 800  No guarantees are provided on the visiting order.
 801  
 802  **Returns**:
 803  
 804  An iterator of tuples of component name and component instance.
 805  
 806  <a id="async_pipeline.AsyncPipeline.warm_up"></a>
 807  
 808  #### AsyncPipeline.warm\_up
 809  
 810  ```python
 811  def warm_up() -> None
 812  ```
 813  
 814  Make sure all nodes are warm.
 815  
 816  It's the node's responsibility to make sure this method can be called at every `Pipeline.run()`
 817  without re-initializing everything.
 818  
 819  <a id="async_pipeline.AsyncPipeline.validate_input"></a>
 820  
 821  #### AsyncPipeline.validate\_input
 822  
 823  ```python
 824  def validate_input(data: dict[str, Any]) -> None
 825  ```
 826  
 827  Validates pipeline input data.
 828  
 829  Validates that data:
 830  * Each Component name actually exists in the Pipeline
 831  * Each Component is not missing any input
 832  * Each Component has only one input per input socket, if not variadic
 833  * Each Component doesn't receive inputs that are already sent by another Component
 834  
 835  **Arguments**:
 836  
 837  - `data`: A dictionary of inputs for the pipeline's components. Each key is a component name.
 838  
 839  **Raises**:
 840  
 841  - `ValueError`: If inputs are invalid according to the above.
 842  
 843  <a id="async_pipeline.AsyncPipeline.from_template"></a>
 844  
 845  #### AsyncPipeline.from\_template
 846  
 847  ```python
 848  @classmethod
 849  def from_template(
 850          cls,
 851          predefined_pipeline: PredefinedPipeline,
 852          template_params: Optional[dict[str, Any]] = None) -> "PipelineBase"
 853  ```
 854  
 855  Create a Pipeline from a predefined template. See `PredefinedPipeline` for available options.
 856  
 857  **Arguments**:
 858  
 859  - `predefined_pipeline`: The predefined pipeline to use.
 860  - `template_params`: An optional dictionary of parameters to use when rendering the pipeline template.
 861  
 862  **Returns**:
 863  
 864  An instance of `Pipeline`.
 865  
 866  <a id="async_pipeline.AsyncPipeline.validate_pipeline"></a>
 867  
 868  #### AsyncPipeline.validate\_pipeline
 869  
 870  ```python
 871  @staticmethod
 872  def validate_pipeline(priority_queue: FIFOPriorityQueue) -> None
 873  ```
 874  
 875  Validate the pipeline to check if it is blocked or has no valid entry point.
 876  
 877  **Arguments**:
 878  
 879  - `priority_queue`: Priority queue of component names.
 880  
 881  **Raises**:
 882  
 883  - `PipelineRuntimeError`: If the pipeline is blocked or has no valid entry point.
 884  
 885  <a id="pipeline"></a>
 886  
 887  ## Module pipeline
 888  
 889  <a id="pipeline.Pipeline"></a>
 890  
 891  ### Pipeline
 892  
 893  Synchronous version of the orchestration engine.
 894  
 895  Orchestrates component execution according to the execution graph, one after the other.
 896  
 897  <a id="pipeline.Pipeline.run"></a>
 898  
 899  #### Pipeline.run
 900  
 901  ```python
 902  def run(data: dict[str, Any],
 903          include_outputs_from: Optional[set[str]] = None,
 904          *,
 905          break_point: Optional[Union[Breakpoint, AgentBreakpoint]] = None,
 906          pipeline_snapshot: Optional[PipelineSnapshot] = None
 907          ) -> dict[str, Any]
 908  ```
 909  
 910  Runs the Pipeline with given input data.
 911  
 912  Usage:
 913  ```python
 914  from haystack import Pipeline, Document
 915  from haystack.utils import Secret
 916  from haystack.document_stores.in_memory import InMemoryDocumentStore
 917  from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
 918  from haystack.components.generators import OpenAIGenerator
 919  from haystack.components.builders.answer_builder import AnswerBuilder
 920  from haystack.components.builders.prompt_builder import PromptBuilder
 921  
 922  # Write documents to InMemoryDocumentStore
 923  document_store = InMemoryDocumentStore()
 924  document_store.write_documents([
 925      Document(content="My name is Jean and I live in Paris."),
 926      Document(content="My name is Mark and I live in Berlin."),
 927      Document(content="My name is Giorgio and I live in Rome.")
 928  ])
 929  
 930  prompt_template = """
 931  Given these documents, answer the question.
 932  Documents:
 933  {% for doc in documents %}
 934      {{ doc.content }}
 935  {% endfor %}
 936  Question: {{question}}
 937  Answer:
 938  """
 939  
 940  retriever = InMemoryBM25Retriever(document_store=document_store)
 941  prompt_builder = PromptBuilder(template=prompt_template)
 942  llm = OpenAIGenerator(api_key=Secret.from_token(api_key))
 943  
 944  rag_pipeline = Pipeline()
 945  rag_pipeline.add_component("retriever", retriever)
 946  rag_pipeline.add_component("prompt_builder", prompt_builder)
 947  rag_pipeline.add_component("llm", llm)
 948  rag_pipeline.connect("retriever", "prompt_builder.documents")
 949  rag_pipeline.connect("prompt_builder", "llm")
 950  
 951  # Ask a question
 952  question = "Who lives in Paris?"
 953  results = rag_pipeline.run(
 954      {
 955          "retriever": {"query": question},
 956          "prompt_builder": {"question": question},
 957      }
 958  )
 959  
 960  print(results["llm"]["replies"])
 961  # Jean lives in Paris
 962  ```
 963  
 964  **Arguments**:
 965  
 966  - `data`: A dictionary of inputs for the pipeline's components. Each key is a component name
 967  and its value is a dictionary of that component's input parameters:
 968  ```
 969  data = {
 970      "comp1": {"input1": 1, "input2": 2},
 971  }
 972  ```
 973  For convenience, this format is also supported when input names are unique:
 974  ```
 975  data = {
 976      "input1": 1, "input2": 2,
 977  }
 978  ```
 979  - `include_outputs_from`: Set of component names whose individual outputs are to be
 980  included in the pipeline's output. For components that are
 981  invoked multiple times (in a loop), only the last-produced
 982  output is included.
 983  - `break_point`: A set of breakpoints that can be used to debug the pipeline execution.
 984  - `pipeline_snapshot`: A dictionary containing a snapshot of a previously saved pipeline execution.
 985  
 986  **Raises**:
 987  
 988  - `ValueError`: If invalid inputs are provided to the pipeline.
 989  - `PipelineRuntimeError`: If the Pipeline contains cycles with unsupported connections that would cause
 990  it to get stuck and fail running.
 991  Or if a Component fails or returns output in an unsupported type.
 992  - `PipelineMaxComponentRuns`: If a Component reaches the maximum number of times it can be run in this Pipeline.
 993  - `PipelineBreakpointException`: When a pipeline_breakpoint is triggered. Contains the component name, state, and partial results.
 994  
 995  **Returns**:
 996  
 997  A dictionary where each entry corresponds to a component name
 998  and its output. If `include_outputs_from` is `None`, this dictionary
 999  will only contain the outputs of leaf components, i.e., components
1000  without outgoing connections.
1001  
1002  <a id="pipeline.Pipeline.__init__"></a>
1003  
1004  #### Pipeline.\_\_init\_\_
1005  
1006  ```python
1007  def __init__(metadata: Optional[dict[str, Any]] = None,
1008               max_runs_per_component: int = 100,
1009               connection_type_validation: bool = True)
1010  ```
1011  
1012  Creates the Pipeline.
1013  
1014  **Arguments**:
1015  
1016  - `metadata`: Arbitrary dictionary to store metadata about this `Pipeline`. Make sure all the values contained in
1017  this dictionary can be serialized and deserialized if you wish to save this `Pipeline` to file.
1018  - `max_runs_per_component`: How many times the `Pipeline` can run the same Component.
1019  If this limit is reached a `PipelineMaxComponentRuns` exception is raised.
1020  If not set defaults to 100 runs per Component.
1021  - `connection_type_validation`: Whether the pipeline will validate the types of the connections.
1022  Defaults to True.
1023  
1024  <a id="pipeline.Pipeline.__eq__"></a>
1025  
1026  #### Pipeline.\_\_eq\_\_
1027  
1028  ```python
1029  def __eq__(other: object) -> bool
1030  ```
1031  
1032  Pipeline equality is defined by their type and the equality of their serialized form.
1033  
1034  Pipelines of the same type share every metadata, node and edge, but they're not required to use
1035  the same node instances: this allows pipeline saved and then loaded back to be equal to themselves.
1036  
1037  <a id="pipeline.Pipeline.__repr__"></a>
1038  
1039  #### Pipeline.\_\_repr\_\_
1040  
1041  ```python
1042  def __repr__() -> str
1043  ```
1044  
1045  Returns a text representation of the Pipeline.
1046  
1047  <a id="pipeline.Pipeline.to_dict"></a>
1048  
1049  #### Pipeline.to\_dict
1050  
1051  ```python
1052  def to_dict() -> dict[str, Any]
1053  ```
1054  
1055  Serializes the pipeline to a dictionary.
1056  
1057  This is meant to be an intermediate representation but it can be also used to save a pipeline to file.
1058  
1059  **Returns**:
1060  
1061  Dictionary with serialized data.
1062  
1063  <a id="pipeline.Pipeline.from_dict"></a>
1064  
1065  #### Pipeline.from\_dict
1066  
1067  ```python
1068  @classmethod
1069  def from_dict(cls: type[T],
1070                data: dict[str, Any],
1071                callbacks: Optional[DeserializationCallbacks] = None,
1072                **kwargs: Any) -> T
1073  ```
1074  
1075  Deserializes the pipeline from a dictionary.
1076  
1077  **Arguments**:
1078  
1079  - `data`: Dictionary to deserialize from.
1080  - `callbacks`: Callbacks to invoke during deserialization.
1081  - `kwargs`: `components`: a dictionary of `{name: instance}` to reuse instances of components instead of creating new
1082  ones.
1083  
1084  **Returns**:
1085  
1086  Deserialized component.
1087  
1088  <a id="pipeline.Pipeline.dumps"></a>
1089  
1090  #### Pipeline.dumps
1091  
1092  ```python
1093  def dumps(marshaller: Marshaller = DEFAULT_MARSHALLER) -> str
1094  ```
1095  
1096  Returns the string representation of this pipeline according to the format dictated by the `Marshaller` in use.
1097  
1098  **Arguments**:
1099  
1100  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
1101  
1102  **Returns**:
1103  
1104  A string representing the pipeline.
1105  
1106  <a id="pipeline.Pipeline.dump"></a>
1107  
1108  #### Pipeline.dump
1109  
1110  ```python
1111  def dump(fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER) -> None
1112  ```
1113  
1114  Writes the string representation of this pipeline to the file-like object passed in the `fp` argument.
1115  
1116  **Arguments**:
1117  
1118  - `fp`: A file-like object ready to be written to.
1119  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
1120  
1121  <a id="pipeline.Pipeline.loads"></a>
1122  
1123  #### Pipeline.loads
1124  
1125  ```python
1126  @classmethod
1127  def loads(cls: type[T],
1128            data: Union[str, bytes, bytearray],
1129            marshaller: Marshaller = DEFAULT_MARSHALLER,
1130            callbacks: Optional[DeserializationCallbacks] = None) -> T
1131  ```
1132  
1133  Creates a `Pipeline` object from the string representation passed in the `data` argument.
1134  
1135  **Arguments**:
1136  
1137  - `data`: The string representation of the pipeline, can be `str`, `bytes` or `bytearray`.
1138  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
1139  - `callbacks`: Callbacks to invoke during deserialization.
1140  
1141  **Raises**:
1142  
1143  - `DeserializationError`: If an error occurs during deserialization.
1144  
1145  **Returns**:
1146  
1147  A `Pipeline` object.
1148  
1149  <a id="pipeline.Pipeline.load"></a>
1150  
1151  #### Pipeline.load
1152  
1153  ```python
1154  @classmethod
1155  def load(cls: type[T],
1156           fp: TextIO,
1157           marshaller: Marshaller = DEFAULT_MARSHALLER,
1158           callbacks: Optional[DeserializationCallbacks] = None) -> T
1159  ```
1160  
1161  Creates a `Pipeline` object a string representation.
1162  
1163  The string representation is read from the file-like object passed in the `fp` argument.
1164  
1165  **Arguments**:
1166  
1167  - `fp`: A file-like object ready to be read from.
1168  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
1169  - `callbacks`: Callbacks to invoke during deserialization.
1170  
1171  **Raises**:
1172  
1173  - `DeserializationError`: If an error occurs during deserialization.
1174  
1175  **Returns**:
1176  
1177  A `Pipeline` object.
1178  
1179  <a id="pipeline.Pipeline.add_component"></a>
1180  
1181  #### Pipeline.add\_component
1182  
1183  ```python
1184  def add_component(name: str, instance: Component) -> None
1185  ```
1186  
1187  Add the given component to the pipeline.
1188  
1189  Components are not connected to anything by default: use `Pipeline.connect()` to connect components together.
1190  Component names must be unique, but component instances can be reused if needed.
1191  
1192  **Arguments**:
1193  
1194  - `name`: The name of the component to add.
1195  - `instance`: The component instance to add.
1196  
1197  **Raises**:
1198  
1199  - `ValueError`: If a component with the same name already exists.
1200  - `PipelineValidationError`: If the given instance is not a component.
1201  
1202  <a id="pipeline.Pipeline.remove_component"></a>
1203  
1204  #### Pipeline.remove\_component
1205  
1206  ```python
1207  def remove_component(name: str) -> Component
1208  ```
1209  
1210  Remove and returns component from the pipeline.
1211  
1212  Remove an existing component from the pipeline by providing its name.
1213  All edges that connect to the component will also be deleted.
1214  
1215  **Arguments**:
1216  
1217  - `name`: The name of the component to remove.
1218  
1219  **Raises**:
1220  
1221  - `ValueError`: If there is no component with that name already in the Pipeline.
1222  
1223  **Returns**:
1224  
1225  The removed Component instance.
1226  
1227  <a id="pipeline.Pipeline.connect"></a>
1228  
1229  #### Pipeline.connect
1230  
1231  ```python
1232  def connect(sender: str, receiver: str) -> "PipelineBase"
1233  ```
1234  
1235  Connects two components together.
1236  
1237  All components to connect must exist in the pipeline.
1238  If connecting to a component that has several output connections, specify the inputs and output names as
1239  'component_name.connections_name'.
1240  
1241  **Arguments**:
1242  
1243  - `sender`: The component that delivers the value. This can be either just a component name or can be
1244  in the format `component_name.connection_name` if the component has multiple outputs.
1245  - `receiver`: The component that receives the value. This can be either just a component name or can be
1246  in the format `component_name.connection_name` if the component has multiple inputs.
1247  
1248  **Raises**:
1249  
1250  - `PipelineConnectError`: If the two components cannot be connected (for example if one of the components is
1251  not present in the pipeline, or the connections don't match by type, and so on).
1252  
1253  **Returns**:
1254  
1255  The Pipeline instance.
1256  
1257  <a id="pipeline.Pipeline.get_component"></a>
1258  
1259  #### Pipeline.get\_component
1260  
1261  ```python
1262  def get_component(name: str) -> Component
1263  ```
1264  
1265  Get the component with the specified name from the pipeline.
1266  
1267  **Arguments**:
1268  
1269  - `name`: The name of the component.
1270  
1271  **Raises**:
1272  
1273  - `ValueError`: If a component with that name is not present in the pipeline.
1274  
1275  **Returns**:
1276  
1277  The instance of that component.
1278  
1279  <a id="pipeline.Pipeline.get_component_name"></a>
1280  
1281  #### Pipeline.get\_component\_name
1282  
1283  ```python
1284  def get_component_name(instance: Component) -> str
1285  ```
1286  
1287  Returns the name of the Component instance if it has been added to this Pipeline or an empty string otherwise.
1288  
1289  **Arguments**:
1290  
1291  - `instance`: The Component instance to look for.
1292  
1293  **Returns**:
1294  
1295  The name of the Component instance.
1296  
1297  <a id="pipeline.Pipeline.inputs"></a>
1298  
1299  #### Pipeline.inputs
1300  
1301  ```python
1302  def inputs(
1303      include_components_with_connected_inputs: bool = False
1304  ) -> dict[str, dict[str, Any]]
1305  ```
1306  
1307  Returns a dictionary containing the inputs of a pipeline.
1308  
1309  Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
1310  the input sockets of that component, including their types and whether they are optional.
1311  
1312  **Arguments**:
1313  
1314  - `include_components_with_connected_inputs`: If `False`, only components that have disconnected input edges are
1315  included in the output.
1316  
1317  **Returns**:
1318  
1319  A dictionary where each key is a pipeline component name and each value is a dictionary of
1320  inputs sockets of that component.
1321  
1322  <a id="pipeline.Pipeline.outputs"></a>
1323  
1324  #### Pipeline.outputs
1325  
1326  ```python
1327  def outputs(
1328      include_components_with_connected_outputs: bool = False
1329  ) -> dict[str, dict[str, Any]]
1330  ```
1331  
1332  Returns a dictionary containing the outputs of a pipeline.
1333  
1334  Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
1335  the output sockets of that component.
1336  
1337  **Arguments**:
1338  
1339  - `include_components_with_connected_outputs`: If `False`, only components that have disconnected output edges are
1340  included in the output.
1341  
1342  **Returns**:
1343  
1344  A dictionary where each key is a pipeline component name and each value is a dictionary of
1345  output sockets of that component.
1346  
1347  <a id="pipeline.Pipeline.show"></a>
1348  
1349  #### Pipeline.show
1350  
1351  ```python
1352  def show(*,
1353           server_url: str = "https://mermaid.ink",
1354           params: Optional[dict] = None,
1355           timeout: int = 30,
1356           super_component_expansion: bool = False) -> None
1357  ```
1358  
1359  Display an image representing this `Pipeline` in a Jupyter notebook.
1360  
1361  This function generates a diagram of the `Pipeline` using a Mermaid server and displays it directly in
1362  the notebook.
1363  
1364  **Arguments**:
1365  
1366  - `server_url`: The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
1367  See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
1368  info on how to set up your own Mermaid server.
1369  - `params`: Dictionary of customization parameters to modify the output. Refer to Mermaid documentation for more details
1370  Supported keys:
1371  - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
1372  - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
1373  - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
1374  - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
1375  - width: Width of the output image (integer).
1376  - height: Height of the output image (integer).
1377  - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
1378  - fit: Whether to fit the diagram size to the page (PDF only, boolean).
1379  - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
1380  - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
1381  - `timeout`: Timeout in seconds for the request to the Mermaid server.
1382  - `super_component_expansion`: If set to True and the pipeline contains SuperComponents the diagram will show the internal structure of
1383  super-components as if they were components part of the pipeline instead of a "black-box".
1384  Otherwise, only the super-component itself will be displayed.
1385  
1386  **Raises**:
1387  
1388  - `PipelineDrawingError`: If the function is called outside of a Jupyter notebook or if there is an issue with rendering.
1389  
1390  <a id="pipeline.Pipeline.draw"></a>
1391  
1392  #### Pipeline.draw
1393  
1394  ```python
1395  def draw(*,
1396           path: Path,
1397           server_url: str = "https://mermaid.ink",
1398           params: Optional[dict] = None,
1399           timeout: int = 30,
1400           super_component_expansion: bool = False) -> None
1401  ```
1402  
1403  Save an image representing this `Pipeline` to the specified file path.
1404  
1405  This function generates a diagram of the `Pipeline` using the Mermaid server and saves it to the provided path.
1406  
1407  **Arguments**:
1408  
1409  - `path`: The file path where the generated image will be saved.
1410  - `server_url`: The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
1411  See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
1412  info on how to set up your own Mermaid server.
1413  - `params`: Dictionary of customization parameters to modify the output. Refer to Mermaid documentation for more details
1414  Supported keys:
1415  - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
1416  - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
1417  - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
1418  - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
1419  - width: Width of the output image (integer).
1420  - height: Height of the output image (integer).
1421  - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
1422  - fit: Whether to fit the diagram size to the page (PDF only, boolean).
1423  - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
1424  - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
1425  - `timeout`: Timeout in seconds for the request to the Mermaid server.
1426  - `super_component_expansion`: If set to True and the pipeline contains SuperComponents the diagram will show the internal structure of
1427  super-components as if they were components part of the pipeline instead of a "black-box".
1428  Otherwise, only the super-component itself will be displayed.
1429  
1430  **Raises**:
1431  
1432  - `PipelineDrawingError`: If there is an issue with rendering or saving the image.
1433  
1434  <a id="pipeline.Pipeline.walk"></a>
1435  
1436  #### Pipeline.walk
1437  
1438  ```python
1439  def walk() -> Iterator[tuple[str, Component]]
1440  ```
1441  
1442  Visits each component in the pipeline exactly once and yields its name and instance.
1443  
1444  No guarantees are provided on the visiting order.
1445  
1446  **Returns**:
1447  
1448  An iterator of tuples of component name and component instance.
1449  
1450  <a id="pipeline.Pipeline.warm_up"></a>
1451  
1452  #### Pipeline.warm\_up
1453  
1454  ```python
1455  def warm_up() -> None
1456  ```
1457  
1458  Make sure all nodes are warm.
1459  
1460  It's the node's responsibility to make sure this method can be called at every `Pipeline.run()`
1461  without re-initializing everything.
1462  
1463  <a id="pipeline.Pipeline.validate_input"></a>
1464  
1465  #### Pipeline.validate\_input
1466  
1467  ```python
1468  def validate_input(data: dict[str, Any]) -> None
1469  ```
1470  
1471  Validates pipeline input data.
1472  
1473  Validates that data:
1474  * Each Component name actually exists in the Pipeline
1475  * Each Component is not missing any input
1476  * Each Component has only one input per input socket, if not variadic
1477  * Each Component doesn't receive inputs that are already sent by another Component
1478  
1479  **Arguments**:
1480  
1481  - `data`: A dictionary of inputs for the pipeline's components. Each key is a component name.
1482  
1483  **Raises**:
1484  
1485  - `ValueError`: If inputs are invalid according to the above.
1486  
1487  <a id="pipeline.Pipeline.from_template"></a>
1488  
1489  #### Pipeline.from\_template
1490  
1491  ```python
1492  @classmethod
1493  def from_template(
1494          cls,
1495          predefined_pipeline: PredefinedPipeline,
1496          template_params: Optional[dict[str, Any]] = None) -> "PipelineBase"
1497  ```
1498  
1499  Create a Pipeline from a predefined template. See `PredefinedPipeline` for available options.
1500  
1501  **Arguments**:
1502  
1503  - `predefined_pipeline`: The predefined pipeline to use.
1504  - `template_params`: An optional dictionary of parameters to use when rendering the pipeline template.
1505  
1506  **Returns**:
1507  
1508  An instance of `Pipeline`.
1509  
1510  <a id="pipeline.Pipeline.validate_pipeline"></a>
1511  
1512  #### Pipeline.validate\_pipeline
1513  
1514  ```python
1515  @staticmethod
1516  def validate_pipeline(priority_queue: FIFOPriorityQueue) -> None
1517  ```
1518  
1519  Validate the pipeline to check if it is blocked or has no valid entry point.
1520  
1521  **Arguments**:
1522  
1523  - `priority_queue`: Priority queue of component names.
1524  
1525  **Raises**:
1526  
1527  - `PipelineRuntimeError`: If the pipeline is blocked or has no valid entry point.
1528