# haystack/tools/pipeline_tool.py
  1  # SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
  2  #
  3  # SPDX-License-Identifier: Apache-2.0
  4  
  5  from collections.abc import Callable
  6  from typing import Any
  7  
  8  from haystack import AsyncPipeline, Pipeline, SuperComponent, logging
  9  from haystack.core.serialization import generate_qualified_class_name
 10  from haystack.tools.component_tool import ComponentTool
 11  from haystack.tools.tool import (
 12      _deserialize_outputs_to_state,
 13      _deserialize_outputs_to_string,
 14      _serialize_outputs_to_state,
 15      _serialize_outputs_to_string,
 16  )
 17  
 18  logger = logging.getLogger(__name__)
 19  
 20  
 21  class PipelineTool(ComponentTool):
 22      """
 23      A Tool that wraps Haystack Pipelines, allowing them to be used as tools by LLMs.
 24  
 25      PipelineTool automatically generates LLM-compatible tool schemas from pipeline input sockets,
 26      which are derived from the underlying components in the pipeline.
 27  
 28      Key features:
 29      - Automatic LLM tool calling schema generation from pipeline inputs
 30      - Description extraction of pipeline inputs based on the underlying component docstrings
 31  
 32      To use PipelineTool, you first need a Haystack pipeline.
 33      Below is an example of creating a PipelineTool
 34  
 35      ## Usage Example:
 36  
 37      ```python
 38      from haystack import Document, Pipeline
 39      from haystack.dataclasses import ChatMessage
 40      from haystack.document_stores.in_memory import InMemoryDocumentStore
 41      from haystack.components.embedders.sentence_transformers_text_embedder import SentenceTransformersTextEmbedder
 42      from haystack.components.embedders.sentence_transformers_document_embedder import (
 43          SentenceTransformersDocumentEmbedder
 44      )
 45      from haystack.components.generators.chat import OpenAIChatGenerator
 46      from haystack.components.retrievers import InMemoryEmbeddingRetriever
 47      from haystack.components.agents import Agent
 48      from haystack.tools import PipelineTool
 49  
 50      # Initialize a document store and add some documents
 51      document_store = InMemoryDocumentStore()
 52      document_embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
 53      documents = [
 54          Document(content="Nikola Tesla was a Serbian-American inventor and electrical engineer."),
 55          Document(
 56              content="He is best known for his contributions to the design of the modern alternating current (AC) "
 57                      "electricity supply system."
 58          ),
 59      ]
 60      docs_with_embeddings = document_embedder.run(documents=documents)["documents"]
 61      document_store.write_documents(docs_with_embeddings)
 62  
 63      # Build a simple retrieval pipeline
 64      retrieval_pipeline = Pipeline()
 65      retrieval_pipeline.add_component(
 66          "embedder", SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
 67      )
 68      retrieval_pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store))
 69  
 70      retrieval_pipeline.connect("embedder.embedding", "retriever.query_embedding")
 71  
 72      # Wrap the pipeline as a tool
 73      retriever_tool = PipelineTool(
 74          pipeline=retrieval_pipeline,
 75          input_mapping={"query": ["embedder.text"]},
 76          output_mapping={"retriever.documents": "documents"},
 77          name="document_retriever",
 78          description="For any questions about Nikola Tesla, always use this tool",
 79      )
 80  
 81      # Create an Agent with the tool
 82      agent = Agent(
 83          chat_generator=OpenAIChatGenerator(model="gpt-4.1-mini"),
 84          tools=[retriever_tool]
 85      )
 86  
 87      # Let the Agent handle a query
 88      result = agent.run([ChatMessage.from_user("Who was Nikola Tesla?")])
 89  
 90      # Print result of the tool call
 91      print("Tool Call Result:")
 92      print(result["messages"][2].tool_call_result.result)
 93      print("")
 94  
 95      # Print answer
 96      print("Answer:")
 97      print(result["messages"][-1].text)
 98      ```
 99      """
