   1  ---
   2  title: "Pipeline"
   3  id: pipeline-api
   4  description: "Arranges components and integrations in flow."
   5  slug: "/pipeline-api"
   6  ---
   7  
   8  <a id="async_pipeline"></a>
   9  
  10  ## Module async\_pipeline
  11  
  12  <a id="async_pipeline.AsyncPipeline"></a>
  13  
  14  ### AsyncPipeline
  15  
  16  Asynchronous version of the Pipeline orchestration engine.
  17  
  18  Manages components in a pipeline, allowing for concurrent processing when the pipeline's execution graph permits.
  19  This keeps components from sitting idle while independent branches of the graph can still make progress.
  20  
  21  <a id="async_pipeline.AsyncPipeline.run_async_generator"></a>
  22  
  23  #### AsyncPipeline.run\_async\_generator
  24  
  25  ```python
  26  async def run_async_generator(
  27          data: dict[str, Any],
  28          include_outputs_from: set[str] | None = None,
  29          concurrency_limit: int = 4) -> AsyncIterator[dict[str, Any]]
  30  ```
  31  
  32  Executes the pipeline step by step asynchronously, yielding partial outputs when any component finishes.
  33  
  34  Usage:
  35  ```python
  36  from haystack import Document
  37  from haystack.components.builders import ChatPromptBuilder
  38  from haystack.dataclasses import ChatMessage
  40  from haystack.document_stores.in_memory import InMemoryDocumentStore
  41  from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
  42  from haystack.components.generators.chat import OpenAIChatGenerator
  44  from haystack import AsyncPipeline
  45  import asyncio
  46  
  47  # Write documents to InMemoryDocumentStore
  48  document_store = InMemoryDocumentStore()
  49  document_store.write_documents([
  50      Document(content="My name is Jean and I live in Paris."),
  51      Document(content="My name is Mark and I live in Berlin."),
  52      Document(content="My name is Giorgio and I live in Rome.")
  53  ])
  54  
  55  prompt_template = [
  56      ChatMessage.from_user(
  57          '''
  58          Given these documents, answer the question.
  59          Documents:
  60          {% for doc in documents %}
  61              {{ doc.content }}
  62          {% endfor %}
  63          Question: {{question}}
  64          Answer:
  65          ''')
  66  ]
  67  
  68  # Create and connect pipeline components
  69  retriever = InMemoryBM25Retriever(document_store=document_store)
  70  prompt_builder = ChatPromptBuilder(template=prompt_template)
  71  llm = OpenAIChatGenerator()
  72  
  73  rag_pipeline = AsyncPipeline()
  74  rag_pipeline.add_component("retriever", retriever)
  75  rag_pipeline.add_component("prompt_builder", prompt_builder)
  76  rag_pipeline.add_component("llm", llm)
  77  rag_pipeline.connect("retriever", "prompt_builder.documents")
  78  rag_pipeline.connect("prompt_builder", "llm")
  79  
  80  # Prepare input data
  81  question = "Who lives in Paris?"
  82  data = {
  83      "retriever": {"query": question},
  84      "prompt_builder": {"question": question},
  85  }
  86  
  87  
  88  # Process results as they become available
  89  async def process_results():
  90      async for partial_output in rag_pipeline.run_async_generator(
  91              data=data,
  92              include_outputs_from={"retriever", "llm"}
  93      ):
  94          # Each partial_output contains the results from a completed component
  95          if "retriever" in partial_output:
  96              print("Retrieved documents:", len(partial_output["retriever"]["documents"]))
  97          if "llm" in partial_output:
  98              print("Generated answer:", partial_output["llm"]["replies"][0])
  99  
 100  
 101  asyncio.run(process_results())
 102  ```
 103  
 104  **Arguments**:
 105  
 106  - `data`: Initial input data to the pipeline.
 107  - `concurrency_limit`: The maximum number of components that are allowed to run concurrently.
 108  - `include_outputs_from`: Set of component names whose individual outputs are to be
 109  included in the pipeline's output. For components that are
 110  invoked multiple times (in a loop), only the last-produced
 111  output is included.
 112  
 113  **Raises**:
 114  
 115  - `ValueError`: If invalid inputs are provided to the pipeline.
 116  - `PipelineMaxComponentRuns`: If a component exceeds the maximum number of allowed executions within the pipeline.
 117  - `PipelineRuntimeError`: If the Pipeline contains cycles with unsupported connections that would cause
 118  it to get stuck and fail to run,
 119  or if a Component fails or returns output of an unsupported type.
 120  
 121  **Returns**:
 122  
 123  An async iterator containing partial (and final) outputs.
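
The yielding behaviour can be sketched with plain `asyncio`. This is an illustration under stated assumptions, not Haystack's implementation: `fake_component`, `run_generator`, and the delays are hypothetical, components are modelled as independent coroutines, and a semaphore plays the role of `concurrency_limit`.

```python
import asyncio
from typing import Any, AsyncIterator


async def fake_component(name: str, delay: float) -> dict[str, Any]:
    """Hypothetical stand-in for a component: sleeps, then returns its output."""
    await asyncio.sleep(delay)
    return {name: {"delay": delay}}


async def run_generator(
    delays: dict[str, float], concurrency_limit: int = 4
) -> AsyncIterator[dict[str, Any]]:
    """Yield each component's output as soon as that component finishes."""
    semaphore = asyncio.Semaphore(concurrency_limit)

    async def guarded(name: str, delay: float) -> dict[str, Any]:
        # At most `concurrency_limit` components run at the same time.
        async with semaphore:
            return await fake_component(name, delay)

    tasks = [asyncio.create_task(guarded(n, d)) for n, d in delays.items()]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def collect() -> list[str]:
    """Record the component names in the order their outputs arrive."""
    order: list[str] = []
    async for partial in run_generator({"slow": 0.2, "fast": 0.01}):
        order.extend(partial)  # extending with a dict adds its keys
    return order


completion_order = asyncio.run(collect())
print(completion_order)
```

The consumer sees the faster component's output first, which mirrors how the usage example above can report retrieved documents before the generated answer is ready.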
 124  
 125  <a id="async_pipeline.AsyncPipeline.run_async"></a>
 126  
 127  #### AsyncPipeline.run\_async
 128  
 129  ```python
 130  async def run_async(data: dict[str, Any],
 131                      include_outputs_from: set[str] | None = None,
 132                      concurrency_limit: int = 4) -> dict[str, Any]
 133  ```
 134  
 135  Provides an asynchronous interface to run the pipeline with provided input data.
 136  
 137  This method allows the pipeline to be integrated into an asynchronous workflow, enabling non-blocking
 138  execution of pipeline components.
 139  
 140  Usage:
 141  ```python
 142  import asyncio
 143  
 144  from haystack import Document
 145  from haystack.components.builders import ChatPromptBuilder
 146  from haystack.components.generators.chat import OpenAIChatGenerator
 147  from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
 148  from haystack.core.pipeline import AsyncPipeline
 149  from haystack.dataclasses import ChatMessage
 150  from haystack.document_stores.in_memory import InMemoryDocumentStore
 151  
 152  # Write documents to InMemoryDocumentStore
 153  document_store = InMemoryDocumentStore()
 154  document_store.write_documents([
 155      Document(content="My name is Jean and I live in Paris."),
 156      Document(content="My name is Mark and I live in Berlin."),
 157      Document(content="My name is Giorgio and I live in Rome.")
 158  ])
 159  
 160  prompt_template = [
 161      ChatMessage.from_user(
 162          '''
 163          Given these documents, answer the question.
 164          Documents:
 165          {% for doc in documents %}
 166              {{ doc.content }}
 167          {% endfor %}
 168          Question: {{question}}
 169          Answer:
 170          ''')
 171  ]
 172  
 173  retriever = InMemoryBM25Retriever(document_store=document_store)
 174  prompt_builder = ChatPromptBuilder(template=prompt_template)
 175  llm = OpenAIChatGenerator()
 176  
 177  rag_pipeline = AsyncPipeline()
 178  rag_pipeline.add_component("retriever", retriever)
 179  rag_pipeline.add_component("prompt_builder", prompt_builder)
 180  rag_pipeline.add_component("llm", llm)
 181  rag_pipeline.connect("retriever", "prompt_builder.documents")
 182  rag_pipeline.connect("prompt_builder", "llm")
 183  
 184  # Ask a question
 185  question = "Who lives in Paris?"
 186  
 187  async def run_inner(data, include_outputs_from):
 188      return await rag_pipeline.run_async(data=data, include_outputs_from=include_outputs_from)
 189  
 190  data = {
 191      "retriever": {"query": question},
 192      "prompt_builder": {"question": question},
 193  }
 194  
 195  results = asyncio.run(run_inner(data, include_outputs_from={"retriever", "llm"}))
 196  
 197  print(results["llm"]["replies"])
 198  # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
 199  # _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
 200  # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75,
 201  # 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0,
 202  # audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details':
 203  # PromptTokensDetails(audio_tokens=0, cached_tokens=0)}})]
 204  ```
 205  
 206  **Arguments**:
 207  
 208  - `data`: A dictionary of inputs for the pipeline's components. Each key is a component name
 209  and its value is a dictionary of that component's input parameters:
 210  ```
 211  data = {
 212      "comp1": {"input1": 1, "input2": 2},
 213  }
 214  ```
 215  For convenience, this format is also supported when input names are unique:
 216  ```
 217  data = {
 218      "input1": 1, "input2": 2,
 219  }
 220  ```
 221  - `include_outputs_from`: Set of component names whose individual outputs are to be
 222  included in the pipeline's output. For components that are
 223  invoked multiple times (in a loop), only the last-produced
 224  output is included.
 225  - `concurrency_limit`: The maximum number of components that should be allowed to run concurrently.
 226  
 227  **Raises**:
 228  
 229  - `ValueError`: If invalid inputs are provided to the pipeline.
 230  - `PipelineRuntimeError`: If the Pipeline contains cycles with unsupported connections that would cause
 231  it to get stuck and fail to run,
 232  or if a Component fails or returns output of an unsupported type.
 233  - `PipelineMaxComponentRuns`: If a Component reaches the maximum number of times it can be run in this Pipeline.
 234  
 235  **Returns**:
 236  
 237  A dictionary where each entry corresponds to a component name
 238  and its output. If `include_outputs_from` is `None`, this dictionary
 239  will only contain the outputs of leaf components, i.e., components
 240  without outgoing connections.
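
The two `data` layouts described under Arguments can be related by a small normalization step. The following is a hedged sketch: `normalize_inputs` and the `sockets` mapping are hypothetical names, and rejecting ambiguous input names is an assumption of this sketch rather than documented behaviour.

```python
from typing import Any


def normalize_inputs(
    data: dict[str, Any], sockets: dict[str, set[str]]
) -> dict[str, dict[str, Any]]:
    """Expand flat `{input_name: value}` data into `{component: {input_name: value}}`.

    `sockets` maps each component name to the names of its open input sockets.
    """
    # Data keyed only by component names is already in the nested layout.
    if data and all(key in sockets for key in data):
        return {name: dict(inputs) for name, inputs in data.items()}

    nested: dict[str, dict[str, Any]] = {}
    for input_name, value in data.items():
        owners = [comp for comp, names in sockets.items() if input_name in names]
        if len(owners) != 1:
            # Assumption: a flat input name must belong to exactly one component.
            raise ValueError(f"Input '{input_name}' matches {len(owners)} components")
        nested.setdefault(owners[0], {})[input_name] = value
    return nested


sockets = {"retriever": {"query"}, "prompt_builder": {"question"}}
flat = {"query": "Who lives in Paris?", "question": "Who lives in Paris?"}
print(normalize_inputs(flat, sockets))
```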
 241  
 242  <a id="async_pipeline.AsyncPipeline.run"></a>
 243  
 244  #### AsyncPipeline.run
 245  
 246  ```python
 247  def run(data: dict[str, Any],
 248          include_outputs_from: set[str] | None = None,
 249          concurrency_limit: int = 4) -> dict[str, Any]
 250  ```
 251  
 252  Provides a synchronous interface to run the pipeline with given input data.
 253  
 254  Internally, the pipeline components are executed asynchronously, but the method itself
 255  will block until the entire pipeline execution is complete.
 256  
 257  In case you need asynchronous methods, consider using `run_async` or `run_async_generator`.
 258  
 259  Usage:
 260  ```python
 261  from haystack import Document
 262  from haystack.components.builders import ChatPromptBuilder
 263  from haystack.components.generators.chat import OpenAIChatGenerator
 264  from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
 265  from haystack.core.pipeline import AsyncPipeline
 266  from haystack.dataclasses import ChatMessage
 267  from haystack.document_stores.in_memory import InMemoryDocumentStore
 268  
 269  # Write documents to InMemoryDocumentStore
 270  document_store = InMemoryDocumentStore()
 271  document_store.write_documents([
 272      Document(content="My name is Jean and I live in Paris."),
 273      Document(content="My name is Mark and I live in Berlin."),
 274      Document(content="My name is Giorgio and I live in Rome.")
 275  ])
 276  
 277  prompt_template = [
 278      ChatMessage.from_user(
 279          '''
 280          Given these documents, answer the question.
 281          Documents:
 282          {% for doc in documents %}
 283              {{ doc.content }}
 284          {% endfor %}
 285          Question: {{question}}
 286          Answer:
 287          ''')
 288  ]
 289  
 290  
 291  retriever = InMemoryBM25Retriever(document_store=document_store)
 292  prompt_builder = ChatPromptBuilder(template=prompt_template)
 293  llm = OpenAIChatGenerator()
 294  
 295  rag_pipeline = AsyncPipeline()
 296  rag_pipeline.add_component("retriever", retriever)
 297  rag_pipeline.add_component("prompt_builder", prompt_builder)
 298  rag_pipeline.add_component("llm", llm)
 299  rag_pipeline.connect("retriever", "prompt_builder.documents")
 300  rag_pipeline.connect("prompt_builder", "llm")
 301  
 302  # Ask a question
 303  question = "Who lives in Paris?"
 304  
 305  data = {
 306      "retriever": {"query": question},
 307      "prompt_builder": {"question": question},
 308  }
 309  
 310  results = rag_pipeline.run(data)
 311  
 312  print(results["llm"]["replies"])
 313  # [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Jean lives in Paris.')],
 314  # _name=None, _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop', 'usage':
 315  # {'completion_tokens': 6, 'prompt_tokens': 69, 'total_tokens': 75, 'completion_tokens_details':
 316  # CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0,
 317  # rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0,
 318  # cached_tokens=0)}})]
 319  ```
 320  
 321  **Arguments**:
 322  
 323  - `data`: A dictionary of inputs for the pipeline's components. Each key is a component name
 324  and its value is a dictionary of that component's input parameters:
 325  ```
 326  data = {
 327      "comp1": {"input1": 1, "input2": 2},
 328  }
 329  ```
 330  For convenience, this format is also supported when input names are unique:
 331  ```
 332  data = {
 333      "input1": 1, "input2": 2,
 334  }
 335  ```
 336  - `include_outputs_from`: Set of component names whose individual outputs are to be
 337  included in the pipeline's output. For components that are
 338  invoked multiple times (in a loop), only the last-produced
 339  output is included.
 340  - `concurrency_limit`: The maximum number of components that should be allowed to run concurrently.
 341  
 342  **Raises**:
 343  
 344  - `ValueError`: If invalid inputs are provided to the pipeline.
 345  - `PipelineRuntimeError`: If the Pipeline contains cycles with unsupported connections that would cause
 346  it to get stuck and fail to run,
 347  or if a Component fails or returns output of an unsupported type.
 348  - `PipelineMaxComponentRuns`: If a Component reaches the maximum number of times it can be run in this Pipeline.
 349  - `RuntimeError`: If called from within an async context. Use `run_async` instead.
 350  
 351  **Returns**:
 352  
 353  A dictionary where each entry corresponds to a component name
 354  and its output. If `include_outputs_from` is `None`, this dictionary
 355  will only contain the outputs of leaf components, i.e., components
 356  without outgoing connections.
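
The `RuntimeError` listed under Raises follows from how a blocking wrapper around async code typically works. A minimal sketch, assuming a hypothetical `run_async` that simply echoes its input (this is not the library's code):

```python
import asyncio
from typing import Any


async def run_async(data: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical async entry point; it simply echoes its input."""
    await asyncio.sleep(0)
    return {"echo": data}


def run(data: dict[str, Any]) -> dict[str, Any]:
    """Blocking wrapper that refuses to run inside an active event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so it is safe to start one.
        return asyncio.run(run_async(data))
    raise RuntimeError("run() was called from an async context; use run_async() instead.")


print(run({"question": "Who lives in Paris?"}))
```

Inside a coroutine, `asyncio.run` cannot start a second event loop in the same thread, which is why the wrapper raises there and the caller should `await` the async variant instead.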
 357  
 358  <a id="async_pipeline.AsyncPipeline.__init__"></a>
 359  
 360  #### AsyncPipeline.\_\_init\_\_
 361  
 362  ```python
 363  def __init__(metadata: dict[str, Any] | None = None,
 364               max_runs_per_component: int = 100,
 365               connection_type_validation: bool = True)
 366  ```
 367  
 368  Creates the Pipeline.
 369  
 370  **Arguments**:
 371  
 372  - `metadata`: Arbitrary dictionary to store metadata about this `Pipeline`. Make sure all the values contained in
 373  this dictionary can be serialized and deserialized if you wish to save this `Pipeline` to file.
 374  - `max_runs_per_component`: How many times the `Pipeline` can run the same Component.
 375  If this limit is reached, a `PipelineMaxComponentRuns` exception is raised.
 376  If not set, defaults to 100 runs per Component.
 377  - `connection_type_validation`: Whether the pipeline will validate the types of the connections.
 378  Defaults to True.
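
The role of `max_runs_per_component` as a guard against endless loops can be sketched as follows. Everything here is hypothetical: `run_loop`, the `schedule` list, and the local exception class only mimic the idea, and the exact point at which the real limit triggers may differ.

```python
from collections import Counter


class PipelineMaxComponentRuns(Exception):
    """Local stand-in for the exception named above."""


def run_loop(schedule: list[str], max_runs_per_component: int = 100) -> Counter:
    """Visit components in `schedule` order, enforcing the per-component run limit."""
    visits: Counter = Counter()
    for name in schedule:
        if visits[name] >= max_runs_per_component:
            raise PipelineMaxComponentRuns(
                f"Component '{name}' exceeded {max_runs_per_component} runs"
            )
        visits[name] += 1
    return visits


print(run_loop(["builder", "llm", "validator", "builder", "llm"], max_runs_per_component=2))
```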
 379  
 380  <a id="async_pipeline.AsyncPipeline.__eq__"></a>
 381  
 382  #### AsyncPipeline.\_\_eq\_\_
 383  
 384  ```python
 385  def __eq__(other: object) -> bool
 386  ```
 387  
 388  Pipeline equality is defined by their type and the equality of their serialized form.
 389  
 390  Two equal pipelines share all metadata, nodes, and edges, but they're not required to use
 391  the same node instances: this allows a pipeline that is saved and then loaded back to be equal to the original.
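
A minimal sketch of this equality rule, using a hypothetical `TinyPipeline` class rather than the real one: two objects compare equal when they have the same type and the same serialized form, even though they hold different instances.

```python
from typing import Any, Optional


class TinyPipeline:
    """Hypothetical miniature pipeline used only to illustrate the equality rule."""

    def __init__(self, metadata: Optional[dict[str, Any]] = None) -> None:
        self.metadata = metadata or {}
        self.components: dict[str, dict[str, Any]] = {}

    def add_component(self, name: str, init_parameters: dict[str, Any]) -> None:
        # Store a copy so each pipeline owns its own component data.
        self.components[name] = dict(init_parameters)

    def to_dict(self) -> dict[str, Any]:
        return {"metadata": self.metadata, "components": self.components}

    def __eq__(self, other: object) -> bool:
        # Same type and same serialized form; object identity is irrelevant.
        if type(other) is not type(self):
            return False
        return self.to_dict() == other.to_dict()


first = TinyPipeline(metadata={"owner": "docs"})
first.add_component("retriever", {"top_k": 3})
second = TinyPipeline(metadata={"owner": "docs"})
second.add_component("retriever", {"top_k": 3})
print(first == second, first is second)
```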
 392  
 393  <a id="async_pipeline.AsyncPipeline.__repr__"></a>
 394  
 395  #### AsyncPipeline.\_\_repr\_\_
 396  
 397  ```python
 398  def __repr__() -> str
 399  ```
 400  
 401  Returns a text representation of the Pipeline.
 402  
 403  <a id="async_pipeline.AsyncPipeline.to_dict"></a>
 404  
 405  #### AsyncPipeline.to\_dict
 406  
 407  ```python
 408  def to_dict() -> dict[str, Any]
 409  ```
 410  
 411  Serializes the pipeline to a dictionary.
 412  
 413  This is meant to be an intermediate representation, but it can also be used to save a pipeline to file.
 414  
 415  **Returns**:
 416  
 417  Dictionary with serialized data.
 418  
 419  <a id="async_pipeline.AsyncPipeline.from_dict"></a>
 420  
 421  #### AsyncPipeline.from\_dict
 422  
 423  ```python
 424  @classmethod
 425  def from_dict(cls: type[T],
 426                data: dict[str, Any],
 427                callbacks: DeserializationCallbacks | None = None,
 428                **kwargs: Any) -> T
 429  ```
 430  
 431  Deserializes the pipeline from a dictionary.
 432  
 433  **Arguments**:
 434  
 435  - `data`: Dictionary to deserialize from.
 436  - `callbacks`: Callbacks to invoke during deserialization.
 437  - `kwargs`: Additional keyword arguments. `components` is a dictionary of `{name: instance}` used to reuse
 438  existing component instances instead of creating new ones.
 439  
 440  **Returns**:
 441  
 442  Deserialized pipeline.
 443  
 444  <a id="async_pipeline.AsyncPipeline.dumps"></a>
 445  
 446  #### AsyncPipeline.dumps
 447  
 448  ```python
 449  def dumps(marshaller: Marshaller = DEFAULT_MARSHALLER) -> str
 450  ```
 451  
 452  Returns the string representation of this pipeline according to the format dictated by the `Marshaller` in use.
 453  
 454  **Arguments**:
 455  
 456  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
 457  
 458  **Returns**:
 459  
 460  A string representing the pipeline.
 461  
 462  <a id="async_pipeline.AsyncPipeline.dump"></a>
 463  
 464  #### AsyncPipeline.dump
 465  
 466  ```python
 467  def dump(fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER) -> None
 468  ```
 469  
 470  Writes the string representation of this pipeline to the file-like object passed in the `fp` argument.
 471  
 472  **Arguments**:
 473  
 474  - `fp`: A file-like object ready to be written to.
 475  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
 476  
 477  <a id="async_pipeline.AsyncPipeline.loads"></a>
 478  
 479  #### AsyncPipeline.loads
 480  
 481  ```python
 482  @classmethod
 483  def loads(cls: type[T],
 484            data: str | bytes | bytearray,
 485            marshaller: Marshaller = DEFAULT_MARSHALLER,
 486            callbacks: DeserializationCallbacks | None = None) -> T
 487  ```
 488  
 489  Creates a `Pipeline` object from the string representation passed in the `data` argument.
 490  
 491  **Arguments**:
 492  
 493  - `data`: The string representation of the pipeline, can be `str`, `bytes` or `bytearray`.
 494  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
 495  - `callbacks`: Callbacks to invoke during deserialization.
 496  
 497  **Raises**:
 498  
 499  - `DeserializationError`: If an error occurs during deserialization.
 500  
 501  **Returns**:
 502  
 503  A `Pipeline` object.
 504  
 505  <a id="async_pipeline.AsyncPipeline.load"></a>
 506  
 507  #### AsyncPipeline.load
 508  
 509  ```python
 510  @classmethod
 511  def load(cls: type[T],
 512           fp: TextIO,
 513           marshaller: Marshaller = DEFAULT_MARSHALLER,
 514           callbacks: DeserializationCallbacks | None = None) -> T
 515  ```
 516  
 517  Creates a `Pipeline` object from a string representation.
 518  
 519  The string representation is read from the file-like object passed in the `fp` argument.
 520  
 521  **Arguments**:
 522  
 523  - `fp`: A file-like object ready to be read from.
 524  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
 525  - `callbacks`: Callbacks to invoke during deserialization.
 526  
 527  **Raises**:
 528  
 529  - `DeserializationError`: If an error occurs during deserialization.
 530  
 531  **Returns**:
 532  
 533  A `Pipeline` object.
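
The relationship between `dumps`, `dump`, `loads`, and `load` can be sketched as a round trip through a marshaller. This is a sketch under assumptions: the `Marshaller` protocol and its `marshal`/`unmarshal` method names are defined locally for illustration, a JSON marshaller stands in for the YAML default so that only the standard library is needed, and the module-level functions operate on a plain dictionary instead of a pipeline.

```python
import io
import json
from typing import Any, Protocol, TextIO, Union


class Marshaller(Protocol):
    """Local protocol for this sketch: converts between dictionaries and strings."""

    def marshal(self, dict_: dict[str, Any]) -> str: ...

    def unmarshal(self, data_: Union[str, bytes, bytearray]) -> dict[str, Any]: ...


class JsonMarshaller:
    """Stand-in marshaller based on the standard library."""

    def marshal(self, dict_: dict[str, Any]) -> str:
        return json.dumps(dict_, sort_keys=True)

    def unmarshal(self, data_: Union[str, bytes, bytearray]) -> dict[str, Any]:
        return json.loads(data_)


def dumps(pipeline_dict: dict[str, Any], marshaller: Marshaller) -> str:
    return marshaller.marshal(pipeline_dict)


def dump(pipeline_dict: dict[str, Any], fp: TextIO, marshaller: Marshaller) -> None:
    fp.write(marshaller.marshal(pipeline_dict))


def loads(data: Union[str, bytes, bytearray], marshaller: Marshaller) -> dict[str, Any]:
    return marshaller.unmarshal(data)


def load(fp: TextIO, marshaller: Marshaller) -> dict[str, Any]:
    return marshaller.unmarshal(fp.read())


serialized = {"metadata": {}, "components": {"retriever": {"top_k": 3}}}
buffer = io.StringIO()
dump(serialized, buffer, JsonMarshaller())
buffer.seek(0)  # rewind before reading back
restored = load(buffer, JsonMarshaller())
print(restored == serialized)
```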
 534  
 535  <a id="async_pipeline.AsyncPipeline.add_component"></a>
 536  
 537  #### AsyncPipeline.add\_component
 538  
 539  ```python
 540  def add_component(name: str, instance: Component) -> None
 541  ```
 542  
 543  Add the given component to the pipeline.
 544  
 545  Components are not connected to anything by default: use `Pipeline.connect()` to connect components together.
 546  Component names must be unique, but component instances can be reused if needed.
 547  
 548  **Arguments**:
 549  
 550  - `name`: The name of the component to add.
 551  - `instance`: The component instance to add.
 552  
 553  **Raises**:
 554  
 555  - `ValueError`: If a component with the same name already exists.
 556  - `PipelineValidationError`: If the given instance is not a component.
 557  
 558  <a id="async_pipeline.AsyncPipeline.remove_component"></a>
 559  
 560  #### AsyncPipeline.remove\_component
 561  
 562  ```python
 563  def remove_component(name: str) -> Component
 564  ```
 565  
 566  Removes and returns a component from the pipeline.
 567  
 568  Remove an existing component from the pipeline by providing its name.
 569  All edges that connect to the component will also be deleted.
 570  
 571  **Arguments**:
 572  
 573  - `name`: The name of the component to remove.
 574  
 575  **Raises**:
 576  
 577  - `ValueError`: If there is no component with that name already in the Pipeline.
 578  
 579  **Returns**:
 580  
 581  The removed Component instance.
 582  
 583  <a id="async_pipeline.AsyncPipeline.connect"></a>
 584  
 585  #### AsyncPipeline.connect
 586  
 587  ```python
 588  def connect(sender: str, receiver: str) -> "PipelineBase"
 589  ```
 590  
 591  Connects two components together.
 592  
 593  All components to connect must exist in the pipeline.
 594  If a component has several input or output connections, specify the input and output names as
 595  `component_name.connection_name`.
 596  
 597  **Arguments**:
 598  
 599  - `sender`: The component that delivers the value. This can be either just a component name or can be
 600  in the format `component_name.connection_name` if the component has multiple outputs.
 601  - `receiver`: The component that receives the value. This can be either just a component name or can be
 602  in the format `component_name.connection_name` if the component has multiple inputs.
 603  
 604  **Raises**:
 605  
 606  - `PipelineConnectError`: If the two components cannot be connected (for example if one of the components is
 607  not present in the pipeline, or the connections don't match by type, and so on).
 608  
 609  **Returns**:
 610  
 611  The Pipeline instance.
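
The `component_name.connection_name` convention can be illustrated with a small parser. `split_endpoint` is a hypothetical helper written for this sketch; it is not part of the documented API.

```python
from typing import Optional


def split_endpoint(endpoint: str) -> tuple[str, Optional[str]]:
    """Split 'component_name.connection_name' into its two parts.

    The connection name is None when only a component name is given.
    """
    component, _, connection = endpoint.partition(".")
    return component, connection or None


print(split_endpoint("retriever"))
print(split_endpoint("prompt_builder.documents"))
```

In the usage examples above, `connect("retriever", "prompt_builder.documents")` names the receiving input explicitly because the prompt builder has more than one input, while the sender is given by component name only.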
 612  
 613  <a id="async_pipeline.AsyncPipeline.get_component"></a>
 614  
 615  #### AsyncPipeline.get\_component
 616  
 617  ```python
 618  def get_component(name: str) -> Component
 619  ```
 620  
 621  Get the component with the specified name from the pipeline.
 622  
 623  **Arguments**:
 624  
 625  - `name`: The name of the component.
 626  
 627  **Raises**:
 628  
 629  - `ValueError`: If a component with that name is not present in the pipeline.
 630  
 631  **Returns**:
 632  
 633  The instance of that component.
 634  
 635  <a id="async_pipeline.AsyncPipeline.get_component_name"></a>
 636  
 637  #### AsyncPipeline.get\_component\_name
 638  
 639  ```python
 640  def get_component_name(instance: Component) -> str
 641  ```
 642  
 643  Returns the name of the Component instance if it has been added to this Pipeline or an empty string otherwise.
 644  
 645  **Arguments**:
 646  
 647  - `instance`: The Component instance to look for.
 648  
 649  **Returns**:
 650  
 651  The name of the Component instance.
 652  
 653  <a id="async_pipeline.AsyncPipeline.inputs"></a>
 654  
 655  #### AsyncPipeline.inputs
 656  
 657  ```python
 658  def inputs(
 659      include_components_with_connected_inputs: bool = False
 660  ) -> dict[str, dict[str, Any]]
 661  ```
 662  
 663  Returns a dictionary containing the inputs of a pipeline.
 664  
 665  Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
 666  the input sockets of that component, including their types and whether they are optional.
 667  
 668  **Arguments**:
 669  
 670  - `include_components_with_connected_inputs`: If `False`, only components that have disconnected input edges are
 671  included in the output.
 672  
 673  **Returns**:
 674  
 675  A dictionary where each key is a pipeline component name and each value is a dictionary of
 676  input sockets of that component.
 677  
 678  <a id="async_pipeline.AsyncPipeline.outputs"></a>
 679  
 680  #### AsyncPipeline.outputs
 681  
 682  ```python
 683  def outputs(
 684      include_components_with_connected_outputs: bool = False
 685  ) -> dict[str, dict[str, Any]]
 686  ```
 687  
 688  Returns a dictionary containing the outputs of a pipeline.
 689  
 690  Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
 691  the output sockets of that component.
 692  
 693  **Arguments**:
 694  
 695  - `include_components_with_connected_outputs`: If `False`, only components that have disconnected output edges are
 696  included in the output.
 697  
 698  **Returns**:
 699  
 700  A dictionary where each key is a pipeline component name and each value is a dictionary of
 701  output sockets of that component.
 702  
 703  <a id="async_pipeline.AsyncPipeline.show"></a>
 704  
 705  #### AsyncPipeline.show
 706  
 707  ```python
 708  def show(*,
 709           server_url: str = "https://mermaid.ink",
 710           params: dict | None = None,
 711           timeout: int = 30,
 712           super_component_expansion: bool = False) -> None
 713  ```
 714  
 715  Display an image representing this `Pipeline` in a Jupyter notebook.
 716  
 717  This function generates a diagram of the `Pipeline` using a Mermaid server and displays it directly in
 718  the notebook.
 719  
 720  **Arguments**:
 721  
 722  - `server_url`: The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
 723  See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
 724  info on how to set up your own Mermaid server.
 725  - `params`: Dictionary of customization parameters to modify the output. Refer to Mermaid documentation for more details.
 726  Supported keys:
 727      - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
 728      - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
 729      - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
 730      - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
 731      - width: Width of the output image (integer).
 732      - height: Height of the output image (integer).
 733      - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
 734      - fit: Whether to fit the diagram size to the page (PDF only, boolean).
 735      - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
 736      - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
 737  - `timeout`: Timeout in seconds for the request to the Mermaid server.
 738  - `super_component_expansion`: If set to True and the pipeline contains SuperComponents, the diagram shows the internal structure of
 739  super-components as if they were regular components of the pipeline instead of a "black box".
 740  Otherwise, only the super-component itself is displayed.
 741  
 742  **Raises**:
 743  
 744  - `PipelineDrawingError`: If the function is called outside of a Jupyter notebook or if there is an issue with rendering.
 745  
 746  <a id="async_pipeline.AsyncPipeline.draw"></a>
 747  
 748  #### AsyncPipeline.draw
 749  
 750  ```python
 751  def draw(*,
 752           path: Path,
 753           server_url: str = "https://mermaid.ink",
 754           params: dict | None = None,
 755           timeout: int = 30,
 756           super_component_expansion: bool = False) -> None
 757  ```
 758  
 759  Save an image representing this `Pipeline` to the specified file path.
 760  
 761  This function generates a diagram of the `Pipeline` using the Mermaid server and saves it to the provided path.
 762  
 763  **Arguments**:
 764  
 765  - `path`: The file path where the generated image will be saved.
 766  - `server_url`: The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
 767  See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
 768  info on how to set up your own Mermaid server.
 769  - `params`: Dictionary of customization parameters to modify the output. Refer to Mermaid documentation for more details.
 770  Supported keys:
 771      - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
 772      - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
 773      - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
 774      - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
 775      - width: Width of the output image (integer).
 776      - height: Height of the output image (integer).
 777      - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
 778      - fit: Whether to fit the diagram size to the page (PDF only, boolean).
 779      - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
 780      - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
 781  - `timeout`: Timeout in seconds for the request to the Mermaid server.
 782  - `super_component_expansion`: If set to True and the pipeline contains SuperComponents, the diagram shows the internal structure of
 783  super-components as if they were regular components of the pipeline instead of a "black box".
 784  Otherwise, only the super-component itself is displayed.
 785  
 786  **Raises**:
 787  
 788  - `PipelineDrawingError`: If there is an issue with rendering or saving the image.
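
A `params` dictionary built only from the keys listed above might look like the following. The specific values and the output file name are illustrative, and the `draw` call is left as a comment because it needs a pipeline instance (named `pipeline` here as an assumption) and network access to a Mermaid server.

```python
from pathlib import Path

# Request a dark-themed PNG that is 800 pixels wide and scaled by a factor of 2.
params = {
    "format": "img",
    "type": "png",
    "theme": "dark",
    "bgColor": "!white",
    "width": 800,
    "scale": 2,
}
output_path = Path("pipeline.png")

# With an existing pipeline instance:
# pipeline.draw(path=output_path, params=params, timeout=30)
print(sorted(params))
```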
 789  
 790  <a id="async_pipeline.AsyncPipeline.walk"></a>
 791  
 792  #### AsyncPipeline.walk
 793  
 794  ```python
 795  def walk() -> Iterator[tuple[str, Component]]
 796  ```
 797  
 798  Visits each component in the pipeline exactly once and yields its name and instance.
 799  
 800  No guarantees are provided on the visiting order.
 801  
 802  **Returns**:
 803  
 804  An iterator of tuples of component name and component instance.
 805  
 806  <a id="async_pipeline.AsyncPipeline.warm_up"></a>
 807  
 808  #### AsyncPipeline.warm\_up
 809  
 810  ```python
 811  def warm_up() -> None
 812  ```
 813  
 814  Make sure all nodes are warm.
 815  
 816  It's the node's responsibility to make sure this method can be called at every `Pipeline.run()`
 817  without re-initializing everything.
 818  
 819  <a id="async_pipeline.AsyncPipeline.validate_input"></a>
 820  
 821  #### AsyncPipeline.validate\_input
 822  
 823  ```python
 824  def validate_input(data: dict[str, Any]) -> None
 825  ```
 826  
 827  Validates pipeline input data.
 828  
 829  Validates that:
 830  * Each Component name actually exists in the Pipeline
 831  * Each Component is not missing any input
 832  * Each Component has only one input per input socket, if not variadic
 833  * Each Component doesn't receive inputs that are already sent by another Component
 834  
 835  **Arguments**:
 836  
 837  - `data`: A dictionary of inputs for the pipeline's components. Each key is a component name.
 838  
 839  **Raises**:
 840  
 841  - `ValueError`: If inputs are invalid according to the above.
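
The checks listed above can be illustrated with a toy model. This is a minimal sketch and not Haystack's implementation: `check_input`, `sockets`, and `connected` are hypothetical stand-ins for the pipeline graph, and the variadic-socket check is left out for brevity.

```python
def check_input(data, sockets, connected):
    """Raise ValueError if `data` breaks one of the modeled checks.

    data:      {component: {socket: value}}, the shape passed to validate_input
    sockets:   {component: {socket: is_mandatory}}, hypothetical graph description
    connected: set of (component, socket) pairs already fed by another component
    """
    for name, inputs in data.items():
        # The component name must exist in the pipeline.
        if name not in sockets:
            raise ValueError(f"Component '{name}' not found in the pipeline.")
        for socket in inputs:
            # The socket must not already be fed by another component.
            if (name, socket) in connected:
                raise ValueError(f"'{name}.{socket}' is already sent by another component.")

    for name, component_sockets in sockets.items():
        for socket, is_mandatory in component_sockets.items():
            provided = socket in data.get(name, {}) or (name, socket) in connected
            # Every mandatory socket must get a value from somewhere.
            if is_mandatory and not provided:
                raise ValueError(f"Missing input '{socket}' for component '{name}'.")


sockets = {
    "retriever": {"query": True, "top_k": False},
    "prompt_builder": {"question": True, "documents": True},
}
connected = {("prompt_builder", "documents")}  # retriever -> prompt_builder.documents

# Valid: all mandatory, unconnected sockets receive a value.
check_input(
    {"retriever": {"query": "Who?"}, "prompt_builder": {"question": "Who?"}},
    sockets,
    connected,
)
```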
 842  
 843  <a id="async_pipeline.AsyncPipeline.from_template"></a>
 844  
 845  #### AsyncPipeline.from\_template
 846  
 847  ```python
 848  @classmethod
 849  def from_template(
 850          cls,
 851          predefined_pipeline: PredefinedPipeline,
 852          template_params: dict[str, Any] | None = None) -> "PipelineBase"
 853  ```
 854  
 855  Create a Pipeline from a predefined template. See `PredefinedPipeline` for available options.
 856  
 857  **Arguments**:
 858  
 859  - `predefined_pipeline`: The predefined pipeline to use.
 860  - `template_params`: An optional dictionary of parameters to use when rendering the pipeline template.
 861  
 862  **Returns**:
 863  
 864  An instance of `Pipeline`.
 865  
 866  <a id="async_pipeline.AsyncPipeline.validate_pipeline"></a>
 867  
 868  #### AsyncPipeline.validate\_pipeline
 869  
 870  ```python
 871  @staticmethod
 872  def validate_pipeline(priority_queue: FIFOPriorityQueue) -> None
 873  ```
 874  
 875  Validate the pipeline to check if it is blocked or has no valid entry point.
 876  
 877  **Arguments**:
 878  
 879  - `priority_queue`: Priority queue of component names.
 880  
 881  **Raises**:
 882  
 883  - `PipelineRuntimeError`: If the pipeline is blocked or has no valid entry point.
 884  
 885  <a id="pipeline"></a>
 886  
 887  ## Module pipeline
 888  
 889  <a id="pipeline.Pipeline"></a>
 890  
 891  ### Pipeline
 892  
 893  Synchronous version of the orchestration engine.
 894  
 895  Orchestrates component execution according to the execution graph, one after the other.
 896  
 897  <a id="pipeline.Pipeline.run"></a>
 898  
 899  #### Pipeline.run
 900  
 901  ```python
 902  def run(data: dict[str, Any],
 903          include_outputs_from: set[str] | None = None,
 904          *,
 905          break_point: Breakpoint | AgentBreakpoint | None = None,
 906          pipeline_snapshot: PipelineSnapshot | None = None,
 907          snapshot_callback: SnapshotCallback | None = None) -> dict[str, Any]
 908  ```
 909  
 910  Runs the Pipeline with given input data.
 911  
 912  Usage:
 913  ```python
 914  from haystack import Pipeline, Document
 915  from haystack.utils import Secret
 916  from haystack.document_stores.in_memory import InMemoryDocumentStore
 917  from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
 918  from haystack.components.generators import OpenAIGenerator
 919  from haystack.components.builders.answer_builder import AnswerBuilder
 920  from haystack.components.builders.prompt_builder import PromptBuilder
 921  
 922  # Write documents to InMemoryDocumentStore
 923  document_store = InMemoryDocumentStore()
 924  document_store.write_documents([
 925      Document(content="My name is Jean and I live in Paris."),
 926      Document(content="My name is Mark and I live in Berlin."),
 927      Document(content="My name is Giorgio and I live in Rome.")
 928  ])
 929  
 930  prompt_template = """
 931  Given these documents, answer the question.
 932  Documents:
 933  {% for doc in documents %}
 934      {{ doc.content }}
 935  {% endfor %}
 936  Question: {{question}}
 937  Answer:
 938  """
 939  
 940  retriever = InMemoryBM25Retriever(document_store=document_store)
 941  prompt_builder = PromptBuilder(template=prompt_template)
# Assumes the OPENAI_API_KEY environment variable is set
llm = OpenAIGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"))
 943  
 944  rag_pipeline = Pipeline()
 945  rag_pipeline.add_component("retriever", retriever)
 946  rag_pipeline.add_component("prompt_builder", prompt_builder)
 947  rag_pipeline.add_component("llm", llm)
 948  rag_pipeline.connect("retriever", "prompt_builder.documents")
 949  rag_pipeline.connect("prompt_builder", "llm")
 950  
 951  # Ask a question
 952  question = "Who lives in Paris?"
 953  results = rag_pipeline.run(
 954      {
 955          "retriever": {"query": question},
 956          "prompt_builder": {"question": question},
 957      }
 958  )
 959  
 960  print(results["llm"]["replies"])
 961  # Jean lives in Paris
 962  ```
 963  
 964  **Arguments**:
 965  
 966  - `data`: A dictionary of inputs for the pipeline's components. Each key is a component name
 967  and its value is a dictionary of that component's input parameters:
 968  ```
 969  data = {
 970      "comp1": {"input1": 1, "input2": 2},
 971  }
 972  ```
 973  For convenience, this format is also supported when input names are unique:
 974  ```
 975  data = {
 976      "input1": 1, "input2": 2,
 977  }
 978  ```
 979  - `include_outputs_from`: Set of component names whose individual outputs are to be
 980  included in the pipeline's output. For components that are
 981  invoked multiple times (in a loop), only the last-produced
 982  output is included.
 983  - `break_point`: A set of breakpoints that can be used to debug the pipeline execution.
 984  - `pipeline_snapshot`: A dictionary containing a snapshot of a previously saved pipeline execution.
 985  - `snapshot_callback`: Optional callback function that is invoked when a pipeline snapshot is created.
 986  The callback receives a `PipelineSnapshot` object and can return an optional string
 987  (e.g., a file path or identifier).
 988  If provided, the callback is used instead of the default file-saving behavior,
 989  allowing custom handling of snapshots (e.g., saving to a database, sending to a remote service).
 990  If not provided, the default behavior saves snapshots to a JSON file.
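
As an illustration of the `snapshot_callback` contract described above (receives a snapshot, may return a string), here is a minimal sketch that keeps snapshots in memory instead of writing JSON files. The names `snapshots` and `keep_in_memory` are hypothetical, and the snapshot is typed as `Any` so the example stays self-contained.

```python
from typing import Any, Optional

# Hypothetical in-memory store; a real callback might write to a database instead.
snapshots: dict[str, Any] = {}


def keep_in_memory(snapshot: Any) -> Optional[str]:
    """Store the snapshot under a generated key and return that key."""
    key = f"snapshot-{len(snapshots) + 1}"
    snapshots[key] = snapshot
    return key
```

Such a function would be passed as `snapshot_callback=keep_in_memory` in the call to `run`.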
 991  
 992  **Raises**:
 993  
 994  - `ValueError`: If invalid inputs are provided to the pipeline.
- `PipelineRuntimeError`: If the Pipeline contains cycles with unsupported connections that would cause
it to get stuck and fail running, or if a Component fails or returns output in an unsupported type.
 998  - `PipelineMaxComponentRuns`: If a Component reaches the maximum number of times it can be run in this Pipeline.
- `PipelineBreakpointException`: If a breakpoint set through `break_point` is triggered. Contains the component name, state, and partial results.
1000  
1001  **Returns**:
1002  
1003  A dictionary where each entry corresponds to a component name
1004  and its output. If `include_outputs_from` is `None`, this dictionary
1005  will only contain the outputs of leaf components, i.e., components
1006  without outgoing connections.
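
The effect of `include_outputs_from` on the returned dictionary can be sketched with a toy filter. This is an illustrative model under stated assumptions, not the library's code: `select_outputs` and the `edges` list of `(sender, receiver)` pairs are hypothetical.

```python
def select_outputs(all_outputs, edges, include_outputs_from=None):
    """Keep outputs of leaf components plus any explicitly requested ones."""
    senders = {sender for sender, _ in edges}  # components with outgoing connections
    wanted = set(include_outputs_from or ())
    return {
        name: output
        for name, output in all_outputs.items()
        if name not in senders or name in wanted
    }


edges = [("retriever", "prompt_builder"), ("prompt_builder", "llm")]
all_outputs = {
    "retriever": {"documents": ["My name is Jean and I live in Paris."]},
    "prompt_builder": {"prompt": "Given these documents, answer the question."},
    "llm": {"replies": ["Jean lives in Paris"]},
}

leaf_only = select_outputs(all_outputs, edges)
with_retriever = select_outputs(all_outputs, edges, include_outputs_from={"retriever"})
```

Here `leaf_only` contains only the `llm` entry, while `with_retriever` also contains the `retriever` entry.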
1007  
1008  <a id="pipeline.Pipeline.__init__"></a>
1009  
1010  #### Pipeline.\_\_init\_\_
1011  
1012  ```python
1013  def __init__(metadata: dict[str, Any] | None = None,
1014               max_runs_per_component: int = 100,
1015               connection_type_validation: bool = True)
1016  ```
1017  
1018  Creates the Pipeline.
1019  
1020  **Arguments**:
1021  
1022  - `metadata`: Arbitrary dictionary to store metadata about this `Pipeline`. Make sure all the values contained in
1023  this dictionary can be serialized and deserialized if you wish to save this `Pipeline` to file.
1024  - `max_runs_per_component`: How many times the `Pipeline` can run the same Component.
If this limit is reached, a `PipelineMaxComponentRuns` exception is raised.
If not set, defaults to 100 runs per Component.
1027  - `connection_type_validation`: Whether the pipeline will validate the types of the connections.
1028  Defaults to True.
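
The role of `max_runs_per_component` in pipelines with loops can be pictured with a simple run counter. This is a toy sketch: `MaxComponentRuns` and `record_run` are hypothetical names, and the exact boundary at which the real exception fires is an assumption here.

```python
from collections import Counter


class MaxComponentRuns(Exception):
    """Stand-in for `PipelineMaxComponentRuns` in this toy example."""


def record_run(visits: Counter, name: str, max_runs_per_component: int = 100) -> None:
    """Count one run of `name`, refusing to go past the configured limit."""
    if visits[name] >= max_runs_per_component:
        raise MaxComponentRuns(f"Component '{name}' already ran {visits[name]} times.")
    visits[name] += 1


visits = Counter()
for _ in range(3):
    record_run(visits, "llm", max_runs_per_component=3)  # the first three runs are allowed
```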
1029  
1030  <a id="pipeline.Pipeline.__eq__"></a>
1031  
1032  #### Pipeline.\_\_eq\_\_
1033  
1034  ```python
1035  def __eq__(other: object) -> bool
1036  ```
1037  
1038  Pipeline equality is defined by their type and the equality of their serialized form.
1039  
Equal pipelines share all metadata, nodes, and edges, but they are not required to use
the same node instances: this allows a pipeline that was saved and then loaded back to be equal to the original.
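
Equality by type and serialized form can be illustrated with a small stand-in class. `ToyPipeline` is hypothetical and only mirrors the rule described above; it is not how the library stores components.

```python
class ToyPipeline:
    """Minimal stand-in that compares by type and serialized form."""

    def __init__(self, metadata=None):
        self.metadata = metadata or {}
        self.components = {}  # name -> init parameters (stand-in for instances)

    def to_dict(self):
        return {"metadata": self.metadata, "components": self.components}

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self.to_dict() == other.to_dict()


first = ToyPipeline(metadata={"owner": "docs"})
first.components["retriever"] = {"top_k": 5}

# Built separately from different objects, but serializes identically.
second = ToyPipeline(metadata={"owner": "docs"})
second.components["retriever"] = {"top_k": 5}
```

The two objects compare equal even though they do not share any instance.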
1042  
1043  <a id="pipeline.Pipeline.__repr__"></a>
1044  
1045  #### Pipeline.\_\_repr\_\_
1046  
1047  ```python
1048  def __repr__() -> str
1049  ```
1050  
1051  Returns a text representation of the Pipeline.
1052  
1053  <a id="pipeline.Pipeline.to_dict"></a>
1054  
1055  #### Pipeline.to\_dict
1056  
1057  ```python
1058  def to_dict() -> dict[str, Any]
1059  ```
1060  
1061  Serializes the pipeline to a dictionary.
1062  
This is meant to be an intermediate representation, but it can also be used to save a pipeline to file.
1064  
1065  **Returns**:
1066  
1067  Dictionary with serialized data.
1068  
1069  <a id="pipeline.Pipeline.from_dict"></a>
1070  
1071  #### Pipeline.from\_dict
1072  
1073  ```python
1074  @classmethod
1075  def from_dict(cls: type[T],
1076                data: dict[str, Any],
1077                callbacks: DeserializationCallbacks | None = None,
1078                **kwargs: Any) -> T
1079  ```
1080  
1081  Deserializes the pipeline from a dictionary.
1082  
1083  **Arguments**:
1084  
1085  - `data`: Dictionary to deserialize from.
1086  - `callbacks`: Callbacks to invoke during deserialization.
1087  - `kwargs`: `components`: a dictionary of `{name: instance}` to reuse instances of components instead of creating new
1088  ones.
1089  
1090  **Returns**:
1091  
Deserialized pipeline.
1093  
1094  <a id="pipeline.Pipeline.dumps"></a>
1095  
1096  #### Pipeline.dumps
1097  
1098  ```python
1099  def dumps(marshaller: Marshaller = DEFAULT_MARSHALLER) -> str
1100  ```
1101  
1102  Returns the string representation of this pipeline according to the format dictated by the `Marshaller` in use.
1103  
1104  **Arguments**:
1105  
1106  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
1107  
1108  **Returns**:
1109  
1110  A string representing the pipeline.
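
To show what a non-default marshaller could look like, here is a minimal JSON-based sketch. It assumes the `Marshaller` protocol consists of a `marshal` method (dictionary to string) and an `unmarshal` method (string or bytes to dictionary); treat those method names and the `JsonMarshaller` class as assumptions to check against the installed version.

```python
import json
from typing import Any, Union


class JsonMarshaller:
    """Hypothetical marshaller that serializes the pipeline dictionary as JSON."""

    def marshal(self, dict_: dict[str, Any]) -> str:
        return json.dumps(dict_, indent=2, sort_keys=True)

    def unmarshal(self, data_: Union[str, bytes, bytearray]) -> dict[str, Any]:
        return json.loads(data_)


marshaller = JsonMarshaller()
text = marshaller.marshal({"metadata": {}, "components": {}, "connections": []})
restored = marshaller.unmarshal(text.encode("utf-8"))  # bytes are accepted as well
```

Under that assumption, an instance would be passed as `marshaller=JsonMarshaller()` to `dumps`, `dump`, `loads`, or `load`.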
1111  
1112  <a id="pipeline.Pipeline.dump"></a>
1113  
1114  #### Pipeline.dump
1115  
1116  ```python
1117  def dump(fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER) -> None
1118  ```
1119  
1120  Writes the string representation of this pipeline to the file-like object passed in the `fp` argument.
1121  
1122  **Arguments**:
1123  
1124  - `fp`: A file-like object ready to be written to.
1125  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
1126  
1127  <a id="pipeline.Pipeline.loads"></a>
1128  
1129  #### Pipeline.loads
1130  
1131  ```python
1132  @classmethod
1133  def loads(cls: type[T],
1134            data: str | bytes | bytearray,
1135            marshaller: Marshaller = DEFAULT_MARSHALLER,
1136            callbacks: DeserializationCallbacks | None = None) -> T
1137  ```
1138  
1139  Creates a `Pipeline` object from the string representation passed in the `data` argument.
1140  
1141  **Arguments**:
1142  
1143  - `data`: The string representation of the pipeline, can be `str`, `bytes` or `bytearray`.
1144  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
1145  - `callbacks`: Callbacks to invoke during deserialization.
1146  
1147  **Raises**:
1148  
1149  - `DeserializationError`: If an error occurs during deserialization.
1150  
1151  **Returns**:
1152  
1153  A `Pipeline` object.
1154  
1155  <a id="pipeline.Pipeline.load"></a>
1156  
1157  #### Pipeline.load
1158  
1159  ```python
1160  @classmethod
1161  def load(cls: type[T],
1162           fp: TextIO,
1163           marshaller: Marshaller = DEFAULT_MARSHALLER,
1164           callbacks: DeserializationCallbacks | None = None) -> T
1165  ```
1166  
Creates a `Pipeline` object from a string representation.
1168  
1169  The string representation is read from the file-like object passed in the `fp` argument.
1170  
1171  **Arguments**:
1172  
1173  - `fp`: A file-like object ready to be read from.
1174  - `marshaller`: The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
1175  - `callbacks`: Callbacks to invoke during deserialization.
1176  
1177  **Raises**:
1178  
1179  - `DeserializationError`: If an error occurs during deserialization.
1180  
1181  **Returns**:
1182  
1183  A `Pipeline` object.
1184  
1185  <a id="pipeline.Pipeline.add_component"></a>
1186  
1187  #### Pipeline.add\_component
1188  
1189  ```python
1190  def add_component(name: str, instance: Component) -> None
1191  ```
1192  
1193  Add the given component to the pipeline.
1194  
1195  Components are not connected to anything by default: use `Pipeline.connect()` to connect components together.
1196  Component names must be unique, but component instances can be reused if needed.
1197  
1198  **Arguments**:
1199  
1200  - `name`: The name of the component to add.
1201  - `instance`: The component instance to add.
1202  
1203  **Raises**:
1204  
1205  - `ValueError`: If a component with the same name already exists.
1206  - `PipelineValidationError`: If the given instance is not a component.
1207  
1208  <a id="pipeline.Pipeline.remove_component"></a>
1209  
1210  #### Pipeline.remove\_component
1211  
1212  ```python
1213  def remove_component(name: str) -> Component
1214  ```
1215  
Removes and returns a component from the pipeline.
1217  
1218  Remove an existing component from the pipeline by providing its name.
1219  All edges that connect to the component will also be deleted.
1220  
1221  **Arguments**:
1222  
1223  - `name`: The name of the component to remove.
1224  
1225  **Raises**:
1226  
1227  - `ValueError`: If there is no component with that name already in the Pipeline.
1228  
1229  **Returns**:
1230  
1231  The removed Component instance.
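
The statement that removing a component also deletes its edges can be pictured on a toy graph. `remove_node` and the plain `components` and `edges` containers are hypothetical and only illustrate the documented behavior.

```python
def remove_node(components: dict, edges: list, name: str):
    """Remove `name` and every edge touching it; return the removed instance."""
    if name not in components:
        raise ValueError(f"There is no component named '{name}' in the pipeline.")
    instance = components.pop(name)
    # Drop edges where the removed component is the sender or the receiver.
    edges[:] = [(sender, receiver) for sender, receiver in edges if name not in (sender, receiver)]
    return instance


components = {"retriever": object(), "prompt_builder": object(), "llm": object()}
edges = [("retriever", "prompt_builder"), ("prompt_builder", "llm")]
removed = remove_node(components, edges, "prompt_builder")
```

After the call, `edges` is empty because both connections involved the removed component.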
1232  
1233  <a id="pipeline.Pipeline.connect"></a>
1234  
1235  #### Pipeline.connect
1236  
1237  ```python
1238  def connect(sender: str, receiver: str) -> "PipelineBase"
1239  ```
1240  
1241  Connects two components together.
1242  
1243  All components to connect must exist in the pipeline.
If a component has several inputs or outputs, specify which one to connect using the format
`component_name.connection_name`.
1246  
1247  **Arguments**:
1248  
1249  - `sender`: The component that delivers the value. This can be either just a component name or can be
1250  in the format `component_name.connection_name` if the component has multiple outputs.
1251  - `receiver`: The component that receives the value. This can be either just a component name or can be
1252  in the format `component_name.connection_name` if the component has multiple inputs.
1253  
1254  **Raises**:
1255  
1256  - `PipelineConnectError`: If the two components cannot be connected (for example if one of the components is
1257  not present in the pipeline, or the connections don't match by type, and so on).
1258  
1259  **Returns**:
1260  
1261  The Pipeline instance.
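
The two accepted forms of `sender` and `receiver` (a bare component name, or `component_name.connection_name`) can be told apart with a small helper. `split_connection` is a hypothetical function written for illustration; it is not the library's parser.

```python
from typing import Optional


def split_connection(connection: str) -> tuple[str, Optional[str]]:
    """Split 'component.socket' into its parts; the socket is None if omitted."""
    name, _, socket = connection.partition(".")
    return name, socket or None


explicit = split_connection("prompt_builder.documents")
implicit = split_connection("retriever")
```

In the usage example for `run`, `rag_pipeline.connect("retriever", "prompt_builder.documents")` uses the implicit form for the sender and the explicit form for the receiver.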
1262  
1263  <a id="pipeline.Pipeline.get_component"></a>
1264  
1265  #### Pipeline.get\_component
1266  
1267  ```python
1268  def get_component(name: str) -> Component
1269  ```
1270  
1271  Get the component with the specified name from the pipeline.
1272  
1273  **Arguments**:
1274  
1275  - `name`: The name of the component.
1276  
1277  **Raises**:
1278  
1279  - `ValueError`: If a component with that name is not present in the pipeline.
1280  
1281  **Returns**:
1282  
1283  The instance of that component.
1284  
1285  <a id="pipeline.Pipeline.get_component_name"></a>
1286  
1287  #### Pipeline.get\_component\_name
1288  
1289  ```python
1290  def get_component_name(instance: Component) -> str
1291  ```
1292  
1293  Returns the name of the Component instance if it has been added to this Pipeline or an empty string otherwise.
1294  
1295  **Arguments**:
1296  
1297  - `instance`: The Component instance to look for.
1298  
1299  **Returns**:
1300  
1301  The name of the Component instance.
1302  
1303  <a id="pipeline.Pipeline.inputs"></a>
1304  
1305  #### Pipeline.inputs
1306  
1307  ```python
1308  def inputs(
1309      include_components_with_connected_inputs: bool = False
1310  ) -> dict[str, dict[str, Any]]
1311  ```
1312  
1313  Returns a dictionary containing the inputs of a pipeline.
1314  
1315  Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
1316  the input sockets of that component, including their types and whether they are optional.
1317  
1318  **Arguments**:
1319  
1320  - `include_components_with_connected_inputs`: If `False`, only components that have disconnected input edges are
1321  included in the output.
1322  
1323  **Returns**:
1324  
1325  A dictionary where each key is a pipeline component name and each value is a dictionary of
input sockets of that component.
1327  
1328  <a id="pipeline.Pipeline.outputs"></a>
1329  
1330  #### Pipeline.outputs
1331  
1332  ```python
1333  def outputs(
1334      include_components_with_connected_outputs: bool = False
1335  ) -> dict[str, dict[str, Any]]
1336  ```
1337  
1338  Returns a dictionary containing the outputs of a pipeline.
1339  
1340  Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
1341  the output sockets of that component.
1342  
1343  **Arguments**:
1344  
1345  - `include_components_with_connected_outputs`: If `False`, only components that have disconnected output edges are
1346  included in the output.
1347  
1348  **Returns**:
1349  
1350  A dictionary where each key is a pipeline component name and each value is a dictionary of
1351  output sockets of that component.
1352  
1353  <a id="pipeline.Pipeline.show"></a>
1354  
1355  #### Pipeline.show
1356  
1357  ```python
1358  def show(*,
1359           server_url: str = "https://mermaid.ink",
1360           params: dict | None = None,
1361           timeout: int = 30,
1362           super_component_expansion: bool = False) -> None
1363  ```
1364  
1365  Display an image representing this `Pipeline` in a Jupyter notebook.
1366  
1367  This function generates a diagram of the `Pipeline` using a Mermaid server and displays it directly in
1368  the notebook.
1369  
1370  **Arguments**:
1371  
1372  - `server_url`: The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
1373  See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
1374  info on how to set up your own Mermaid server.
- `params`: Dictionary of customization parameters to modify the output. Refer to the Mermaid documentation for more details.
Supported keys:
  - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
  - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
  - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
  - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
  - width: Width of the output image (integer).
  - height: Height of the output image (integer).
  - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
  - fit: Whether to fit the diagram size to the page (PDF only, boolean).
  - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
  - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
- `timeout`: Timeout in seconds for the request to the Mermaid server.
- `super_component_expansion`: If set to True and the pipeline contains SuperComponents, the diagram shows the internal
structure of the super-components as if their components were part of the pipeline, instead of showing each one as a "black box".
Otherwise, only the super-component itself is displayed.
1391  
1392  **Raises**:
1393  
1394  - `PipelineDrawingError`: If the function is called outside of a Jupyter notebook or if there is an issue with rendering.
1395  
1396  <a id="pipeline.Pipeline.draw"></a>
1397  
1398  #### Pipeline.draw
1399  
1400  ```python
1401  def draw(*,
1402           path: Path,
1403           server_url: str = "https://mermaid.ink",
1404           params: dict | None = None,
1405           timeout: int = 30,
1406           super_component_expansion: bool = False) -> None
1407  ```
1408  
1409  Save an image representing this `Pipeline` to the specified file path.
1410  
1411  This function generates a diagram of the `Pipeline` using the Mermaid server and saves it to the provided path.
1412  
1413  **Arguments**:
1414  
1415  - `path`: The file path where the generated image will be saved.
1416  - `server_url`: The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
1417  See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
1418  info on how to set up your own Mermaid server.
- `params`: Dictionary of customization parameters to modify the output. Refer to the Mermaid documentation for more details.
Supported keys:
  - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
  - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
  - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
  - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
  - width: Width of the output image (integer).
  - height: Height of the output image (integer).
  - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
  - fit: Whether to fit the diagram size to the page (PDF only, boolean).
  - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
  - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
- `timeout`: Timeout in seconds for the request to the Mermaid server.
- `super_component_expansion`: If set to True and the pipeline contains SuperComponents, the diagram shows the internal
structure of the super-components as if their components were part of the pipeline, instead of showing each one as a "black box".
Otherwise, only the super-component itself is displayed.
1435  
1436  **Raises**:
1437  
1438  - `PipelineDrawingError`: If there is an issue with rendering or saving the image.
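
Some of the `params` keys listed above only take effect in combination with others. The following sketch flags keys that would have no effect according to those rules. `ignored_params` is a hypothetical helper and not part of the library; it only encodes the rules stated in the list of supported keys.

```python
def ignored_params(params: dict) -> list[str]:
    """Return the keys in `params` that have no effect under the documented rules."""
    ignored = []
    # 'scale' only applies when 'width' or 'height' is specified.
    if "scale" in params and not ("width" in params or "height" in params):
        ignored.append("scale")
    # 'paper' and 'landscape' are ignored when 'fit' is true.
    if params.get("fit"):
        ignored += [key for key in ("paper", "landscape") if key in params]
    return ignored


pdf_params = {"format": "pdf", "fit": True, "paper": "a4"}
png_params = {"format": "img", "type": "png", "theme": "forest", "width": 800, "scale": 2}
```

A dictionary such as `png_params` would be passed as `params=png_params` to `draw` or `show`.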
1439  
1440  <a id="pipeline.Pipeline.walk"></a>
1441  
1442  #### Pipeline.walk
1443  
1444  ```python
1445  def walk() -> Iterator[tuple[str, Component]]
1446  ```
1447  
1448  Visits each component in the pipeline exactly once and yields its name and instance.
1449  
1450  No guarantees are provided on the visiting order.
1451  
1452  **Returns**:
1453  
1454  An iterator of tuples of component name and component instance.
1455  
1456  <a id="pipeline.Pipeline.warm_up"></a>
1457  
1458  #### Pipeline.warm\_up
1459  
1460  ```python
1461  def warm_up() -> None
1462  ```
1463  
1464  Make sure all nodes are warm.
1465  
1466  It's the node's responsibility to make sure this method can be called at every `Pipeline.run()`
1467  without re-initializing everything.
1468  
1469  <a id="pipeline.Pipeline.validate_input"></a>
1470  
1471  #### Pipeline.validate\_input
1472  
1473  ```python
1474  def validate_input(data: dict[str, Any]) -> None
1475  ```
1476  
1477  Validates pipeline input data.
1478  
Validates that:
1480  * Each Component name actually exists in the Pipeline
1481  * Each Component is not missing any input
1482  * Each Component has only one input per input socket, if not variadic
1483  * Each Component doesn't receive inputs that are already sent by another Component
1484  
1485  **Arguments**:
1486  
1487  - `data`: A dictionary of inputs for the pipeline's components. Each key is a component name.
1488  
1489  **Raises**:
1490  
1491  - `ValueError`: If inputs are invalid according to the above.
1492  
1493  <a id="pipeline.Pipeline.from_template"></a>
1494  
1495  #### Pipeline.from\_template
1496  
1497  ```python
1498  @classmethod
1499  def from_template(
1500          cls,
1501          predefined_pipeline: PredefinedPipeline,
1502          template_params: dict[str, Any] | None = None) -> "PipelineBase"
1503  ```
1504  
1505  Create a Pipeline from a predefined template. See `PredefinedPipeline` for available options.
1506  
1507  **Arguments**:
1508  
1509  - `predefined_pipeline`: The predefined pipeline to use.
1510  - `template_params`: An optional dictionary of parameters to use when rendering the pipeline template.
1511  
1512  **Returns**:
1513  
1514  An instance of `Pipeline`.
1515  
1516  <a id="pipeline.Pipeline.validate_pipeline"></a>
1517  
1518  #### Pipeline.validate\_pipeline
1519  
1520  ```python
1521  @staticmethod
1522  def validate_pipeline(priority_queue: FIFOPriorityQueue) -> None
1523  ```
1524  
1525  Validate the pipeline to check if it is blocked or has no valid entry point.
1526  
1527  **Arguments**:
1528  
1529  - `priority_queue`: Priority queue of component names.
1530  
1531  **Raises**:
1532  
1533  - `PipelineRuntimeError`: If the pipeline is blocked or has no valid entry point.
1534