/ agent / moonshot_schema.py
moonshot_schema.py
  1  """Helpers for translating OpenAI-style tool schemas to Moonshot's schema subset.
  2  
  3  Moonshot (Kimi) accepts a stricter subset of JSON Schema than standard OpenAI
  4  tool calling.  Requests that violate it fail with HTTP 400:
  5  
  6      tools.function.parameters is not a valid moonshot flavored json schema,
  7      details: <...>
  8  
  9  Known rejection modes documented at
 10  https://forum.moonshot.ai/t/tool-calling-specification-violation-on-moonshot-api/102
 11  and MoonshotAI/kimi-cli#1595:
 12  
 13  1. Every property schema must carry a ``type``.  Standard JSON Schema allows
 14     type to be omitted (the value is then unconstrained); Moonshot refuses.
 15  2. When ``anyOf`` is used, ``type`` must be on the ``anyOf`` children, not
 16     the parent.  Presence of both causes "type should be defined in anyOf
 17     items instead of the parent schema".
 18  
 19  The ``#/definitions/...`` → ``#/$defs/...`` rewrite for draft-07 refs is
 20  handled separately in ``tools/mcp_tool._normalize_mcp_input_schema`` so it
 21  applies at MCP registration time for all providers.
 22  """
 23  
 24  from __future__ import annotations
 25  
 26  import copy
 27  from typing import Any, Dict, List
 28  
 29  # Keys whose values are maps of name → schema (not schemas themselves).
 30  # When we recurse, we walk the values of these maps as schemas, but we do
 31  # NOT apply the missing-type repair to the map itself.
 32  _SCHEMA_MAP_KEYS = frozenset({"properties", "patternProperties", "$defs", "definitions"})
 33  
 34  # Keys whose values are lists of schemas.
 35  _SCHEMA_LIST_KEYS = frozenset({"anyOf", "oneOf", "allOf", "prefixItems"})
 36  
 37  # Keys whose values are a single nested schema.
 38  _SCHEMA_NODE_KEYS = frozenset({"items", "contains", "not", "additionalProperties", "propertyNames"})
 39  
 40  
 41  def _repair_schema(node: Any, is_schema: bool = True) -> Any:
 42      """Recursively apply Moonshot repairs to a schema node.
 43  
 44      ``is_schema=True`` means this dict is a JSON Schema node and gets the
 45      missing-type + anyOf-parent repairs applied.  ``is_schema=False`` means
 46      it's a container map (e.g. the value of ``properties``) and we only
 47      recurse into its values.
 48      """
 49      if isinstance(node, list):
 50          # Lists only show up under schema-list keys (anyOf/oneOf/allOf), so
 51          # every element is itself a schema.
 52          return [_repair_schema(item, is_schema=True) for item in node]
 53      if not isinstance(node, dict):
 54          return node
 55  
 56      # Walk the dict, deciding per-key whether recursion is into a schema
 57      # node, a container map, or a scalar.
 58      repaired: Dict[str, Any] = {}
 59      for key, value in node.items():
 60          if key in _SCHEMA_MAP_KEYS and isinstance(value, dict):
 61              # Map of name → schema.  Don't treat the map itself as a schema
 62              # (it has no type / properties of its own), but each value is.
 63              repaired[key] = {
 64                  sub_key: _repair_schema(sub_val, is_schema=True)
 65                  for sub_key, sub_val in value.items()
 66              }
 67          elif key in _SCHEMA_LIST_KEYS and isinstance(value, list):
 68              repaired[key] = [_repair_schema(v, is_schema=True) for v in value]
 69          elif key in _SCHEMA_NODE_KEYS:
 70              # items / not / additionalProperties: single nested schema.
 71              # additionalProperties can also be a bool — leave those alone.
 72              if isinstance(value, dict):
 73                  repaired[key] = _repair_schema(value, is_schema=True)
 74              else:
 75                  repaired[key] = value
 76          else:
 77              # Scalars (description, title, format, enum values, etc.) pass through.
 78              repaired[key] = value
 79  
 80      if not is_schema:
 81          return repaired
 82  
 83      # Rule 2: when anyOf is present, type belongs only on the children.
 84      # Additionally, Moonshot rejects null-type branches inside anyOf
 85      # (enum value (<nil>) does not match any type in [string]).
 86      # Collapse the anyOf to the first non-null branch and infer its type.
 87      if "anyOf" in repaired and isinstance(repaired["anyOf"], list):
 88          repaired.pop("type", None)
 89          non_null = [b for b in repaired["anyOf"]
 90                      if isinstance(b, dict) and b.get("type") != "null"]
 91          if non_null and len(non_null) < len(repaired["anyOf"]):
 92              # Drop the anyOf wrapper — keep only the non-null branch.
 93              # If there's a single non-null branch, promote it and fall
 94              # through to Rules 1/3 so nullable/enum cleanup still applies
 95              # to the merged node.
 96              if len(non_null) == 1:
 97                  merge = {k: v for k, v in repaired.items() if k != "anyOf"}
 98                  merge.update(non_null[0])
 99                  repaired = merge
