/ services / pipecat-agent / tools.py
tools.py
  1  """Bob voice agent tools — functions the LLM can call."""
  2  
import asyncio
import json
import os
import urllib.error
import urllib.parse
import urllib.request

from loguru import logger
  8  
  9  HA_TOKEN = os.getenv("HA_TOKEN", "")
 10  HA_URL = os.getenv("HA_URL", "http://127.0.0.1:8123")
 11  OXIGRAPH_URL = os.getenv("OXIGRAPH_URL", "http://127.0.0.1:7878")
 12  
 13  # ── Tool definitions (OpenAI function calling format) ──────────
 14  
 15  TOOL_DEFINITIONS = [
 16      {
 17          "type": "function",
 18          "function": {
 19              "name": "get_weather",
 20              "description": "Get current weather conditions for a location. Use this when the user asks about weather, temperature, or forecast.",
 21              "parameters": {
 22                  "type": "object",
 23                  "properties": {
 24                      "location": {
 25                          "type": "string",
 26                          "description": "City and state/country, e.g. 'Tampa, FL' or 'London, UK'",
 27                      },
 28                  },
 29                  "required": ["location"],
 30              },
 31          },
 32      },
 33      {
 34          "type": "function",
 35          "function": {
 36              "name": "get_home_state",
 37              "description": "Get the current state of a smart home entity from Home Assistant. Use this when the user asks about lights, sensors, switches, temperature sensors, or any smart home device.",
 38              "parameters": {
 39                  "type": "object",
 40                  "properties": {
 41                      "entity_id": {
 42                          "type": "string",
 43                          "description": "The Home Assistant entity ID, e.g. 'light.kitchen', 'sensor.temperature', 'switch.fan'. If unsure, use 'all' to list available entities.",
 44                      },
 45                  },
 46                  "required": ["entity_id"],
 47              },
 48          },
 49      },
 50      {
 51          "type": "function",
 52          "function": {
 53              "name": "control_home_device",
 54              "description": "Control a smart home device — turn on/off lights, switches, etc. Use this when the user wants to change something in the home.",
 55              "parameters": {
 56                  "type": "object",
 57                  "properties": {
 58                      "entity_id": {
 59                          "type": "string",
 60                          "description": "The Home Assistant entity ID to control",
 61                      },
 62                      "action": {
 63                          "type": "string",
 64                          "enum": ["turn_on", "turn_off", "toggle"],
 65                          "description": "The action to perform",
 66                      },
 67                  },
 68                  "required": ["entity_id", "action"],
 69              },
 70          },
 71      },
 72      {
 73          "type": "function",
 74          "function": {
 75              "name": "query_knowledge",
 76              "description": "Query the Bob knowledge graph using SPARQL. Use this to look up family information, stored knowledge, or relationships between people, places, and things.",
 77              "parameters": {
 78                  "type": "object",
 79                  "properties": {
 80                      "query": {
 81                          "type": "string",
 82                          "description": "A natural language question about the knowledge graph. The function will construct the SPARQL query.",
 83                      },
 84                  },
 85                  "required": ["query"],
 86              },
 87          },
 88      },
 89      {
 90          "type": "function",
 91          "function": {
 92              "name": "execute_code",
 93              "description": "Execute Python code to investigate questions, run calculations, check system status, or perform any task that can't be answered with other tools. You have access to: requests (HTTP to internal services), docker (container management), numpy, and Bob's service URLs (PROMETHEUS_URL, DOCKER_HOST, HA_URL, OXIGRAPH_URL). Use print() to output results.",
 94              "parameters": {
 95                  "type": "object",
 96                  "properties": {
 97                      "code": {
 98                          "type": "string",
 99                          "description": "Python code to execute. Use print() for output. Available: requests, docker, numpy, json, datetime. Service URLs available as globals: PROMETHEUS_URL, DOCKER_HOST, HA_URL, OXIGRAPH_URL, NATS_URL, VLLM_URL.",
100                      },
101                  },
102                  "required": ["code"],
103              },
104          },
105      },
106  ]
107  
108  
109  # ── Tool implementations ───────────────────────────────────────
110  
def _http_get(url, headers=None, timeout=5):
    """Fetch ``url`` with an HTTP GET and return the decoded JSON body.

    Args:
        url: Absolute URL to request.
        headers: Optional mapping of extra request headers.
        timeout: Timeout in seconds handed to ``urlopen``.

    Returns:
        The parsed JSON payload on success. On any failure (malformed URL,
        connection or HTTP error, timeout, body that is not valid JSON) a
        dict of the form ``{"error": "<message>"}`` is returned instead of
        raising, which is what callers test for.
    """
    try:
        # Request() validates the URL and raises ValueError for a malformed
        # one, so it has to sit inside the try for the never-raises contract
        # to hold.
        req = urllib.request.Request(url, headers=headers or {})
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode())
    except Exception as e:
        return {"error": str(e)}
