/ tests / agent / test_tool_guardrails.py
test_tool_guardrails.py
  1  """Pure tool-call guardrail primitive tests."""
  2  
  3  import json
  4  
  5  from agent.tool_guardrails import (
  6      ToolCallGuardrailConfig,
  7      ToolCallGuardrailController,
  8      ToolCallSignature,
  9      canonical_tool_args,
 10  )
 11  
 12  
 13  def test_tool_call_signature_hashes_canonical_nested_unicode_args_without_exposing_raw_args():
 14      args_a = {
 15          "z": [{"β": "☤", "a": 1}],
 16          "a": {"y": 2, "x": "secret-token-value"},
 17      }
 18      args_b = {
 19          "a": {"x": "secret-token-value", "y": 2},
 20          "z": [{"a": 1, "β": "☤"}],
 21      }
 22  
 23      assert canonical_tool_args(args_a) == canonical_tool_args(args_b)
 24      sig_a = ToolCallSignature.from_call("web_search", args_a)
 25      sig_b = ToolCallSignature.from_call("web_search", args_b)
 26  
 27      assert sig_a == sig_b
 28      assert len(sig_a.args_hash) == 64
 29      metadata = sig_a.to_metadata()
 30      assert metadata == {"tool_name": "web_search", "args_hash": sig_a.args_hash}
 31      assert "secret-token-value" not in json.dumps(metadata)
 32      assert "☤" not in json.dumps(metadata)
 33  
 34  
 35  def test_default_config_is_soft_warning_only_with_hard_stop_disabled():
 36      cfg = ToolCallGuardrailConfig()
 37  
 38      assert cfg.warnings_enabled is True
 39      assert cfg.hard_stop_enabled is False
 40      assert cfg.exact_failure_warn_after == 2
 41      assert cfg.same_tool_failure_warn_after == 3
 42      assert cfg.no_progress_warn_after == 2
 43      assert cfg.exact_failure_block_after == 5
 44      assert cfg.same_tool_failure_halt_after == 8
 45      assert cfg.no_progress_block_after == 5
 46  
 47  
 48  def test_config_parses_nested_warn_and_hard_stop_thresholds():
 49      cfg = ToolCallGuardrailConfig.from_mapping(
 50          {
 51              "warnings_enabled": False,
 52              "hard_stop_enabled": True,
 53              "warn_after": {
 54                  "exact_failure": 3,
 55                  "same_tool_failure": 4,
 56                  "idempotent_no_progress": 5,
 57              },
 58              "hard_stop_after": {
 59                  "exact_failure": 6,
 60                  "same_tool_failure": 7,
 61                  "idempotent_no_progress": 8,
 62              },
 63          }
 64      )
 65  
 66      assert cfg.warnings_enabled is False
 67      assert cfg.hard_stop_enabled is True
 68      assert cfg.exact_failure_warn_after == 3
 69      assert cfg.same_tool_failure_warn_after == 4
 70      assert cfg.no_progress_warn_after == 5
 71      assert cfg.exact_failure_block_after == 6
 72      assert cfg.same_tool_failure_halt_after == 7
 73      assert cfg.no_progress_block_after == 8
 74  
 75  
 76  def test_default_repeated_identical_failed_call_warns_without_blocking():
 77      controller = ToolCallGuardrailController()
 78      args = {"query": "same"}
 79  
 80      decisions = []
 81      for _ in range(5):
 82          assert controller.before_call("web_search", args).action == "allow"
 83          decisions.append(
 84              controller.after_call("web_search", args, '{"error":"boom"}', failed=True)
 85          )
 86  
 87      assert decisions[0].action == "allow"
 88      assert [d.action for d in decisions[1:]] == ["warn", "warn", "warn", "warn"]
 89      assert {d.code for d in decisions[1:]} == {"repeated_exact_failure_warning"}
 90      assert controller.before_call("web_search", args).action == "allow"
 91      assert controller.halt_decision is None
 92  
 93  
 94  def test_hard_stop_enabled_blocks_repeated_exact_failure_before_next_execution():
 95      controller = ToolCallGuardrailController(
 96          ToolCallGuardrailConfig(
 97              hard_stop_enabled=True,
 98              exact_failure_warn_after=2,
 99              exact_failure_block_after=2,
100              same_tool_failure_halt_after=99,
101          )
102      )
103      args = {"query": "same"}
104  
105      assert controller.before_call("web_search", args).action == "allow"
106      first = controller.after_call("web_search", args, '{"error":"boom"}', failed=True)
107      assert first.action == "allow"
108  
109      assert controller.before_call("web_search", args).action == "allow"
110      second = controller.after_call("web_search", args, '{"error":"boom"}', failed=True)
111      assert second.action == "warn"
112      assert second.code == "repeated_exact_failure_warning"
113  
114      blocked = controller.before_call("web_search", args)
115      assert blocked.action == "block"
116      assert blocked.code == "repeated_exact_failure_block"
117      assert blocked.count == 2
118  
119  
def test_success_resets_exact_signature_failure_streak():
    """A success between two failures of the same call keeps the block threshold of 2 from tripping."""
    strict_config = ToolCallGuardrailConfig(
        hard_stop_enabled=True,
        exact_failure_block_after=2,
        same_tool_failure_halt_after=99,
    )
    guard = ToolCallGuardrailController(strict_config)
    payload = {"query": "same"}

    # One failure followed by a success for the identical call.
    guard.after_call("web_search", payload, '{"error":"boom"}', failed=True)
    guard.after_call("web_search", payload, '{"ok":true}', failed=False)

    after_success = guard.before_call("web_search", payload)
    assert after_success.action == "allow"

    # A fresh single failure must not be enough to block the next attempt.
    guard.after_call("web_search", payload, '{"error":"boom"}', failed=True)
    after_new_failure = guard.before_call("web_search", payload)
    assert after_new_failure.action == "allow"
