/ tools / schema_sanitizer.py
schema_sanitizer.py
  1  """Sanitize tool JSON schemas for broad LLM-backend compatibility.
  2  
  3  Some local inference backends (notably llama.cpp's ``json-schema-to-grammar``
  4  converter used to build GBNF tool-call parsers) are strict about what JSON
  5  Schema shapes they accept. Schemas that OpenAI / Anthropic / most cloud
  6  providers silently accept can make llama.cpp fail the entire request with:
  7  
  8      HTTP 400: Unable to generate parser for this template.
  9      Automatic parser generation failed: JSON schema conversion failed:
 10      Unrecognized schema: "object"
 11  
 12  The failure modes we've seen in the wild:
 13  
 14  * ``{"type": "object"}`` with no ``properties`` — rejected as a node the
 15    grammar generator can't constrain.
 16  * A schema value that is the bare string ``"object"`` instead of a dict
 17    (malformed MCP server output, e.g. ``additionalProperties: "object"``).
 18  * ``"type": ["string", "null"]`` array types — many converters only accept
 19    single-string ``type``.
 20  * ``anyOf`` / ``oneOf`` unions whose only purpose is to permit ``null`` for
 21    optional fields (common Pydantic/MCP shape). Anthropic rejects these at
 22    the top of ``input_schema``; collapse them to the non-null branch.
 23  * Unconstrained ``additionalProperties`` on objects with empty properties.
 24  
 25  This module walks the final tool schema tree (after MCP-level normalization
 26  and any per-tool dynamic rebuilds) and fixes the known-hostile constructs
 27  in-place on a deep copy. It is intentionally conservative: it only modifies
 28  shapes the LLM backend couldn't use anyway.
 29  """
 30  
 31  from __future__ import annotations
 32  
 33  import copy
 34  import logging
 35  from typing import Any
 36  
 37  logger = logging.getLogger(__name__)
 38  
 39  
 40  def sanitize_tool_schemas(tools: list[dict]) -> list[dict]:
 41      """Return a copy of ``tools`` with each tool's parameter schema sanitized.
 42  
 43      Input is an OpenAI-format tool list:
 44      ``[{"type": "function", "function": {"name": ..., "parameters": {...}}}]``
 45  
 46      The returned list is a deep copy — callers can safely mutate it without
 47      affecting the original registry entries.
 48      """
 49      if not tools:
 50          return tools
 51  
 52      sanitized: list[dict] = []
 53      for tool in tools:
 54          sanitized.append(_sanitize_single_tool(tool))
 55      return sanitized
 56  
 57  
 58  def _sanitize_single_tool(tool: dict) -> dict:
 59      """Deep-copy and sanitize a single OpenAI-format tool entry."""
 60      out = copy.deepcopy(tool)
 61      fn = out.get("function") if isinstance(out, dict) else None
 62      if not isinstance(fn, dict):
 63          return out
 64  
 65      params = fn.get("parameters")
 66      # Missing / non-dict parameters → substitute the minimal valid shape.
 67      if not isinstance(params, dict):
 68          fn["parameters"] = {"type": "object", "properties": {}}
 69          return out
 70  
 71      fn["parameters"] = _sanitize_node(params, path=fn.get("name", "<tool>"))
 72      # After recursion, guarantee the top-level is an object with properties.
 73      top = fn["parameters"]
 74      if not isinstance(top, dict):
 75          fn["parameters"] = {"type": "object", "properties": {}}
 76      else:
 77          if top.get("type") != "object":
 78              top["type"] = "object"
 79          if "properties" not in top or not isinstance(top.get("properties"), dict):
 80              top["properties"] = {}
 81      # Final pass: collapse nullable anyOf/oneOf unions that the recursive
 82      # sanitizer above leaves intact (it only handles the array-form
 83      # ``type: [X, "null"]``). Keep the ``nullable: true`` hint so runtime
 84      # argument coercion (``model_tools._schema_allows_null``) can still
 85      # map a model-emitted ``"null"`` string to Python ``None``.
 86      fn["parameters"] = strip_nullable_unions(fn["parameters"], keep_nullable_hint=True)
 87      return out
 88  
 89  
 90  def strip_nullable_unions(
 91      schema: Any,
 92      *,
 93      keep_nullable_hint: bool = True,
 94  ) -> Any:
 95      """Collapse ``anyOf`` / ``oneOf`` nullable unions to the non-null branch.
 96  
 97      MCP / Pydantic optional fields commonly arrive as::
 98  
 99          {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null}
100  
101      Anthropic's tool input-schema validator rejects the null branch. Tool
102      optionality is already represented by the parent object's ``required``
103      array, so we collapse the union to the single non-null variant.
104  
105      Metadata (``title``, ``description``, ``default``, ``examples``) on the
106      outer union node is carried over to the replacement variant.
107  
108      Args:
109          schema: JSON-Schema fragment (dict, list, or scalar).
110          keep_nullable_hint: If True, set ``nullable: true`` on the replacement
111              to preserve the "this field may be None" signal for downstream
112              consumers that care (e.g. runtime argument coercion that maps the
113              literal string ``"null"`` to Python ``None``). Anthropic's
114              validator accepts ``nullable: true`` but strict producers may
115              prefer False.
116  
117      Returns:
118          The schema with nullable unions collapsed. Non-union nodes are
119          returned unchanged.
120      """
121      if isinstance(schema, list):
122          return [strip_nullable_unions(item, keep_nullable_hint=keep_nullable_hint) for item in schema]
123      if not isinstance(schema, dict):
124          return schema
125  
126      stripped = {
127          k: strip_nullable_unions(v, keep_nullable_hint=keep_nullable_hint)
128          for k, v in schema.items()
129      }
130      for key in ("anyOf", "oneOf"):
131          variants = stripped.get(key)
132          if not isinstance(variants, list):
133              continue
134          non_null = [
135              item for item in variants
136              if not (isinstance(item, dict) and item.get("type") == "null")
137          ]
138          # Only collapse when we actually dropped a null branch AND exactly
139          # one non-null branch survives (otherwise the union is meaningful
140          # and we leave it alone).
141          if len(non_null) == 1 and len(non_null) != len(variants):
142              replacement = dict(non_null[0]) if isinstance(non_null[0], dict) else {}
143              if keep_nullable_hint:
144                  replacement.setdefault("nullable", True)
145              for meta_key in ("title", "description", "default", "examples"):
146                  if meta_key in stripped and meta_key not in replacement:
147                      replacement[meta_key] = stripped[meta_key]
148              return strip_nullable_unions(replacement, keep_nullable_hint=keep_nullable_hint)
149      return stripped
150  
151  
152  def _sanitize_node(node: Any, path: str) -> Any:
153      """Recursively sanitize a JSON-Schema fragment.
154  
155      - Replaces bare-string schema values ("object", "string", ...) with
156        ``{"type": <value>}`` so downstream consumers see a dict.
157      - Injects ``properties: {}`` into object-typed nodes missing it.
158      - Normalizes ``type: [X, "null"]`` arrays to single ``type: X`` (keeping
159        ``nullable: true`` as a hint).
160      - Recurses into ``properties``, ``items``, ``additionalProperties``,
161        ``anyOf``, ``oneOf``, ``allOf``, and ``$defs`` / ``definitions``.
162      """
163      # Malformed: the schema position holds a bare string like "object".
164      if isinstance(node, str):
165          if node in {"object", "string", "number", "integer", "boolean", "array", "null"}:
166              logger.debug(
167                  "schema_sanitizer[%s]: replacing bare-string schema %r "
168                  "with {'type': %r}",
169                  path, node, node,
170              )
171              return {"type": node} if node != "object" else {
172                  "type": "object",
173                  "properties": {},
174              }
175          # Any other stray string is not a schema — drop it by replacing with
176          # a permissive object schema rather than propagate something the
177          # backend will reject.
178          logger.debug(
179              "schema_sanitizer[%s]: replacing non-schema string %r "
180              "with empty object schema", path, node,
181          )
182          return {"type": "object", "properties": {}}
183  
184      if isinstance(node, list):
185          return [_sanitize_node(item, f"{path}[{i}]") for i, item in enumerate(node)]
186  
187      if not isinstance(node, dict):
188          return node
189  
190      out: dict = {}
191      for key, value in node.items():
192          # type: [X, "null"] → type: X (the backend's tool-call parser only
193          # accepts singular string types; nullable is lost but the call still
194          # succeeds, and the model can still pass null on its own.)
195          if key == "type" and isinstance(value, list):
196              non_null = [t for t in value if t != "null"]
197              if len(non_null) == 1 and isinstance(non_null[0], str):
198                  out["type"] = non_null[0]
199                  if "null" in value:
200                      out.setdefault("nullable", True)
201                  continue
202              # Fallback: pick the first string type, drop the rest.
203              first_str = next((t for t in value if isinstance(t, str) and t != "null"), None)
204              if first_str:
205                  out["type"] = first_str
206                  continue
207              # All-null or empty list → treat as object.
208              out["type"] = "object"
209              continue
210  
211          if key in {"properties", "$defs", "definitions"} and isinstance(value, dict):
212              out[key] = {
213                  sub_k: _sanitize_node(sub_v, f"{path}.{key}.{sub_k}")
214                  for sub_k, sub_v in value.items()
215              }
216          elif key in {"items", "additionalProperties"}:
217              if isinstance(value, bool):
218                  # Keep bool ``additionalProperties`` as-is — it's a valid form
219                  # and widely accepted. ``items: true/false`` is non-standard
220                  # but we preserve rather than drop.
221                  out[key] = value
222              else:
223                  out[key] = _sanitize_node(value, f"{path}.{key}")
224          elif key in {"anyOf", "oneOf", "allOf"} and isinstance(value, list):
225              out[key] = [
226                  _sanitize_node(item, f"{path}.{key}[{i}]")
227                  for i, item in enumerate(value)
228              ]
229          elif key in {"required", "enum", "examples"}:
230              # Schema "sibling" keywords whose values are NOT schemas:
231              #  - ``required``: list of property-name strings
232              #  - ``enum``: list of literal values (any JSON type)
233              #  - ``examples``: list of example values (any JSON type)
234              # Recursing into these with _sanitize_node() would mis-interpret
235              # literal strings like "path" as bare-string schemas and replace
236              # them with {"type": "object"} dicts. Pass through unchanged.
237              out[key] = copy.deepcopy(value) if isinstance(value, (list, dict)) else value
238          else:
239              out[key] = _sanitize_node(value, f"{path}.{key}") if isinstance(value, (dict, list)) else value
240  
241      # Object nodes without properties: inject empty properties dict.
242      # llama.cpp's grammar generator can't constrain a free-form object.
243      if out.get("type") == "object" and not isinstance(out.get("properties"), dict):
244          out["properties"] = {}
245  
246      # Prune ``required`` entries that don't exist in properties (defense
247      # against malformed MCP schemas; also caught upstream for MCP tools, but
248      # built-in tools or plugin tools may not have been through that path).
249      if out.get("type") == "object" and isinstance(out.get("required"), list):
250          props = out.get("properties") or {}
251          valid = [r for r in out["required"] if isinstance(r, str) and r in props]
252          if not valid:
253              out.pop("required", None)
254          elif len(valid) != len(out["required"]):
255              out["required"] = valid
256  
257      return out