119  
120  
def _http_post(url, data, headers=None, timeout=5):
    """Send ``data`` with an HTTP POST and return the decoded JSON response.

    Args:
        url: Absolute URL to post to.
        data: Request payload. A dict (or list) is serialised as JSON and
            sent as ``application/json``; a string or bytes payload is sent
            verbatim as ``application/sparql-query``.
        headers: Optional extra request headers. They are merged after the
            computed ``Content-Type`` and therefore override it.
        timeout: Timeout in seconds handed to ``urlopen``.

    Returns:
        The parsed JSON payload on success. On any failure (unsupported
        payload type, malformed URL, connection or HTTP error, timeout,
        non-JSON body) ``{"error": "<message>"}`` is returned instead of
        raising.
    """
    try:
        # Payload encoding and Request() construction can both raise, so they
        # live inside the try just like the network call itself.
        if isinstance(data, (dict, list)):
            body = json.dumps(data).encode()
            content_type = "application/json"
        else:
            body = bytes(data) if isinstance(data, (bytes, bytearray)) else data.encode()
            content_type = "application/sparql-query"
        hdrs = {"Content-Type": content_type, **(headers or {})}
        req = urllib.request.Request(url, data=body, headers=hdrs, method="POST")
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode())
    except Exception as e:
        return {"error": str(e)}
132  
133  
# WMO weather interpretation codes as returned in Open-Meteo's ``weather_code``
# field, mapped to short spoken descriptions.
_WMO_WEATHER_CODES = {
    0: "clear sky",
    1: "mainly clear",
    2: "partly cloudy",
    3: "overcast",
    45: "foggy",
    48: "depositing rime fog",
    51: "light drizzle",
    53: "moderate drizzle",
    55: "dense drizzle",
    56: "light freezing drizzle",
    57: "dense freezing drizzle",
    61: "slight rain",
    63: "moderate rain",
    65: "heavy rain",
    66: "light freezing rain",
    67: "heavy freezing rain",
    71: "slight snow",
    73: "moderate snow",
    75: "heavy snow",
    77: "snow grains",
    80: "slight rain showers",
    81: "moderate rain showers",
    82: "violent rain showers",
    85: "slight snow showers",
    86: "heavy snow showers",
    95: "thunderstorm",
    96: "thunderstorm with slight hail",
    99: "thunderstorm with heavy hail",
}


async def handle_get_weather(args):
    """Look up current weather for a place via Open-Meteo (free, no API key).

    Args:
        args: Tool-call arguments. ``location`` is the place to look up; a
            missing or empty value falls back to ``"Tampa, FL"``.

    Returns:
        A JSON string. On success it holds ``location``, ``temperature_f``,
        ``humidity_percent``, ``wind_speed_mph`` and ``conditions``; on
        failure it holds a single ``error`` key.
    """
    location = str(args.get("location") or "Tampa, FL")
    logger.info(f"Tool: get_weather({location})")

    # _http_get uses blocking urllib; run it on the default executor so the
    # event loop keeps running while the request is in flight.
    loop = asyncio.get_running_loop()

    # NOTE(review): the tool schema asks for "City, ST" style input, but
    # Open-Meteo's geocoder appears to match on the place name alone. Retry
    # with just the part before the first comma when the full string finds
    # nothing — confirm against the geocoding API documentation.
    candidates = [location]
    city = location.split(",", 1)[0].strip()
    if city and city != location:
        candidates.append(city)

    hit = None
    for candidate in candidates:
        geo = await loop.run_in_executor(
            None,
            _http_get,
            "https://geocoding-api.open-meteo.com/v1/search"
            f"?name={urllib.parse.quote(candidate)}&count=1",
        )
        if not isinstance(geo, dict) or "error" in geo:
            # Transport failure: a shorter name will not fix it.
            break
        if geo.get("results"):
            hit = geo["results"][0]
            break
    if hit is None:
        return json.dumps({"error": f"Could not find location: {location}"})

    lat = hit["latitude"]
    lon = hit["longitude"]
    name = hit.get("name", location)

    # Current conditions, requested in Fahrenheit and mph.
    weather = await loop.run_in_executor(
        None,
        _http_get,
        f"https://api.open-meteo.com/v1/forecast?latitude={lat}&longitude={lon}"
        "&current=temperature_2m,relative_humidity_2m,wind_speed_10m,weather_code"
        "&temperature_unit=fahrenheit&wind_speed_unit=mph",
    )
    if not isinstance(weather, dict):
        return json.dumps({"error": "Unexpected weather response"})
    if "error" in weather:
        return json.dumps(weather)

    current = weather.get("current", {})
    desc = _WMO_WEATHER_CODES.get(current.get("weather_code"), "unknown conditions")

    return json.dumps({
        "location": name,
        "temperature_f": current.get("temperature_2m"),
        "humidity_percent": current.get("relative_humidity_2m"),
        "wind_speed_mph": current.get("wind_speed_10m"),
        "conditions": desc,
    })