132  
133  
def test_same_tool_varying_args_warns_by_default_without_halting():
    """Without hard stop, a failure streak on one tool with varying args only ever warns."""
    guard = ToolCallGuardrailController(
        ToolCallGuardrailConfig(same_tool_failure_warn_after=2, same_tool_failure_halt_after=3)
    )

    # Four consecutive failures of the same tool, each with a different command.
    outcomes = [
        guard.after_call("terminal", {"command": f"cmd-{index}"}, '{"exit_code":1}', failed=True)
        for index in range(1, 5)
    ]

    opening, *followups = outcomes
    assert opening.action == "allow"
    assert [outcome.action for outcome in followups] == ["warn", "warn", "warn"]
    assert {outcome.code for outcome in followups} == {"same_tool_failure_warning"}
    # The halt threshold (3) was passed, yet no halt is recorded with hard stop off.
    assert guard.halt_decision is None
148  
149  
def test_hard_stop_enabled_halts_same_tool_varying_args_failure_streak():
    """With hard stop on, the third consecutive failure of one tool (varying args) yields a halt."""
    strict_config = ToolCallGuardrailConfig(
        hard_stop_enabled=True,
        exact_failure_block_after=99,
        same_tool_failure_warn_after=2,
        same_tool_failure_halt_after=3,
    )
    guard = ToolCallGuardrailController(strict_config)

    # (command, expected action, expected code); None means the code is not checked.
    expected_steps = [
        ("cmd-1", "allow", None),
        ("cmd-2", "warn", "same_tool_failure_warning"),
        ("cmd-3", "halt", "same_tool_failure_halt"),
    ]

    outcome = None
    for command, expected_action, expected_code in expected_steps:
        outcome = guard.after_call("terminal", {"command": command}, '{"exit_code":1}', failed=True)
        assert outcome.action == expected_action
        if expected_code is not None:
            assert outcome.code == expected_code

    # The halting decision reports the length of the failure streak.
    assert outcome.count == 3