100  
101      def __init__(
102          self,
103          pipeline: Pipeline | AsyncPipeline,
104          *,
105          name: str,
106          description: str,
107          input_mapping: dict[str, list[str]] | None = None,
108          output_mapping: dict[str, str] | None = None,
109          parameters: dict[str, Any] | None = None,
110          outputs_to_string: dict[str, str | Callable[[Any], str]] | None = None,
111          inputs_from_state: dict[str, str] | None = None,
112          outputs_to_state: dict[str, dict[str, str | Callable]] | None = None,
113      ) -> None:
114          """
115          Create a Tool instance from a Haystack pipeline.
116  
117          :param pipeline: The Haystack pipeline to wrap as a tool.
118          :param name: Name of the tool.
119          :param description: Description of the tool.
120          :param input_mapping: A dictionary mapping component input names to pipeline input socket paths.
121              If not provided, a default input mapping will be created based on all pipeline inputs.
122              Example:
123              ```python
124              input_mapping={
125                  "query": ["retriever.query", "prompt_builder.query"],
126              }
127              ```
128          :param output_mapping: A dictionary mapping pipeline output socket paths to component output names.
129              If not provided, a default output mapping will be created based on all pipeline outputs.
130              Example:
131              ```python
132              output_mapping={
133                  "retriever.documents": "documents",
134                  "generator.replies": "replies",
135              }
136              ```
137          :param parameters:
138              A JSON schema defining the parameters expected by the Tool.
139              Will fall back to the parameters defined in the component's run method signature if not provided.
140          :param outputs_to_string:
141              Optional dictionary defining how tool outputs should be converted into string(s) or results.
142              If not provided, the tool result is converted to a string using a default handler.
143  
144              `outputs_to_string` supports two formats:
145  
146              1. Single output format - use "source", "handler", and/or "raw_result" at the root level:
147                  ```python
148                  {
149                      "source": "docs", "handler": format_documents, "raw_result": False
150                  }
151                  ```
152                  - `source`: If provided, only the specified output key is sent to the handler.
153                  - `handler`: A function that takes the tool output (or the extracted source value) and returns the
154                    final result.
155                  - `raw_result`: If `True`, the result is returned raw without string conversion, but applying the
156                     `handler` if provided. This is intended for tools that return images. In this mode, the Tool
157                     function or the `handler` function must return a list of `TextContent`/`ImageContent` objects to
158                     ensure compatibility with Chat Generators.
159  
160              2. Multiple output format - map keys to individual configurations:
161                  ```python
162                  {
163                      "formatted_docs": {"source": "docs", "handler": format_documents},
164                      "summary": {"source": "summary_text", "handler": str.upper}
165                  }
166                  ```
167                  Each key maps to a dictionary that can contain "source" and/or "handler".
168                  Note that `raw_result` is not supported in the multiple output format.
169          :param inputs_from_state:
170              Optional dictionary mapping state keys to tool parameter names.
171              Example: `{"repository": "repo"}` maps state's "repository" to tool's "repo" parameter.
172          :param outputs_to_state:
173              Optional dictionary defining how tool outputs map to keys within state as well as optional handlers.
174              If the source is provided only the specified output key is sent to the handler.
175              Example:
176              ```python
177              {
178                  "documents": {"source": "docs", "handler": custom_handler}
179              }
180              ```
181              If the source is omitted the whole tool result is sent to the handler.
182              Example:
183              ```python
184              {
185                  "documents": {"handler": custom_handler}
186              }
187              ```
188          :raises ValueError: If the provided pipeline is not a valid Haystack Pipeline instance.
189          """
190          if not isinstance(pipeline, (Pipeline, AsyncPipeline)):
191              raise TypeError(
192                  "The 'pipeline' parameter must be an instance of Pipeline or AsyncPipeline."
193                  f" Got {type(pipeline)} instead."
194              )
195  
196          super().__init__(
197              component=SuperComponent(pipeline=pipeline, input_mapping=input_mapping, output_mapping=output_mapping),
198              name=name,
199              description=description,
200              parameters=parameters,
201              outputs_to_string=outputs_to_string,
202              inputs_from_state=inputs_from_state,
203              outputs_to_state=outputs_to_state,
204          )
205          self._unresolved_parameters = parameters
206          self._pipeline = pipeline
207          self._input_mapping = input_mapping
208          self._output_mapping = output_mapping
209  
210      def to_dict(self) -> dict[str, Any]:
211          """
212          Serializes the PipelineTool to a dictionary.
213  
214          :returns:
215              The serialized dictionary representation of PipelineTool.
216          """
217          serialized: dict[str, Any] = {
218              "pipeline": self._pipeline.to_dict(),
219              "name": self.name,
220              "input_mapping": self._input_mapping,
221              "output_mapping": self._output_mapping,
222              "description": self.description,
223              "parameters": self._unresolved_parameters,
224              "inputs_from_state": self.inputs_from_state,
225              "is_pipeline_async": isinstance(self._pipeline, AsyncPipeline),
226              "outputs_to_state": _serialize_outputs_to_state(self.outputs_to_state) if self.outputs_to_state else None,
227              "outputs_to_string": _serialize_outputs_to_string(self.outputs_to_string)
228              if self.outputs_to_string
229              else None,
230          }
231  
232          return {"type": generate_qualified_class_name(type(self)), "data": serialized}
233  
234      @classmethod
235      def from_dict(cls, data: dict[str, Any]) -> "PipelineTool":
236          """
237          Deserializes the PipelineTool from a dictionary.
238  
239          :param data: The dictionary representation of PipelineTool.
240          :returns:
241              The deserialized PipelineTool instance.
242          """
243          inner_data = data["data"]
244          is_pipeline_async = inner_data.get("is_pipeline_async", False)
245          pipeline_class = AsyncPipeline if is_pipeline_async else Pipeline
246          pipeline = pipeline_class.from_dict(inner_data["pipeline"])
247  
248          if "outputs_to_state" in inner_data and inner_data["outputs_to_state"]:
249              inner_data["outputs_to_state"] = _deserialize_outputs_to_state(inner_data["outputs_to_state"])
250  
251          if inner_data.get("outputs_to_string") is not None:
252              inner_data["outputs_to_string"] = _deserialize_outputs_to_string(inner_data["outputs_to_string"])
253  
254          merged_data = {**inner_data, "pipeline": pipeline}
255          # Remove is_pipeline_async as it's not a parameter of the constructor
256          merged_data.pop("is_pipeline_async", None)
257          return cls(**merged_data)