pipeline_tool.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

from collections.abc import Callable
from typing import Any

from haystack import AsyncPipeline, Pipeline, SuperComponent, logging
from haystack.core.serialization import generate_qualified_class_name
from haystack.tools.component_tool import ComponentTool
from haystack.tools.tool import (
    _deserialize_outputs_to_state,
    _deserialize_outputs_to_string,
    _serialize_outputs_to_state,
    _serialize_outputs_to_string,
)

logger = logging.getLogger(__name__)


class PipelineTool(ComponentTool):
    """
    A Tool that wraps Haystack Pipelines, allowing them to be used as tools by LLMs.

    PipelineTool automatically generates LLM-compatible tool schemas from pipeline input sockets,
    which are derived from the underlying components in the pipeline.

    Key features:
    - Automatic LLM tool calling schema generation from pipeline inputs
    - Description extraction of pipeline inputs based on the underlying component docstrings

    To use PipelineTool, you first need a Haystack pipeline.
    Below is an example of creating a PipelineTool.

    ## Usage Example:

    ```python
    from haystack import Document, Pipeline
    from haystack.dataclasses import ChatMessage
    from haystack.document_stores.in_memory import InMemoryDocumentStore
    from haystack.components.embedders.sentence_transformers_text_embedder import SentenceTransformersTextEmbedder
    from haystack.components.embedders.sentence_transformers_document_embedder import (
        SentenceTransformersDocumentEmbedder
    )
    from haystack.components.generators.chat import OpenAIChatGenerator
    from haystack.components.retrievers import InMemoryEmbeddingRetriever
    from haystack.components.agents import Agent
    from haystack.tools import PipelineTool

    # Initialize a document store and add some documents
    document_store = InMemoryDocumentStore()
    document_embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
    documents = [
        Document(content="Nikola Tesla was a Serbian-American inventor and electrical engineer."),
        Document(
            content="He is best known for his contributions to the design of the modern alternating current (AC) "
            "electricity supply system."
        ),
    ]
    docs_with_embeddings = document_embedder.run(documents=documents)["documents"]
    document_store.write_documents(docs_with_embeddings)

    # Build a simple retrieval pipeline
    retrieval_pipeline = Pipeline()
    retrieval_pipeline.add_component(
        "embedder", SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
    )
    retrieval_pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store))

    retrieval_pipeline.connect("embedder.embedding", "retriever.query_embedding")

    # Wrap the pipeline as a tool
    retriever_tool = PipelineTool(
        pipeline=retrieval_pipeline,
        input_mapping={"query": ["embedder.text"]},
        output_mapping={"retriever.documents": "documents"},
        name="document_retriever",
        description="For any questions about Nikola Tesla, always use this tool",
    )

    # Create an Agent with the tool
    agent = Agent(
        chat_generator=OpenAIChatGenerator(model="gpt-4.1-mini"),
        tools=[retriever_tool]
    )

    # Let the Agent handle a query
    result = agent.run([ChatMessage.from_user("Who was Nikola Tesla?")])

    # Print result of the tool call
    print("Tool Call Result:")
    print(result["messages"][2].tool_call_result.result)
    print("")

    # Print answer
    print("Answer:")
    print(result["messages"][-1].text)
    ```
    """

    def __init__(
        self,
        pipeline: Pipeline | AsyncPipeline,
        *,
        name: str,
        description: str,
        input_mapping: dict[str, list[str]] | None = None,
        output_mapping: dict[str, str] | None = None,
        parameters: dict[str, Any] | None = None,
        outputs_to_string: dict[str, str | Callable[[Any], str]] | None = None,
        inputs_from_state: dict[str, str] | None = None,
        outputs_to_state: dict[str, dict[str, str | Callable]] | None = None,
    ) -> None:
        """
        Create a Tool instance from a Haystack pipeline.

        :param pipeline: The Haystack pipeline to wrap as a tool.
        :param name: Name of the tool.
        :param description: Description of the tool.
        :param input_mapping: A dictionary mapping component input names to pipeline input socket paths.
            If not provided, a default input mapping will be created based on all pipeline inputs.
            Example:
            ```python
            input_mapping={
                "query": ["retriever.query", "prompt_builder.query"],
            }
            ```
        :param output_mapping: A dictionary mapping pipeline output socket paths to component output names.
            If not provided, a default output mapping will be created based on all pipeline outputs.
            Example:
            ```python
            output_mapping={
                "retriever.documents": "documents",
                "generator.replies": "replies",
            }
            ```
        :param parameters:
            A JSON schema defining the parameters expected by the Tool.
            Will fall back to the parameters defined in the component's run method signature if not provided.
        :param outputs_to_string:
            Optional dictionary defining how tool outputs should be converted into string(s) or results.
            If not provided, the tool result is converted to a string using a default handler.

            `outputs_to_string` supports two formats:

            1. Single output format - use "source", "handler", and/or "raw_result" at the root level:
                ```python
                {
                    "source": "docs", "handler": format_documents, "raw_result": False
                }
                ```
                - `source`: If provided, only the specified output key is sent to the handler.
                - `handler`: A function that takes the tool output (or the extracted source value) and returns the
                  final result.
                - `raw_result`: If `True`, the result is returned raw without string conversion, but applying the
                  `handler` if provided. This is intended for tools that return images. In this mode, the Tool
                  function or the `handler` function must return a list of `TextContent`/`ImageContent` objects to
                  ensure compatibility with Chat Generators.

            2. Multiple output format - map keys to individual configurations:
                ```python
                {
                    "formatted_docs": {"source": "docs", "handler": format_documents},
                    "summary": {"source": "summary_text", "handler": str.upper}
                }
                ```
                Each key maps to a dictionary that can contain "source" and/or "handler".
                Note that `raw_result` is not supported in the multiple output format.
        :param inputs_from_state:
            Optional dictionary mapping state keys to tool parameter names.
            Example: `{"repository": "repo"}` maps state's "repository" to tool's "repo" parameter.
        :param outputs_to_state:
            Optional dictionary defining how tool outputs map to keys within state as well as optional handlers.
            If the source is provided, only the specified output key is sent to the handler.
            Example:
            ```python
            {
                "documents": {"source": "docs", "handler": custom_handler}
            }
            ```
            If the source is omitted, the whole tool result is sent to the handler.
            Example:
            ```python
            {
                "documents": {"handler": custom_handler}
            }
            ```
        :raises TypeError: If the provided pipeline is not an instance of Pipeline or AsyncPipeline.
        """
        if not isinstance(pipeline, (Pipeline, AsyncPipeline)):
            raise TypeError(
                "The 'pipeline' parameter must be an instance of Pipeline or AsyncPipeline."
                f" Got {type(pipeline)} instead."
            )

        # The pipeline is exposed to ComponentTool as a single component via SuperComponent.
        super().__init__(
            component=SuperComponent(pipeline=pipeline, input_mapping=input_mapping, output_mapping=output_mapping),
            name=name,
            description=description,
            parameters=parameters,
            outputs_to_string=outputs_to_string,
            inputs_from_state=inputs_from_state,
            outputs_to_state=outputs_to_state,
        )
        # Keep the caller-supplied constructor arguments so `to_dict` can serialize exactly what was
        # passed in (e.g. `parameters=None` rather than a schema that may have been derived from the
        # pipeline -- NOTE(review): assumes ComponentTool may resolve `parameters`; confirm).
        self._unresolved_parameters = parameters
        self._pipeline = pipeline
        self._input_mapping = input_mapping
        self._output_mapping = output_mapping

    def to_dict(self) -> dict[str, Any]:
        """
        Serializes the PipelineTool to a dictionary.

        :returns:
            The serialized dictionary representation of PipelineTool, in the form
            `{"type": <qualified class name>, "data": {...constructor arguments...}}`.
        """
        serialized: dict[str, Any] = {
            "pipeline": self._pipeline.to_dict(),
            "name": self.name,
            "input_mapping": self._input_mapping,
            "output_mapping": self._output_mapping,
            "description": self.description,
            "parameters": self._unresolved_parameters,
            "inputs_from_state": self.inputs_from_state,
            # Records which pipeline class to rebuild in `from_dict`; not a constructor argument.
            "is_pipeline_async": isinstance(self._pipeline, AsyncPipeline),
            "outputs_to_state": _serialize_outputs_to_state(self.outputs_to_state) if self.outputs_to_state else None,
            "outputs_to_string": _serialize_outputs_to_string(self.outputs_to_string)
            if self.outputs_to_string
            else None,
        }

        return {"type": generate_qualified_class_name(type(self)), "data": serialized}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "PipelineTool":
        """
        Deserializes the PipelineTool from a dictionary.

        The input dictionary is not modified.

        :param data: The dictionary representation of PipelineTool, as produced by `to_dict`.
        :returns:
            The deserialized PipelineTool instance.
        """
        # Shallow copy: the deserialized values below must not be written back into the caller's dict.
        init_params = dict(data["data"])

        # `is_pipeline_async` is not a constructor argument; when absent, rebuild a synchronous Pipeline.
        is_pipeline_async = init_params.pop("is_pipeline_async", False)
        pipeline_class = AsyncPipeline if is_pipeline_async else Pipeline
        init_params["pipeline"] = pipeline_class.from_dict(init_params["pipeline"])

        if init_params.get("outputs_to_state"):
            init_params["outputs_to_state"] = _deserialize_outputs_to_state(init_params["outputs_to_state"])

        if init_params.get("outputs_to_string") is not None:
            init_params["outputs_to_string"] = _deserialize_outputs_to_string(init_params["outputs_to_string"])

        return cls(**init_params)