   1  ---
   2  title: "Pipeline"
   3  id: pipeline-api
   4  description: "Arranges components and integrations in a flow."
   5  slug: "/pipeline-api"
   6  ---
   7  
   8  <a id="async_pipeline"></a>
   9  
  10  ## Module async\_pipeline
  11  
  12  <a id="async_pipeline.AsyncPipeline"></a>
  13  
  14  ### AsyncPipeline
  15  
  16  Asynchronous version of the Pipeline orchestration engine.
  17  
   18  Manages components in a pipeline, allowing for concurrent processing when the pipeline's execution graph permits.
  19  This enables efficient processing of components by minimizing idle time and maximizing resource utilization.
  20  
  21  <a id="async_pipeline.AsyncPipeline.run_async_generator"></a>
  22  
  23  #### AsyncPipeline.run\_async\_generator
  24  
  25  ```python
  26  async def run_async_generator(
  27          data: dict[str, Any],
  28          include_outputs_from: set[str] | None = None,
  29          concurrency_limit: int = 4) -> AsyncIterator[dict[str, Any]]
  30  ```
  31  
  32  Executes the pipeline step by step asynchronously, yielding partial outputs when any component finishes.
  33  
  34  Usage:
  35  ```python
  36  from haystack import Document
  37  from haystack.components.builders import ChatPromptBuilder
  38  from haystack.dataclasses import ChatMessage
  40  from haystack.document_stores.in_memory import InMemoryDocumentStore
  41  from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
  42  from haystack.components.generators.chat import OpenAIChatGenerator
  44  from haystack import AsyncPipeline
  45  import asyncio
  46  
  47  # Write documents to InMemoryDocumentStore
  48  document_store = InMemoryDocumentStore()
  49  document_store.write_documents([
  50      Document(content="My name is Jean and I live in Paris."),
  51      Document(content="My name is Mark and I live in Berlin."),
  52      Document(content="My name is Giorgio and I live in Rome.")
  53  ])
  54  
  55  prompt_template = [
  56      ChatMessage.from_user(
  57          '''
  58          Given these documents, answer the question.
  59          Documents:
  60          {% for doc in documents %}
  61              {{ doc.content }}
  62          {% endfor %}
  63          Question: {{question}}
  64          Answer:
  65          ''')
  66  ]
  67  
  68  # Create and connect pipeline components
  69  retriever = InMemoryBM25Retriever(document_store=document_store)
  70  prompt_builder = ChatPromptBuilder(template=prompt_template)
  71  llm = OpenAIChatGenerator()
  72  
  73  rag_pipeline = AsyncPipeline()
  74  rag_pipeline.add_component("retriever", retriever)
  75  rag_pipeline.add_component("prompt_builder", prompt_builder)
  76  rag_pipeline.add_component("llm", llm)
  77  rag_pipeline.connect("retriever", "prompt_builder.documents")
  78  rag_pipeline.connect("prompt_builder", "llm")
  79  
  80  # Prepare input data
  81  question = "Who lives in Paris?"
  82  data = {
  83      "retriever": {"query": question},
  84      "prompt_builder": {"question": question},
  85  }
  86  
  87  
  88  # Process results as they become available
  89  async def process_results():
  90      async for partial_output in rag_pipeline.run_async_generator(
  91              data=data,
  92              include_outputs_from={"retriever", "llm"}
  93      ):
  94          # Each partial_output contains the results from a completed component
  95          if "retriever" in partial_output:
  96              print("Retrieved documents:", len(partial_output["retriever"]["documents"]))
  97          if "llm" in partial_output:
  98              print("Generated answer:", partial_output["llm"]["replies"][0])
  99  
 100  
 101  asyncio.run(process_results())
 102  ```
 103  
 104  **Arguments**:
 105  
 106  - `data`: Initial input data to the pipeline.
  107  - `include_outputs_from`: Set of component names whose individual outputs are to be
  108  included in the pipeline's output. For components that are
  109  invoked multiple times (in a loop), only the last-produced
  110  output is included.
  111  - `concurrency_limit`: The maximum number of components that are allowed to run concurrently.
 112  
 113  **Raises**:
 114  
 115  - `ValueError`: If invalid inputs are provided to the pipeline.
 116  - `PipelineMaxComponentRuns`: If a component exceeds the maximum number of allowed executions within the pipeline.
  117  - `PipelineRuntimeError`: If the Pipeline contains cycles with unsupported connections that would cause
  118  it to get stuck and fail running, or if a Component fails or returns output of an unsupported type.
 120  
 121  **Returns**:
 122  
 123  An async iterator containing partial (and final) outputs.
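
The `concurrency_limit` caps how many ready components may execute at the same time. Conceptually it behaves like an `asyncio.Semaphore` guarding component execution; the following stdlib-only sketch (an illustration of the guarantee, not Haystack's actual scheduler) shows that peak concurrency never exceeds the limit:

```python
import asyncio

async def run_component(name: str, semaphore: asyncio.Semaphore, counters: list[int]) -> str:
    # Each "component" acquires the semaphore before doing work, so at most
    # `concurrency_limit` of them run concurrently.
    async with semaphore:
        counters[0] += 1                              # currently running
        counters[1] = max(counters[1], counters[0])   # peak observed
        await asyncio.sleep(0.01)                     # stand-in for real component work
        counters[0] -= 1
        return name

async def main() -> int:
    concurrency_limit = 4
    semaphore = asyncio.Semaphore(concurrency_limit)
    counters = [0, 0]
    await asyncio.gather(*(run_component(f"comp_{i}", semaphore, counters) for i in range(10)))
    return counters[1]

peak = asyncio.run(main())
```

Ten tasks compete for four slots, so the observed peak stays at or below the limit regardless of scheduling order.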
 124  
 125  <a id="async_pipeline.AsyncPipeline.run_async"></a>
 126  
 127  #### AsyncPipeline.run\_async
 128  
 129  ```python
 130  async def run_async(data: dict[str, Any],
 131                      include_outputs_from: set[str] | None = None,
 132                      concurrency_limit: int = 4) -> dict[str, Any]
 133  ```
 134  
  135  Provides an asynchronous interface to run the pipeline with the provided input data.
 136  
 137  This method allows the pipeline to be integrated into an asynchronous workflow, enabling non-blocking
 138  execution of pipeline components.
 139  
 140  Usage:
 141  ```python
 142  import asyncio
 143  
 144  from haystack import Document
 145  from haystack.components.builders import ChatPromptBuilder
 146  from haystack.components.generators.chat import OpenAIChatGenerator
 147  from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
 148  from haystack.core.pipeline import AsyncPipeline
 149  from haystack.dataclasses import ChatMessage
 150  from haystack.document_stores.in_memory import InMemoryDocumentStore
 151  
 152  # Write documents to InMemoryDocumentStore
 153  document_store = InMemoryDocumentStore()
 154  document_store.write_documents([
 155      Document(content="My name is Jean and I live in Paris."),
 156      Document(content="My name is Mark and I live in Berlin."),
 157      Document(content="My name is Giorgio and I live in Rome.")
 158  ])
 159  
 160  prompt_template = [
 161      ChatMessage.from_user(
 162          '''
 163          Given these documents, answer the question.
 164          Documents:
 165          {% for doc in documents %}
 166              {{ doc.content }}
 167          {% endfor %}
 168          Question: {{question}}
 169          Answer:
 170          ''')
 171  ]
 172  
 173  retriever = InMemoryBM25Retriever(document_store=document_store)
 174  prompt_builder = ChatPromptBuilder(template=prompt_template)
 175  llm = OpenAIChatGenerator()
 176  
 177  rag_pipeline = AsyncPipeline()
 178  rag_pipeline.add_component("retriever", retriever)
 179  rag_pipeline.add_component("prompt_builder", prompt_builder)
 180  rag_pipeline.add_component("llm", llm)
 181  rag_pipeline.connect("retriever", "prompt_builder.documents")
 182  rag_pipeline.connect("prompt_builder", "llm")
 183  
 184  # Ask a question
 185  question = "Who lives in Paris?"
 186  
 187  async def run_inner(data, include_outputs_from):
 188      return await rag_pipeline.run_async(data=data, include_outputs_from=include_outputs_from)
 189  
 190  data = {
 191      "retriever": {"query": question},
 192      "prompt_builder": {"question": question},
 193  }
 194  
 195  results = asyncio.run(run_inner(data, include_outputs_from={"retriever", "llm"}))
 196  
 197  print(results["llm"]["replies"])
 198  # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
 199  # _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
 200  # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75,
 201  # 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0,
 202  # audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details':
 203  # PromptTokensDetails(audio_tokens=0, cached_tokens=0)}})]
 204  ```
 205  
 206  **Arguments**:
 207  
 208  - `data`: A dictionary of inputs for the pipeline's components. Each key is a component name
 209  and its value is a dictionary of that component's input parameters:
 210  ```
 211  data = {
 212      "comp1": {"input1": 1, "input2": 2},
 213  }
 214  ```
 215  For convenience, this format is also supported when input names are unique:
 216  ```
 217  data = {
 218      "input1": 1, "input2": 2,
 219  }
 220  ```
 221  - `include_outputs_from`: Set of component names whose individual outputs are to be
 222  included in the pipeline's output. For components that are
 223  invoked multiple times (in a loop), only the last-produced
 224  output is included.
 225  - `concurrency_limit`: The maximum number of components that should be allowed to run concurrently.
 226  
 227  **Raises**:
 228  
 229  - `ValueError`: If invalid inputs are provided to the pipeline.
  230  - `PipelineRuntimeError`: If the Pipeline contains cycles with unsupported connections that would cause
  231  it to get stuck and fail running, or if a Component fails or returns output of an unsupported type.
 233  - `PipelineMaxComponentRuns`: If a Component reaches the maximum number of times it can be run in this Pipeline.
 234  
 235  **Returns**:
 236  
 237  A dictionary where each entry corresponds to a component name
 238  and its output. If `include_outputs_from` is `None`, this dictionary
 239  will only contain the outputs of leaf components, i.e., components
 240  without outgoing connections.
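
The flat convenience format works because each input name can be routed to exactly one component when names are unique. A minimal sketch of that normalization step (a hypothetical `normalize_inputs` helper, not Haystack's implementation):

```python
from typing import Any

def normalize_inputs(
    data: dict[str, Any], component_inputs: dict[str, set[str]]
) -> dict[str, dict[str, Any]]:
    """Route flat {input_name: value} data into {component: {input: value}} form."""
    # If the data already maps known component names to dicts, treat it as explicit.
    if data and all(isinstance(v, dict) for v in data.values()) and set(data) <= set(component_inputs):
        return data
    routed: dict[str, dict[str, Any]] = {}
    for input_name, value in data.items():
        targets = [name for name, sockets in component_inputs.items() if input_name in sockets]
        if len(targets) != 1:
            raise ValueError(f"Input '{input_name}' is ambiguous or unknown")
        routed.setdefault(targets[0], {})[input_name] = value
    return routed

# Toy socket map for the two-component example above
sockets = {"retriever": {"query"}, "prompt_builder": {"question", "documents"}}
routed = normalize_inputs({"query": "Who lives in Paris?", "question": "Who lives in Paris?"}, sockets)
```

If two components exposed an input with the same name, the flat form would be rejected as ambiguous and the explicit per-component form would be required.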
 241  
 242  <a id="async_pipeline.AsyncPipeline.run"></a>
 243  
 244  #### AsyncPipeline.run
 245  
 246  ```python
 247  def run(data: dict[str, Any],
 248          include_outputs_from: set[str] | None = None,
 249          concurrency_limit: int = 4) -> dict[str, Any]
 250  ```
 251  
  252  Provides a synchronous interface to run the pipeline with the given input data.
 253  
 254  Internally, the pipeline components are executed asynchronously, but the method itself
 255  will block until the entire pipeline execution is complete.
 256  
  257  If you need asynchronous execution, use `run_async` or `run_async_generator` instead.
 258  
 259  Usage:
 260  ```python
 261  from haystack import Document
 262  from haystack.components.builders import ChatPromptBuilder
 263  from haystack.components.generators.chat import OpenAIChatGenerator
 264  from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
 265  from haystack.core.pipeline import AsyncPipeline
 266  from haystack.dataclasses import ChatMessage
 267  from haystack.document_stores.in_memory import InMemoryDocumentStore
 268  
 269  # Write documents to InMemoryDocumentStore
 270  document_store = InMemoryDocumentStore()
 271  document_store.write_documents([
 272      Document(content="My name is Jean and I live in Paris."),
 273      Document(content="My name is Mark and I live in Berlin."),
 274      Document(content="My name is Giorgio and I live in Rome.")
 275  ])
 276  
 277  prompt_template = [
 278      ChatMessage.from_user(
 279          '''
 280          Given these documents, answer the question.
 281          Documents:
 282          {% for doc in documents %}
 283              {{ doc.content }}
 284          {% endfor %}
 285          Question: {{question}}
 286          Answer:
 287          ''')
 288  ]
 289  
 290  
 291  retriever = InMemoryBM25Retriever(document_store=document_store)
 292  prompt_builder = ChatPromptBuilder(template=prompt_template)
 293  llm = OpenAIChatGenerator()
 294  
 295  rag_pipeline = AsyncPipeline()
 296  rag_pipeline.add_component("retriever", retriever)
 297  rag_pipeline.add_component("prompt_builder", prompt_builder)
 298  rag_pipeline.add_component("llm", llm)
 299  rag_pipeline.connect("retriever", "prompt_builder.documents")
 300  rag_pipeline.connect("prompt_builder", "llm")
 301  
 302  # Ask a question
 303  question = "Who lives in Paris?"
 304  
 305  data = {
 306      "retriever": {"query": question},
 307      "prompt_builder": {"question": question},
 308  }
 309  
 310  results = rag_pipeline.run(data)
 311  
 312  print(results["llm"]["replies"])
 313  # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
 314  # _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
 315  # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75, 'completion_tokens_details':
 316  # CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0,
 317  # rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0,
 318  # cached_tokens=0)}})]
 319  ```
 320  
 321  **Arguments**:
 322  
 323  - `data`: A dictionary of inputs for the pipeline's components. Each key is a component name
 324  and its value is a dictionary of that component's input parameters:
 325  ```
 326  data = {
 327      "comp1": {"input1": 1, "input2": 2},
 328  }
 329  ```
 330  For convenience, this format is also supported when input names are unique:
 331  ```
 332  data = {
 333      "input1": 1, "input2": 2,
 334  }
 335  ```
 336  - `include_outputs_from`: Set of component names whose individual outputs are to be
 337  included in the pipeline's output. For components that are
 338  invoked multiple times (in a loop), only the last-produced
 339  output is included.
 340  - `concurrency_limit`: The maximum number of components that should be allowed to run concurrently.
 341  
 342  **Raises**:
 343  
 344  - `ValueError`: If invalid inputs are provided to the pipeline.
  345  - `PipelineRuntimeError`: If the Pipeline contains cycles with unsupported connections that would cause
  346  it to get stuck and fail running, or if a Component fails or returns output of an unsupported type.
 348  - `PipelineMaxComponentRuns`: If a Component reaches the maximum number of times it can be run in this Pipeline.
 349  - `RuntimeError`: If called from within an async context. Use `run_async` instead.
 350  
 351  **Returns**:
 352  
 353  A dictionary where each entry corresponds to a component name
 354  and its output. If `include_outputs_from` is `None`, this dictionary
 355  will only contain the outputs of leaf components, i.e., components
 356  without outgoing connections.
 357  
 358  <a id="async_pipeline.AsyncPipeline.__init__"></a>
 359  
 360  #### AsyncPipeline.\_\_init\_\_
 361  
 362  ```python
 363  def __init__(metadata: dict[str, Any] | None = None,
 364               max_runs_per_component: int = 100,
 365               connection_type_validation: bool = True)
 366  ```
 367  
 368  Creates the Pipeline.
 369  
 370  **Arguments**:
 371  
 372  - `metadata`: Arbitrary dictionary to store metadata about this `Pipeline`. Make sure all the values contained in
 373  this dictionary can be serialized and deserialized if you wish to save this `Pipeline` to file.
 374  - `max_runs_per_component`: How many times the `Pipeline` can run the same Component.
  375  If this limit is reached, a `PipelineMaxComponentRuns` exception is raised.
  376  Defaults to 100 runs per Component.
 377  - `connection_type_validation`: Whether the pipeline will validate the types of the connections.
 378  Defaults to True.
 379  
 380  <a id="async_pipeline.AsyncPipeline.__eq__"></a>
 381  
 382  #### AsyncPipeline.\_\_eq\_\_
 383  
 384  ```python
 385  def __eq__(other: object) -> bool
 386  ```
 387  
 388  Pipeline equality is defined by their type and the equality of their serialized form.
 389  
  390  Equal pipelines share the same metadata, nodes, and edges, but they are not required to use
  391  the same component instances: this allows a pipeline that is saved and then loaded back to compare equal to the original.
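
The "equal if their serialized forms match" contract can be sketched with a toy class (an illustration of the idea, not Haystack's code):

```python
from typing import Any

class ToyPipeline:
    """Toy stand-in that mimics equality-by-serialization."""

    def __init__(self) -> None:
        self.components: dict[str, str] = {}

    def add_component(self, name: str, kind: str) -> None:
        self.components[name] = kind

    def to_dict(self) -> dict[str, Any]:
        return {"components": dict(self.components)}

    def __eq__(self, other: object) -> bool:
        # Compare type and serialized form: two independently built (or
        # saved-and-reloaded) pipelines with identical structure are equal
        # even though their component instances differ.
        if type(other) is not type(self):
            return NotImplemented
        return self.to_dict() == other.to_dict()

a, b = ToyPipeline(), ToyPipeline()
a.add_component("retriever", "BM25")
b.add_component("retriever", "BM25")
```

Here `a` and `b` hold distinct instances yet compare equal, which is exactly what makes a round-tripped pipeline equal to the original.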
 392  
 393  <a id="async_pipeline.AsyncPipeline.__repr__"></a>
 394  
 395  #### AsyncPipeline.\_\_repr\_\_
 396  
 397  ```python
 398  def __repr__() -> str
 399  ```
 400  
 401  Returns a text representation of the Pipeline.
 402  
 403  <a id="async_pipeline.AsyncPipeline.to_dict"></a>
 404  
 405  #### AsyncPipeline.to\_dict
 406  
 407  ```python
 408  def to_dict() -> dict[str, Any]
 409  ```
 410  
 411  Serializes the pipeline to a dictionary.
 412  
  413  This is meant to be an intermediate representation, but it can also be used to save a pipeline to a file.
 414  
 415  **Returns**:
 416  
 417  Dictionary with serialized data.
 418  
 419  <a id="async_pipeline.AsyncPipeline.from_dict"></a>
 420  
 421  #### AsyncPipeline.from\_dict
 422  
 423  ```python
 424  @classmethod
 425  def from_dict(cls: type[T],
 426                data: dict[str, Any],
 427                callbacks: DeserializationCallbacks | None = None,
 428                **kwargs: Any) -> T
 429  ```
 430  
 431  Deserializes the pipeline from a dictionary.
 432  
 433  **Arguments**:
 434  
 435  - `data`: Dictionary to deserialize from.
 436  - `callbacks`: Callbacks to invoke during deserialization.
  437  - `kwargs`: Additional keyword arguments. Supports `components`, a dictionary of `{name: instance}` used to reuse
  438  existing component instances instead of creating new ones.
 439  
 440  **Returns**:
 441  
  442  The deserialized pipeline.
 443  
 444  <a id="async_pipeline.AsyncPipeline.dumps"></a>
 445  
 446  #### AsyncPipeline.dumps
 447  
 448  ```python
 449  def dumps(marshaller: Marshaller = DEFAULT_MARSHALLER) -> str
 450  ```
 451  
 452  Returns the string representation of this pipeline according to the format dictated by the `Marshaller` in use.
 453  
 454  **Arguments**:
 455  
 456  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
 457  
 458  **Returns**:
 459  
 460  A string representing the pipeline.
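
The `Marshaller` is pluggable: any object that can turn the `to_dict()` representation into a string and back will do. A JSON-based stand-in for the default `YamlMarshaller` (a sketch of the interface as described here, not the real class):

```python
import json
from typing import Any

class JsonMarshaller:
    """Illustrative marshaller: plays the YamlMarshaller role, using JSON."""

    def marshal(self, dict_: dict[str, Any]) -> str:
        return json.dumps(dict_, indent=2)

    def unmarshal(self, data: "str | bytes | bytearray") -> dict[str, Any]:
        return json.loads(data)

marshaller = JsonMarshaller()
pipeline_dict = {"components": {"retriever": {"type": "InMemoryBM25Retriever"}}}
serialized = marshaller.marshal(pipeline_dict)   # what dumps() would return
restored = marshaller.unmarshal(serialized)      # what loads() would parse back
```

Swapping the marshaller changes only the on-disk format; the dictionary representation produced by `to_dict()` stays the same.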
 461  
 462  <a id="async_pipeline.AsyncPipeline.dump"></a>
 463  
 464  #### AsyncPipeline.dump
 465  
 466  ```python
 467  def dump(fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER) -> None
 468  ```
 469  
 470  Writes the string representation of this pipeline to the file-like object passed in the `fp` argument.
 471  
 472  **Arguments**:
 473  
 474  - `fp`: A file-like object ready to be written to.
 475  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
 476  
 477  <a id="async_pipeline.AsyncPipeline.loads"></a>
 478  
 479  #### AsyncPipeline.loads
 480  
 481  ```python
 482  @classmethod
 483  def loads(cls: type[T],
 484            data: str | bytes | bytearray,
 485            marshaller: Marshaller = DEFAULT_MARSHALLER,
 486            callbacks: DeserializationCallbacks | None = None) -> T
 487  ```
 488  
 489  Creates a `Pipeline` object from the string representation passed in the `data` argument.
 490  
 491  **Arguments**:
 492  
 493  - `data`: The string representation of the pipeline, can be `str`, `bytes` or `bytearray`.
 494  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
 495  - `callbacks`: Callbacks to invoke during deserialization.
 496  
 497  **Raises**:
 498  
 499  - `DeserializationError`: If an error occurs during deserialization.
 500  
 501  **Returns**:
 502  
 503  A `Pipeline` object.
 504  
 505  <a id="async_pipeline.AsyncPipeline.load"></a>
 506  
 507  #### AsyncPipeline.load
 508  
 509  ```python
 510  @classmethod
 511  def load(cls: type[T],
 512           fp: TextIO,
 513           marshaller: Marshaller = DEFAULT_MARSHALLER,
 514           callbacks: DeserializationCallbacks | None = None) -> T
 515  ```
 516  
  517  Creates a `Pipeline` object from a string representation.
 518  
 519  The string representation is read from the file-like object passed in the `fp` argument.
 520  
 521  **Arguments**:
 522  
 523  - `fp`: A file-like object ready to be read from.
 524  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
 525  - `callbacks`: Callbacks to invoke during deserialization.
 526  
 527  **Raises**:
 528  
 529  - `DeserializationError`: If an error occurs during deserialization.
 530  
 531  **Returns**:
 532  
 533  A `Pipeline` object.
 534  
 535  <a id="async_pipeline.AsyncPipeline.add_component"></a>
 536  
 537  #### AsyncPipeline.add\_component
 538  
 539  ```python
 540  def add_component(name: str, instance: Component) -> None
 541  ```
 542  
 543  Add the given component to the pipeline.
 544  
 545  Components are not connected to anything by default: use `Pipeline.connect()` to connect components together.
 546  Component names must be unique, but component instances can be reused if needed.
 547  
 548  **Arguments**:
 549  
 550  - `name`: The name of the component to add.
 551  - `instance`: The component instance to add.
 552  
 553  **Raises**:
 554  
 555  - `ValueError`: If a component with the same name already exists.
 556  - `PipelineValidationError`: If the given instance is not a component.
 557  
 558  <a id="async_pipeline.AsyncPipeline.remove_component"></a>
 559  
 560  #### AsyncPipeline.remove\_component
 561  
 562  ```python
 563  def remove_component(name: str) -> Component
 564  ```
 565  
  566  Removes and returns a component from the pipeline.
 567  
 568  Remove an existing component from the pipeline by providing its name.
 569  All edges that connect to the component will also be deleted.
 570  
 571  **Arguments**:
 572  
 573  - `name`: The name of the component to remove.
 574  
 575  **Raises**:
 576  
 577  - `ValueError`: If there is no component with that name already in the Pipeline.
 578  
 579  **Returns**:
 580  
 581  The removed Component instance.
 582  
 583  <a id="async_pipeline.AsyncPipeline.connect"></a>
 584  
 585  #### AsyncPipeline.connect
 586  
 587  ```python
 588  def connect(sender: str, receiver: str) -> "PipelineBase"
 589  ```
 590  
 591  Connects two components together.
 592  
 593  All components to connect must exist in the pipeline.
  594  If a component has several input or output connections, specify the socket names as
  595  'component_name.connection_name'.
 596  
 597  **Arguments**:
 598  
 599  - `sender`: The component that delivers the value. This can be either just a component name or can be
 600  in the format `component_name.connection_name` if the component has multiple outputs.
 601  - `receiver`: The component that receives the value. This can be either just a component name or can be
 602  in the format `component_name.connection_name` if the component has multiple inputs.
 603  
 604  **Raises**:
 605  
 606  - `PipelineConnectError`: If the two components cannot be connected (for example if one of the components is
 607  not present in the pipeline, or the connections don't match by type, and so on).
 608  
 609  **Returns**:
 610  
 611  The Pipeline instance.
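
Resolving the `'component_name.connection_name'` form can be sketched as follows (a hypothetical helper, not the library's parser); when no socket name is given, the connection is resolved automatically by matching socket types:

```python
def parse_connect_string(connection: str) -> "tuple[str, str | None]":
    # "prompt_builder.documents" -> ("prompt_builder", "documents")
    # "llm" -> ("llm", None): the socket must then be inferred from types.
    component, sep, socket = connection.partition(".")
    return (component, socket if sep else None)

parsed = parse_connect_string("prompt_builder.documents")
```

This is why `connect("retriever", "prompt_builder.documents")` works: the sender has a single obvious output, while the receiver names its target socket explicitly.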
 612  
 613  <a id="async_pipeline.AsyncPipeline.get_component"></a>
 614  
 615  #### AsyncPipeline.get\_component
 616  
 617  ```python
 618  def get_component(name: str) -> Component
 619  ```
 620  
 621  Get the component with the specified name from the pipeline.
 622  
 623  **Arguments**:
 624  
 625  - `name`: The name of the component.
 626  
 627  **Raises**:
 628  
 629  - `ValueError`: If a component with that name is not present in the pipeline.
 630  
 631  **Returns**:
 632  
 633  The instance of that component.
 634  
 635  <a id="async_pipeline.AsyncPipeline.get_component_name"></a>
 636  
 637  #### AsyncPipeline.get\_component\_name
 638  
 639  ```python
 640  def get_component_name(instance: Component) -> str
 641  ```
 642  
  643  Returns the name of the Component instance if it has been added to this Pipeline, or an empty string otherwise.
 644  
 645  **Arguments**:
 646  
 647  - `instance`: The Component instance to look for.
 648  
 649  **Returns**:
 650  
 651  The name of the Component instance.
 652  
 653  <a id="async_pipeline.AsyncPipeline.inputs"></a>
 654  
 655  #### AsyncPipeline.inputs
 656  
 657  ```python
 658  def inputs(
 659      include_components_with_connected_inputs: bool = False
 660  ) -> dict[str, dict[str, Any]]
 661  ```
 662  
 663  Returns a dictionary containing the inputs of a pipeline.
 664  
 665  Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
 666  the input sockets of that component, including their types and whether they are optional.
 667  
 668  **Arguments**:
 669  
 670  - `include_components_with_connected_inputs`: If `False`, only components that have disconnected input edges are
 671  included in the output.
 672  
 673  **Returns**:
 674  
 675  A dictionary where each key is a pipeline component name and each value is a dictionary of
  676  input sockets of that component.
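
With the default `include_components_with_connected_inputs=False`, the method effectively filters out sockets that already receive a value over an edge. A simplified sketch of that filtering (a toy representation of the graph, not Haystack's internals):

```python
def open_inputs(component_inputs: dict, edges: list) -> dict:
    """component_inputs: {component: set of input socket names};
    edges: (sender, receiver, receiver_socket) tuples."""
    connected = {(receiver, socket) for _sender, receiver, socket in edges}
    return {
        component: sorted(s for s in sockets if (component, s) not in connected)
        for component, sockets in component_inputs.items()
    }

sockets = {"retriever": {"query"}, "prompt_builder": {"question", "documents"}}
edges = [("retriever", "prompt_builder", "documents")]
result = open_inputs(sockets, edges)
```

Here `prompt_builder.documents` is fed by the retriever, so only `query` and `question` remain as inputs the caller must supply.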
 677  
 678  <a id="async_pipeline.AsyncPipeline.outputs"></a>
 679  
 680  #### AsyncPipeline.outputs
 681  
 682  ```python
 683  def outputs(
 684      include_components_with_connected_outputs: bool = False
 685  ) -> dict[str, dict[str, Any]]
 686  ```
 687  
 688  Returns a dictionary containing the outputs of a pipeline.
 689  
 690  Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
 691  the output sockets of that component.
 692  
 693  **Arguments**:
 694  
 695  - `include_components_with_connected_outputs`: If `False`, only components that have disconnected output edges are
 696  included in the output.
 697  
 698  **Returns**:
 699  
 700  A dictionary where each key is a pipeline component name and each value is a dictionary of
 701  output sockets of that component.
 702  
 703  <a id="async_pipeline.AsyncPipeline.show"></a>
 704  
 705  #### AsyncPipeline.show
 706  
 707  ```python
 708  def show(*,
 709           server_url: str = "https://mermaid.ink",
 710           params: dict | None = None,
 711           timeout: int = 30,
 712           super_component_expansion: bool = False) -> None
 713  ```
 714  
 715  Display an image representing this `Pipeline` in a Jupyter notebook.
 716  
 717  This function generates a diagram of the `Pipeline` using a Mermaid server and displays it directly in
 718  the notebook.
 719  
 720  **Arguments**:
 721  
 722  - `server_url`: The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
 723  See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
 724  info on how to set up your own Mermaid server.
  725  - `params`: Dictionary of customization parameters to modify the output. Refer to the Mermaid documentation for more details.
  726  Supported keys:
 727  - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
 728  - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
 729  - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
 730  - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
 731  - width: Width of the output image (integer).
 732  - height: Height of the output image (integer).
 733  - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
 734  - fit: Whether to fit the diagram size to the page (PDF only, boolean).
 735  - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
 736  - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
 737  - `timeout`: Timeout in seconds for the request to the Mermaid server.
  738  - `super_component_expansion`: If set to True and the pipeline contains SuperComponents, the diagram shows their internal
  739  structure as if their components were part of the pipeline, rather than rendering them as a "black box".
  740  Otherwise, only the SuperComponent itself is displayed.
 741  
 742  **Raises**:
 743  
 744  - `PipelineDrawingError`: If the function is called outside of a Jupyter notebook or if there is an issue with rendering.
 745  
 746  <a id="async_pipeline.AsyncPipeline.draw"></a>
 747  
 748  #### AsyncPipeline.draw
 749  
 750  ```python
 751  def draw(*,
 752           path: Path,
 753           server_url: str = "https://mermaid.ink",
 754           params: dict | None = None,
 755           timeout: int = 30,
 756           super_component_expansion: bool = False) -> None
 757  ```
 758  
 759  Save an image representing this `Pipeline` to the specified file path.
 760  
 761  This function generates a diagram of the `Pipeline` using the Mermaid server and saves it to the provided path.
 762  
 763  **Arguments**:
 764  
 765  - `path`: The file path where the generated image will be saved.
 766  - `server_url`: The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
 767  See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
 768  info on how to set up your own Mermaid server.
  769  - `params`: Dictionary of customization parameters to modify the output. Refer to the Mermaid documentation for more details.
  770  Supported keys:
 771  - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
 772  - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
 773  - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
 774  - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
 775  - width: Width of the output image (integer).
 776  - height: Height of the output image (integer).
 777  - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
 778  - fit: Whether to fit the diagram size to the page (PDF only, boolean).
 779  - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
 780  - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
 781  - `timeout`: Timeout in seconds for the request to the Mermaid server.
  782  - `super_component_expansion`: If set to True and the pipeline contains SuperComponents, the diagram shows their internal
  783  structure as if their components were part of the pipeline, rather than rendering them as a "black box".
  784  Otherwise, only the SuperComponent itself is displayed.
 785  
 786  **Raises**:
 787  
 788  - `PipelineDrawingError`: If there is an issue with rendering or saving the image.
 789  
 790  <a id="async_pipeline.AsyncPipeline.walk"></a>
 791  
 792  #### AsyncPipeline.walk
 793  
 794  ```python
 795  def walk() -> Iterator[tuple[str, Component]]
 796  ```
 797  
 798  Visits each component in the pipeline exactly once and yields its name and instance.
 799  
 800  No guarantees are provided on the visiting order.
 801  
 802  **Returns**:
 803  
 804  An iterator of tuples of component name and component instance.
 805  
 806  <a id="async_pipeline.AsyncPipeline.warm_up"></a>
 807  
 808  #### AsyncPipeline.warm\_up
 809  
 810  ```python
 811  def warm_up() -> None
 812  ```
 813  
 814  Make sure all nodes are warm.
 815  
 816  It's the node's responsibility to make sure this method can be called at every `Pipeline.run()`
 817  without re-initializing everything.
 818  
 819  <a id="async_pipeline.AsyncPipeline.validate_input"></a>
 820  
 821  #### AsyncPipeline.validate\_input
 822  
 823  ```python
 824  def validate_input(data: dict[str, Any]) -> None
 825  ```
 826  
 827  Validates pipeline input data.
 828  
 829  Validates that data:
 830  * Each Component name actually exists in the Pipeline
 831  * Each Component is not missing any input
 832  * Each Component has only one input per input socket, if not variadic
 833  * Each Component doesn't receive inputs that are already sent by another Component
 834  
 835  **Arguments**:
 836  
 837  - `data`: A dictionary of inputs for the pipeline's components. Each key is a component name.
 838  
 839  **Raises**:
 840  
 841  - `ValueError`: If inputs are invalid according to the above.
 842  
 843  <a id="async_pipeline.AsyncPipeline.from_template"></a>
 844  
 845  #### AsyncPipeline.from\_template
 846  
 847  ```python
 848  @classmethod
 849  def from_template(
 850          cls,
 851          predefined_pipeline: PredefinedPipeline,
 852          template_params: dict[str, Any] | None = None) -> "PipelineBase"
 853  ```
 854  
 855  Create a Pipeline from a predefined template. See `PredefinedPipeline` for available options.
 856  
 857  **Arguments**:
 858  
 859  - `predefined_pipeline`: The predefined pipeline to use.
 860  - `template_params`: An optional dictionary of parameters to use when rendering the pipeline template.
 861  
 862  **Returns**:
 863  
 864  An instance of `Pipeline`.
 865  
 866  <a id="async_pipeline.AsyncPipeline.validate_pipeline"></a>
 867  
 868  #### AsyncPipeline.validate\_pipeline
 869  
 870  ```python
 871  @staticmethod
 872  def validate_pipeline(priority_queue: FIFOPriorityQueue) -> None
 873  ```
 874  
 875  Validate the pipeline to check if it is blocked or has no valid entry point.
 876  
 877  **Arguments**:
 878  
 879  - `priority_queue`: Priority queue of component names.
 880  
 881  **Raises**:
 882  
 883  - `PipelineRuntimeError`: If the pipeline is blocked or has no valid entry point.
 884  
 885  <a id="pipeline"></a>
 886  
 887  ## Module pipeline
 888  
 889  <a id="pipeline.Pipeline"></a>
 890  
 891  ### Pipeline
 892  
 893  Synchronous version of the orchestration engine.
 894  
Orchestrates component execution according to the execution graph, running components one after the other.
 896  
 897  <a id="pipeline.Pipeline.run"></a>
 898  
 899  #### Pipeline.run
 900  
 901  ```python
 902  def run(data: dict[str, Any],
 903          include_outputs_from: set[str] | None = None,
 904          *,
 905          break_point: Breakpoint | AgentBreakpoint | None = None,
 906          pipeline_snapshot: PipelineSnapshot | None = None) -> dict[str, Any]
 907  ```
 908  
 909  Runs the Pipeline with given input data.
 910  
 911  Usage:
 912  ```python
 913  from haystack import Pipeline, Document
 914  from haystack.utils import Secret
 915  from haystack.document_stores.in_memory import InMemoryDocumentStore
 916  from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
 917  from haystack.components.generators import OpenAIGenerator
 919  from haystack.components.builders.prompt_builder import PromptBuilder
 920  
 921  # Write documents to InMemoryDocumentStore
 922  document_store = InMemoryDocumentStore()
 923  document_store.write_documents([
 924      Document(content="My name is Jean and I live in Paris."),
 925      Document(content="My name is Mark and I live in Berlin."),
 926      Document(content="My name is Giorgio and I live in Rome.")
 927  ])
 928  
 929  prompt_template = """
 930  Given these documents, answer the question.
 931  Documents:
 932  {% for doc in documents %}
 933      {{ doc.content }}
 934  {% endfor %}
 935  Question: {{question}}
 936  Answer:
 937  """
 938  
 939  retriever = InMemoryBM25Retriever(document_store=document_store)
 940  prompt_builder = PromptBuilder(template=prompt_template)
llm = OpenAIGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"))
 942  
 943  rag_pipeline = Pipeline()
 944  rag_pipeline.add_component("retriever", retriever)
 945  rag_pipeline.add_component("prompt_builder", prompt_builder)
 946  rag_pipeline.add_component("llm", llm)
 947  rag_pipeline.connect("retriever", "prompt_builder.documents")
 948  rag_pipeline.connect("prompt_builder", "llm")
 949  
 950  # Ask a question
 951  question = "Who lives in Paris?"
 952  results = rag_pipeline.run(
 953      {
 954          "retriever": {"query": question},
 955          "prompt_builder": {"question": question},
 956      }
 957  )
 958  
 959  print(results["llm"]["replies"])
 960  # Jean lives in Paris
 961  ```
 962  
 963  **Arguments**:
 964  
 965  - `data`: A dictionary of inputs for the pipeline's components. Each key is a component name
 966  and its value is a dictionary of that component's input parameters:
 967  ```
 968  data = {
 969      "comp1": {"input1": 1, "input2": 2},
 970  }
 971  ```
 972  For convenience, this format is also supported when input names are unique:
 973  ```
 974  data = {
 975      "input1": 1, "input2": 2,
 976  }
 977  ```
 978  - `include_outputs_from`: Set of component names whose individual outputs are to be
 979  included in the pipeline's output. For components that are
 980  invoked multiple times (in a loop), only the last-produced
 981  output is included.
 982  - `break_point`: A set of breakpoints that can be used to debug the pipeline execution.
 983  - `pipeline_snapshot`: A dictionary containing a snapshot of a previously saved pipeline execution.
 984  
 985  **Raises**:
 986  
 987  - `ValueError`: If invalid inputs are provided to the pipeline.
 988  - `PipelineRuntimeError`: If the Pipeline contains cycles with unsupported connections that would cause
it to get stuck and fail running, or if a Component fails or returns output of an unsupported type.
 991  - `PipelineMaxComponentRuns`: If a Component reaches the maximum number of times it can be run in this Pipeline.
- `PipelineBreakpointException`: When a `break_point` is triggered. Contains the component name, state, and partial results.
 993  
 994  **Returns**:
 995  
 996  A dictionary where each entry corresponds to a component name
 997  and its output. If `include_outputs_from` is `None`, this dictionary
 998  will only contain the outputs of leaf components, i.e., components
 999  without outgoing connections.
1000  
1001  <a id="pipeline.Pipeline.__init__"></a>
1002  
1003  #### Pipeline.\_\_init\_\_
1004  
1005  ```python
1006  def __init__(metadata: dict[str, Any] | None = None,
1007               max_runs_per_component: int = 100,
1008               connection_type_validation: bool = True)
1009  ```
1010  
1011  Creates the Pipeline.
1012  
1013  **Arguments**:
1014  
1015  - `metadata`: Arbitrary dictionary to store metadata about this `Pipeline`. Make sure all the values contained in
1016  this dictionary can be serialized and deserialized if you wish to save this `Pipeline` to file.
- `max_runs_per_component`: How many times the `Pipeline` can run the same Component.
If this limit is reached, a `PipelineMaxComponentRuns` exception is raised.
Defaults to 100 runs per Component.
1020  - `connection_type_validation`: Whether the pipeline will validate the types of the connections.
1021  Defaults to True.
1022  
1023  <a id="pipeline.Pipeline.__eq__"></a>
1024  
1025  #### Pipeline.\_\_eq\_\_
1026  
1027  ```python
1028  def __eq__(other: object) -> bool
1029  ```
1030  
1031  Pipeline equality is defined by their type and the equality of their serialized form.
1032  
Equal pipelines share the same metadata, nodes, and edges, but they are not required to use the same
component instances: this allows a pipeline that was saved and then loaded back to compare equal to the original.
1035  
1036  <a id="pipeline.Pipeline.__repr__"></a>
1037  
1038  #### Pipeline.\_\_repr\_\_
1039  
1040  ```python
1041  def __repr__() -> str
1042  ```
1043  
1044  Returns a text representation of the Pipeline.
1045  
1046  <a id="pipeline.Pipeline.to_dict"></a>
1047  
1048  #### Pipeline.to\_dict
1049  
1050  ```python
1051  def to_dict() -> dict[str, Any]
1052  ```
1053  
1054  Serializes the pipeline to a dictionary.
1055  
This is meant to be an intermediate representation, but it can also be used to save a pipeline to file.
1057  
1058  **Returns**:
1059  
1060  Dictionary with serialized data.
1061  
1062  <a id="pipeline.Pipeline.from_dict"></a>
1063  
1064  #### Pipeline.from\_dict
1065  
1066  ```python
1067  @classmethod
1068  def from_dict(cls: type[T],
1069                data: dict[str, Any],
1070                callbacks: DeserializationCallbacks | None = None,
1071                **kwargs: Any) -> T
1072  ```
1073  
1074  Deserializes the pipeline from a dictionary.
1075  
1076  **Arguments**:
1077  
1078  - `data`: Dictionary to deserialize from.
1079  - `callbacks`: Callbacks to invoke during deserialization.
- `kwargs`: Additional keyword arguments. Supports `components`, a dictionary of `{name: instance}` used to reuse
existing component instances instead of creating new ones.
1082  
1083  **Returns**:
1084  
The deserialized pipeline.
1086  
1087  <a id="pipeline.Pipeline.dumps"></a>
1088  
1089  #### Pipeline.dumps
1090  
1091  ```python
1092  def dumps(marshaller: Marshaller = DEFAULT_MARSHALLER) -> str
1093  ```
1094  
1095  Returns the string representation of this pipeline according to the format dictated by the `Marshaller` in use.
1096  
1097  **Arguments**:
1098  
1099  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
1100  
1101  **Returns**:
1102  
1103  A string representing the pipeline.
1104  
1105  <a id="pipeline.Pipeline.dump"></a>
1106  
1107  #### Pipeline.dump
1108  
1109  ```python
1110  def dump(fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER) -> None
1111  ```
1112  
1113  Writes the string representation of this pipeline to the file-like object passed in the `fp` argument.
1114  
1115  **Arguments**:
1116  
1117  - `fp`: A file-like object ready to be written to.
1118  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
1119  
1120  <a id="pipeline.Pipeline.loads"></a>
1121  
1122  #### Pipeline.loads
1123  
1124  ```python
1125  @classmethod
1126  def loads(cls: type[T],
1127            data: str | bytes | bytearray,
1128            marshaller: Marshaller = DEFAULT_MARSHALLER,
1129            callbacks: DeserializationCallbacks | None = None) -> T
1130  ```
1131  
1132  Creates a `Pipeline` object from the string representation passed in the `data` argument.
1133  
1134  **Arguments**:
1135  
1136  - `data`: The string representation of the pipeline, can be `str`, `bytes` or `bytearray`.
1137  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
1138  - `callbacks`: Callbacks to invoke during deserialization.
1139  
1140  **Raises**:
1141  
1142  - `DeserializationError`: If an error occurs during deserialization.
1143  
1144  **Returns**:
1145  
1146  A `Pipeline` object.
1147  
1148  <a id="pipeline.Pipeline.load"></a>
1149  
1150  #### Pipeline.load
1151  
1152  ```python
1153  @classmethod
1154  def load(cls: type[T],
1155           fp: TextIO,
1156           marshaller: Marshaller = DEFAULT_MARSHALLER,
1157           callbacks: DeserializationCallbacks | None = None) -> T
1158  ```
1159  
Creates a `Pipeline` object from a string representation.
1161  
1162  The string representation is read from the file-like object passed in the `fp` argument.
1163  
1164  **Arguments**:
1165  
1166  - `fp`: A file-like object ready to be read from.
1167  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
1168  - `callbacks`: Callbacks to invoke during deserialization.
1169  
1170  **Raises**:
1171  
1172  - `DeserializationError`: If an error occurs during deserialization.
1173  
1174  **Returns**:
1175  
1176  A `Pipeline` object.
1177  
1178  <a id="pipeline.Pipeline.add_component"></a>
1179  
1180  #### Pipeline.add\_component
1181  
1182  ```python
1183  def add_component(name: str, instance: Component) -> None
1184  ```
1185  
1186  Add the given component to the pipeline.
1187  
1188  Components are not connected to anything by default: use `Pipeline.connect()` to connect components together.
1189  Component names must be unique, but component instances can be reused if needed.
1190  
1191  **Arguments**:
1192  
1193  - `name`: The name of the component to add.
1194  - `instance`: The component instance to add.
1195  
1196  **Raises**:
1197  
1198  - `ValueError`: If a component with the same name already exists.
1199  - `PipelineValidationError`: If the given instance is not a component.
1200  
1201  <a id="pipeline.Pipeline.remove_component"></a>
1202  
1203  #### Pipeline.remove\_component
1204  
1205  ```python
1206  def remove_component(name: str) -> Component
1207  ```
1208  
Removes and returns a component from the pipeline.

Remove an existing component from the pipeline by providing its name.
All edges that connect to the component are also deleted.
1213  
1214  **Arguments**:
1215  
1216  - `name`: The name of the component to remove.
1217  
1218  **Raises**:
1219  
1220  - `ValueError`: If there is no component with that name already in the Pipeline.
1221  
1222  **Returns**:
1223  
1224  The removed Component instance.
1225  
1226  <a id="pipeline.Pipeline.connect"></a>
1227  
1228  #### Pipeline.connect
1229  
1230  ```python
1231  def connect(sender: str, receiver: str) -> "PipelineBase"
1232  ```
1233  
1234  Connects two components together.
1235  
All components to connect must already exist in the pipeline.
If a component has more than one input or output socket, specify the socket explicitly in the format
'component_name.connection_name'.
1239  
1240  **Arguments**:
1241  
1242  - `sender`: The component that delivers the value. This can be either just a component name or can be
1243  in the format `component_name.connection_name` if the component has multiple outputs.
1244  - `receiver`: The component that receives the value. This can be either just a component name or can be
1245  in the format `component_name.connection_name` if the component has multiple inputs.
1246  
1247  **Raises**:
1248  
1249  - `PipelineConnectError`: If the two components cannot be connected (for example if one of the components is
1250  not present in the pipeline, or the connections don't match by type, and so on).
1251  
1252  **Returns**:
1253  
1254  The Pipeline instance.
1255  
1256  <a id="pipeline.Pipeline.get_component"></a>
1257  
1258  #### Pipeline.get\_component
1259  
1260  ```python
1261  def get_component(name: str) -> Component
1262  ```
1263  
1264  Get the component with the specified name from the pipeline.
1265  
1266  **Arguments**:
1267  
1268  - `name`: The name of the component.
1269  
1270  **Raises**:
1271  
1272  - `ValueError`: If a component with that name is not present in the pipeline.
1273  
1274  **Returns**:
1275  
1276  The instance of that component.
1277  
1278  <a id="pipeline.Pipeline.get_component_name"></a>
1279  
1280  #### Pipeline.get\_component\_name
1281  
1282  ```python
1283  def get_component_name(instance: Component) -> str
1284  ```
1285  
Returns the name of the Component instance if it has been added to this Pipeline, or an empty string otherwise.
1287  
1288  **Arguments**:
1289  
1290  - `instance`: The Component instance to look for.
1291  
1292  **Returns**:
1293  
1294  The name of the Component instance.
1295  
1296  <a id="pipeline.Pipeline.inputs"></a>
1297  
1298  #### Pipeline.inputs
1299  
1300  ```python
1301  def inputs(
1302      include_components_with_connected_inputs: bool = False
1303  ) -> dict[str, dict[str, Any]]
1304  ```
1305  
1306  Returns a dictionary containing the inputs of a pipeline.
1307  
1308  Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
1309  the input sockets of that component, including their types and whether they are optional.
1310  
1311  **Arguments**:
1312  
1313  - `include_components_with_connected_inputs`: If `False`, only components that have disconnected input edges are
1314  included in the output.
1315  
1316  **Returns**:
1317  
1318  A dictionary where each key is a pipeline component name and each value is a dictionary of
1319  inputs sockets of that component.
1320  
1321  <a id="pipeline.Pipeline.outputs"></a>
1322  
1323  #### Pipeline.outputs
1324  
1325  ```python
1326  def outputs(
1327      include_components_with_connected_outputs: bool = False
1328  ) -> dict[str, dict[str, Any]]
1329  ```
1330  
1331  Returns a dictionary containing the outputs of a pipeline.
1332  
1333  Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
1334  the output sockets of that component.
1335  
1336  **Arguments**:
1337  
1338  - `include_components_with_connected_outputs`: If `False`, only components that have disconnected output edges are
1339  included in the output.
1340  
1341  **Returns**:
1342  
1343  A dictionary where each key is a pipeline component name and each value is a dictionary of
1344  output sockets of that component.
1345  
1346  <a id="pipeline.Pipeline.show"></a>
1347  
1348  #### Pipeline.show
1349  
1350  ```python
1351  def show(*,
1352           server_url: str = "https://mermaid.ink",
1353           params: dict | None = None,
1354           timeout: int = 30,
1355           super_component_expansion: bool = False) -> None
1356  ```
1357  
1358  Display an image representing this `Pipeline` in a Jupyter notebook.
1359  
1360  This function generates a diagram of the `Pipeline` using a Mermaid server and displays it directly in
1361  the notebook.
1362  
1363  **Arguments**:
1364  
1365  - `server_url`: The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
1366  See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
1367  info on how to set up your own Mermaid server.
- `params`: Dictionary of customization parameters to modify the output. Refer to the Mermaid documentation for details.
Supported keys:
1370  - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
1371  - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
1372  - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
1373  - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
1374  - width: Width of the output image (integer).
1375  - height: Height of the output image (integer).
1376  - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
1377  - fit: Whether to fit the diagram size to the page (PDF only, boolean).
1378  - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
1379  - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
1380  - `timeout`: Timeout in seconds for the request to the Mermaid server.
- `super_component_expansion`: If `True` and the pipeline contains SuperComponents, the diagram shows their internal
structure as if their components were part of the pipeline, instead of a single "black-box" node.
Otherwise, only the SuperComponent itself is displayed.
1384  
1385  **Raises**:
1386  
1387  - `PipelineDrawingError`: If the function is called outside of a Jupyter notebook or if there is an issue with rendering.
1388  
1389  <a id="pipeline.Pipeline.draw"></a>
1390  
1391  #### Pipeline.draw
1392  
1393  ```python
1394  def draw(*,
1395           path: Path,
1396           server_url: str = "https://mermaid.ink",
1397           params: dict | None = None,
1398           timeout: int = 30,
1399           super_component_expansion: bool = False) -> None
1400  ```
1401  
1402  Save an image representing this `Pipeline` to the specified file path.
1403  
1404  This function generates a diagram of the `Pipeline` using the Mermaid server and saves it to the provided path.
1405  
1406  **Arguments**:
1407  
1408  - `path`: The file path where the generated image will be saved.
1409  - `server_url`: The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
1410  See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
1411  info on how to set up your own Mermaid server.
- `params`: Dictionary of customization parameters to modify the output. Refer to the Mermaid documentation for details.
Supported keys:
1414  - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
1415  - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
1416  - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
1417  - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
1418  - width: Width of the output image (integer).
1419  - height: Height of the output image (integer).
1420  - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
1421  - fit: Whether to fit the diagram size to the page (PDF only, boolean).
1422  - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
1423  - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
1424  - `timeout`: Timeout in seconds for the request to the Mermaid server.
- `super_component_expansion`: If `True` and the pipeline contains SuperComponents, the diagram shows their internal
structure as if their components were part of the pipeline, instead of a single "black-box" node.
Otherwise, only the SuperComponent itself is displayed.
1428  
1429  **Raises**:
1430  
1431  - `PipelineDrawingError`: If there is an issue with rendering or saving the image.
1432  
1433  <a id="pipeline.Pipeline.walk"></a>
1434  
1435  #### Pipeline.walk
1436  
1437  ```python
1438  def walk() -> Iterator[tuple[str, Component]]
1439  ```
1440  
1441  Visits each component in the pipeline exactly once and yields its name and instance.
1442  
1443  No guarantees are provided on the visiting order.
1444  
1445  **Returns**:
1446  
1447  An iterator of tuples of component name and component instance.
1448  
1449  <a id="pipeline.Pipeline.warm_up"></a>
1450  
1451  #### Pipeline.warm\_up
1452  
1453  ```python
1454  def warm_up() -> None
1455  ```
1456  
1457  Make sure all nodes are warm.
1458  
1459  It's the node's responsibility to make sure this method can be called at every `Pipeline.run()`
1460  without re-initializing everything.
1461  
1462  <a id="pipeline.Pipeline.validate_input"></a>
1463  
1464  #### Pipeline.validate\_input
1465  
1466  ```python
1467  def validate_input(data: dict[str, Any]) -> None
1468  ```
1469  
1470  Validates pipeline input data.
1471  
Validates that:
* Each Component name in `data` exists in the Pipeline
* Each Component is not missing any required input
* Each Component receives only one value per input socket, unless the socket is variadic
* Each Component doesn't receive inputs that are already sent by another Component
1477  
1478  **Arguments**:
1479  
1480  - `data`: A dictionary of inputs for the pipeline's components. Each key is a component name.
1481  
1482  **Raises**:
1483  
1484  - `ValueError`: If inputs are invalid according to the above.
1485  
1486  <a id="pipeline.Pipeline.from_template"></a>
1487  
1488  #### Pipeline.from\_template
1489  
1490  ```python
1491  @classmethod
1492  def from_template(
1493          cls,
1494          predefined_pipeline: PredefinedPipeline,
1495          template_params: dict[str, Any] | None = None) -> "PipelineBase"
1496  ```
1497  
1498  Create a Pipeline from a predefined template. See `PredefinedPipeline` for available options.
1499  
1500  **Arguments**:
1501  
1502  - `predefined_pipeline`: The predefined pipeline to use.
1503  - `template_params`: An optional dictionary of parameters to use when rendering the pipeline template.
1504  
1505  **Returns**:
1506  
1507  An instance of `Pipeline`.
1508  
1509  <a id="pipeline.Pipeline.validate_pipeline"></a>
1510  
1511  #### Pipeline.validate\_pipeline
1512  
1513  ```python
1514  @staticmethod
1515  def validate_pipeline(priority_queue: FIFOPriorityQueue) -> None
1516  ```
1517  
1518  Validate the pipeline to check if it is blocked or has no valid entry point.
1519  
1520  **Arguments**:
1521  
1522  - `priority_queue`: Priority queue of component names.
1523  
1524  **Raises**:
1525  
1526  - `PipelineRuntimeError`: If the pipeline is blocked or has no valid entry point.
1527