176  
177  
async def handle_get_home_state(args):
    """Report the state of one Home Assistant entity, or list entities.

    Args:
        args: Tool-call arguments. ``entity_id`` names the entity to read;
            the value ``"all"`` (also used when the key is missing or empty)
            lists entities instead.

    Returns:
        A JSON string. For ``"all"`` it holds ``entities`` (a summary of the
        first 30 states) and ``total`` (the full count). For a single entity
        it holds ``entity_id``, ``state``, ``name`` and a filtered
        ``attributes`` dict. Failures are reported under an ``error`` key.
    """
    entity_id = str(args.get("entity_id") or "all")
    logger.info(f"Tool: get_home_state({entity_id})")

    if not HA_TOKEN:
        return json.dumps({"error": "Home Assistant not configured (no token)"})

    headers = {"Authorization": f"Bearer {HA_TOKEN}"}
    # _http_get uses blocking urllib; run it on the default executor so the
    # event loop keeps running while the request is in flight.
    loop = asyncio.get_running_loop()

    if entity_id == "all":
        states = await loop.run_in_executor(
            None, lambda: _http_get(f"{HA_URL}/api/states", headers=headers)
        )
        if isinstance(states, list):
            summary = [{"entity_id": s["entity_id"], "state": s["state"],
                        "name": s.get("attributes", {}).get("friendly_name", "")}
                       for s in states[:30]]
            return json.dumps({"entities": summary, "total": len(states)})
        return json.dumps(states)

    # entity_id comes from the model, so percent-encode it as a single path
    # segment; otherwise "../..." could address other authenticated endpoints.
    url = f"{HA_URL}/api/states/{urllib.parse.quote(entity_id, safe='')}"
    state = await loop.run_in_executor(
        None, lambda: _http_get(url, headers=headers)
    )
    if not isinstance(state, dict):
        return json.dumps({"error": f"Unexpected response for entity: {entity_id}"})
    if "error" in state:
        return json.dumps(state)

    attributes = state.get("attributes", {})
    return json.dumps({
        "entity_id": state.get("entity_id"),
        "state": state.get("state"),
        "name": attributes.get("friendly_name", ""),
        # Only a small whitelist of attributes is passed back to the model.
        "attributes": {k: v for k, v in attributes.items()
                       if k in ("temperature", "unit_of_measurement", "brightness",
                                "color_temp", "friendly_name", "device_class")},
    })
208  
209  
# Services the tool schema allows the model to call; anything else is refused.
_ALLOWED_HOME_ACTIONS = ("turn_on", "turn_off", "toggle")


async def handle_control_home_device(args):
    """Call a Home Assistant on/off/toggle service for one entity.

    Args:
        args: Tool-call arguments. ``entity_id`` is the entity to control and
            ``action`` is one of ``turn_on``, ``turn_off`` or ``toggle``
            (``toggle`` when missing).

    Returns:
        A JSON string: ``{"success": true, "entity_id": ..., "action": ...}``
        when the service call went through, otherwise ``{"error": ...}``.
    """
    entity_id = str(args.get("entity_id") or "")
    action = args.get("action") or "toggle"
    logger.info(f"Tool: control_home_device({entity_id}, {action})")

    if not HA_TOKEN:
        return json.dumps({"error": "Home Assistant not configured"})

    # Both values come from the model and end up in the request, so hold them
    # to the contract declared in the tool schema before calling the API.
    if not entity_id:
        return json.dumps({"error": "No entity_id provided"})
    if action not in _ALLOWED_HOME_ACTIONS:
        return json.dumps({"error": f"Unsupported action: {action}"})

    # Entities without a "domain." prefix go through the generic domain.
    domain = entity_id.split(".")[0] if "." in entity_id else "homeassistant"
    url = f"{HA_URL}/api/services/{urllib.parse.quote(domain, safe='')}/{action}"
    headers = {"Authorization": f"Bearer {HA_TOKEN}"}

    # _http_post uses blocking urllib; run it on the default executor so the
    # event loop keeps running while the request is in flight.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None, lambda: _http_post(url, {"entity_id": entity_id}, headers=headers)
    )
    if isinstance(result, dict) and "error" in result:
        return json.dumps({"error": result["error"]})
    return json.dumps({"success": True, "entity_id": entity_id, "action": action})