169  
170  
def test_idempotent_no_progress_repeated_result_warns_without_blocking_by_default():
    """Without hard stop, re-reading the same file with the same result warns but is never blocked."""
    guard = ToolCallGuardrailController(
        ToolCallGuardrailConfig(no_progress_warn_after=2, no_progress_block_after=2)
    )
    read_args = {"path": "/tmp/same.txt"}
    unchanged_output = "same file contents"

    outcomes = []
    for _attempt in range(4):
        pre_check = guard.before_call("read_file", read_args)
        assert pre_check.action == "allow"
        outcomes.append(guard.after_call("read_file", read_args, unchanged_output, failed=False))

    # Only the outcome of the final repetition is inspected.
    final_outcome = outcomes[-1]
    assert final_outcome.action == "warn"
    assert final_outcome.code == "idempotent_no_progress_warning"

    assert guard.before_call("read_file", read_args).action == "allow"
    assert guard.halt_decision is None
186  
187  
def test_hard_stop_enabled_blocks_idempotent_no_progress_future_repeat():
    """With hard stop on, two identical no-progress reads cause the third attempt to be blocked."""
    strict_config = ToolCallGuardrailConfig(
        hard_stop_enabled=True,
        no_progress_warn_after=2,
        no_progress_block_after=2,
    )
    guard = ToolCallGuardrailController(strict_config)
    read_args = {"path": "/tmp/same.txt"}
    unchanged_output = "same file contents"

    # First read: nothing to complain about yet.
    pre_first = guard.before_call("read_file", read_args)
    assert pre_first.action == "allow"
    post_first = guard.after_call("read_file", read_args, unchanged_output, failed=False)
    assert post_first.action == "allow"

    # Second read with the same output: allowed to run, flagged afterwards.
    pre_second = guard.before_call("read_file", read_args)
    assert pre_second.action == "allow"
    post_second = guard.after_call("read_file", read_args, unchanged_output, failed=False)
    assert post_second.action == "warn"
    assert post_second.code == "idempotent_no_progress_warning"

    # Third attempt: refused before execution.
    pre_third = guard.before_call("read_file", read_args)
    assert pre_third.action == "block"
    assert pre_third.code == "idempotent_no_progress_block"
209  
210  
def test_mutating_or_unknown_tools_are_not_blocked_for_repeated_identical_success_output_by_default():
    """Repeated identical successes from `write_file` and `custom_tool` stay allowed before and after each call."""
    guard = ToolCallGuardrailController(
        ToolCallGuardrailConfig(no_progress_warn_after=2, no_progress_block_after=2)
    )

    # Each round runs write_file first, then custom_tool, with unchanged args and output.
    tool_calls = (
        ("write_file", {"path": "/tmp/x", "content": "x"}),
        ("custom_tool", {"x": 1}),
    )

    for _round in range(3):
        for tool_name, tool_args in tool_calls:
            pre_check = guard.before_call(tool_name, tool_args)
            assert pre_check.action == "allow"
            post_check = guard.after_call(tool_name, tool_args, "ok", failed=False)
            assert post_check.action == "allow"
221  
222  
def test_reset_for_turn_clears_bounded_guardrail_state():
    """`reset_for_turn` lifts blocks that were earned by earlier failures and no-progress repeats."""
    strict_config = ToolCallGuardrailConfig(
        hard_stop_enabled=True,
        exact_failure_block_after=2,
        no_progress_block_after=2,
    )
    guard = ToolCallGuardrailController(strict_config)
    search_args = {"query": "same"}
    read_args = {"path": "/tmp/x"}

    # Reach both block thresholds: two identical failures, then two identical reads.
    for _ in range(2):
        guard.after_call("web_search", search_args, '{"error":"boom"}', failed=True)
    for _ in range(2):
        guard.after_call("read_file", read_args, "same", failed=False)

    assert guard.before_call("web_search", search_args).action == "block"
    assert guard.before_call("read_file", read_args).action == "block"

    guard.reset_for_turn()

    # After the reset, both calls are allowed again.
    assert guard.before_call("web_search", search_args).action == "allow"
    assert guard.before_call("read_file", read_args).action == "allow"