100              else:
101                  repaired["anyOf"] = non_null
102                  return repaired
103          else:
104              # Nothing to collapse — parent type stripped, children already
105              # repaired by the recursive walk above.
106              return repaired
107  
108      # Moonshot also rejects non-standard keywords like ``nullable`` on
109      # parameter schemas — strip it.
110      repaired.pop("nullable", None)
111  
112      # Rule 1: property schemas without type need one.  $ref nodes are exempt
113      # — their type comes from the referenced definition.
114      # Fill missing type BEFORE Rule 3 so enum cleanup can check the type.
115      if "$ref" not in repaired:
116          repaired = _fill_missing_type(repaired)
117  
118      # Rule 3: Moonshot rejects null/empty-string values inside enum arrays
119      # when the parent type is a scalar (string, integer, etc.).  The error:
120      #   "enum value (<nil>) does not match any type in [string]"
121      # Strip null and empty-string from enum values, and if the enum becomes
122      # empty, drop it entirely.
123      if "enum" in repaired and isinstance(repaired["enum"], list):
124          node_type = repaired.get("type")
125          if node_type in ("string", "integer", "number", "boolean"):
126              cleaned = [v for v in repaired["enum"]
127                         if v is not None and v != ""]
128              if cleaned:
129                  repaired["enum"] = cleaned
130              else:
131                  repaired.pop("enum")
132  
133      return repaired
134  
135  
136  def _fill_missing_type(node: Dict[str, Any]) -> Dict[str, Any]:
137      """Infer a reasonable ``type`` if this schema node has none."""
138      if "type" in node and node["type"] not in (None, ""):
139          return node
140  
141      # Heuristic: presence of ``properties`` → object, ``items`` → array, ``enum``
142      # → type of first enum value, else fall back to ``string`` (safest scalar).
143      if "properties" in node or "required" in node or "additionalProperties" in node:
144          inferred = "object"
145      elif "items" in node or "prefixItems" in node:
146          inferred = "array"
147      elif "enum" in node and isinstance(node["enum"], list) and node["enum"]:
148          sample = node["enum"][0]
149          if isinstance(sample, bool):
150              inferred = "boolean"
151          elif isinstance(sample, int):
152              inferred = "integer"
153          elif isinstance(sample, float):
154              inferred = "number"
155          else:
156              inferred = "string"
157      else:
158          inferred = "string"
159  
160      return {**node, "type": inferred}
161  
162  
def sanitize_moonshot_tool_parameters(parameters: Any) -> Dict[str, Any]:
    """Return a Moonshot-compatible object schema built from ``parameters``.

    The input is deep-copied before ``_repair_schema`` runs on it, so the
    caller's object is left untouched.  Anything that is not a dict (or that
    does not come back from the repair as a dict) is replaced by an empty
    object schema.  The result always has ``type == "object"`` and a
    ``properties`` key.
    """
    if isinstance(parameters, dict):
        schema = _repair_schema(copy.deepcopy(parameters), is_schema=True)
        if isinstance(schema, dict):
            # The top level of a tool's parameters must be an object schema.
            schema["type"] = "object"
            schema.setdefault("properties", {})
            return schema

    return {"type": "object", "properties": {}}
183  
184  
def sanitize_moonshot_tools(tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Run ``sanitize_moonshot_tool_parameters`` over each tool's parameters.

    Entries that are not dicts, or whose ``function`` value is not a dict,
    are carried over as-is.  For every other entry a shallow copy of the tool
    and of its ``function`` dict is made with ``parameters`` replaced by the
    sanitized schema.  A falsy ``tools`` value is returned unchanged, and the
    original list object is returned when no entry was rewritten.
    """
    if not tools:
        return tools

    result: List[Dict[str, Any]] = []
    changed = False
    for entry in tools:
        function_spec = entry.get("function") if isinstance(entry, dict) else None
        if not isinstance(function_spec, dict):
            result.append(entry)
            continue

        original_params = function_spec.get("parameters")
        fixed_params = sanitize_moonshot_tool_parameters(original_params)
        # NOTE(review): identity, not equality.  As written, the sanitizer
        # returns a newly built dict, so this branch is expected never to be
        # taken and every tool with a ``function`` dict is rewrapped.
        if fixed_params is original_params:
            result.append(entry)
            continue

        changed = True
        result.append(
            {**entry, "function": {**function_spec, "parameters": fixed_params}}
        )

    return result if changed else tools
210  
211  
def is_moonshot_model(model: str | None) -> bool:
    """Report whether ``model`` names a Kimi / Moonshot model.

    The slug is stripped and lower-cased, then matched when any of these
    hold:

    * its last ``/``-separated segment is ``kimi`` or starts with ``kimi-``
      (covers aggregator-prefixed slugs such as ``nous/moonshotai/kimi-k2.6``);
    * it contains ``moonshot`` or ``/kimi`` anywhere;
    * it starts with ``kimi``.

    Matching on the model name rather than the base URL lets aggregators
    (Nous, OpenRouter, ...) that route to Moonshot's inference be detected.
    ``None`` and the empty string are never a match.
    """
    if not model:
        return False

    slug = model.strip().lower()
    last_segment = slug.rpartition("/")[2]
    return (
        last_segment == "kimi"
        or last_segment.startswith("kimi-")
        # Vendor-prefixed forms commonly used on aggregators.
        or "moonshot" in slug
        or "/kimi" in slug
        or slug.startswith("kimi")
    )