232  
233  
async def handle_query_knowledge(args):
    """Return a sample of raw triples from the Oxigraph knowledge graph.

    Args:
        args: Tool-call arguments. ``query`` is the user's question; it is
            only logged here and is not used to build the SPARQL query.

    Returns:
        A JSON string with ``knowledge_triples`` (up to 20 ``s``/``p``/``o``
        dicts), ``total_results`` (number of bindings returned, at most 50
        because of the query's LIMIT) and a ``note`` for the model, or an
        ``error`` key on failure.
    """
    query = args.get("query", "")
    logger.info(f"Tool: query_knowledge({query})")

    # Fixed query: fetch up to 50 arbitrary triples and leave it to the model
    # to interpret them against the user's question.
    sparql = """
    PREFIX bob: <https://bob.local/ontology#>
    PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
    SELECT ?subject ?predicate ?object WHERE {
        ?subject ?predicate ?object
    } LIMIT 50
    """

    # _http_post uses blocking urllib; run it on the default executor so the
    # event loop keeps running while the request is in flight.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,
        lambda: _http_post(
            f"{OXIGRAPH_URL}/query",
            sparql,
            headers={"Accept": "application/json"},
        ),
    )
    if not isinstance(result, dict):
        return json.dumps({"error": "Unexpected response from knowledge graph"})
    if "error" in result:
        return json.dumps(result)

    bindings = result.get("results", {}).get("bindings", [])
    # Cap what is handed to the model at 20 triples.
    triples = [{"s": b["subject"]["value"], "p": b["predicate"]["value"], "o": b["object"]["value"]}
               for b in bindings[:20]]
    return json.dumps({"knowledge_triples": triples, "total_results": len(bindings),
                        "note": "This is the raw knowledge graph. Interpret these triples to answer the user's question."})
261  
262  
# Base URL of the REPL service whose /execute endpoint runs submitted code.
REPL_URL = os.getenv("REPL_URL", "http://127.0.0.1:10900")


async def handle_execute_code(args):
    """Send Python code to Bob's REPL service and relay its result.

    Args:
        args: Tool-call arguments. ``code`` is the Python source to run; a
            missing or null value is treated as an empty string.

    Returns:
        A JSON string. ``{"output": ..., "success": true}`` when the REPL
        reports success, ``{"output": ..., "error": ..., "success": false}``
        when it reports a failure, and ``{"error": "REPL unavailable: ..."}``
        when the service could not be reached.
    """
    # NOTE(review): the code is model-generated and executed remotely; any
    # sandboxing is the REPL service's responsibility — confirm it is isolated.
    code = args.get("code") or ""
    logger.info(f"Tool: execute_code ({len(code)} chars)")

    # The request may take up to 35 s and _http_post uses blocking urllib, so
    # run it on the default executor to keep the event loop running.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,
        lambda: _http_post(
            f"{REPL_URL}/execute",
            {"code": code},
            timeout=35,
        ),
    )

    if not isinstance(result, dict):
        return json.dumps({"error": "REPL unavailable: unexpected response"})
    # An "error" without any "stdout" means the request itself failed, as
    # opposed to the REPL reporting an error from the executed code.
    if "error" in result and result.get("stdout") is None:
        return json.dumps({"error": f"REPL unavailable: {result['error']}"})

    output = result.get("stdout", "")
    error = result.get("error") or result.get("stderr", "")

    if result.get("success"):
        return json.dumps({"output": output or "(no output)", "success": True})
    return json.dumps({"output": output, "error": error, "success": False})
287  
288  
289  # ── Tool dispatch ──────────────────────────────────────────────
290  
# Lookup from tool name (as listed in TOOL_DEFINITIONS) to the coroutine
# function that implements it.
TOOL_HANDLERS = dict(
    get_weather=handle_get_weather,
    get_home_state=handle_get_home_state,
    control_home_device=handle_control_home_device,
    query_knowledge=handle_query_knowledge,
    execute_code=handle_execute_code,
)