# test_tool_guardrails.py
"""Pure tool-call guardrail primitive tests."""

import json

from agent.tool_guardrails import (
    ToolCallGuardrailConfig,
    ToolCallGuardrailController,
    ToolCallSignature,
    canonical_tool_args,
)


def test_tool_call_signature_hashes_canonical_nested_unicode_args_without_exposing_raw_args():
    """Key order must not affect the signature, and metadata must not carry raw argument values."""
    args_a = {
        "z": [{"β": "☤", "a": 1}],
        "a": {"y": 2, "x": "secret-token-value"},
    }
    args_b = {
        "a": {"x": "secret-token-value", "y": 2},
        "z": [{"a": 1, "β": "☤"}],
    }

    assert canonical_tool_args(args_a) == canonical_tool_args(args_b)
    sig_a = ToolCallSignature.from_call("web_search", args_a)
    sig_b = ToolCallSignature.from_call("web_search", args_b)

    assert sig_a == sig_b
    assert len(sig_a.args_hash) == 64
    metadata = sig_a.to_metadata()
    assert metadata == {"tool_name": "web_search", "args_hash": sig_a.args_hash}

    # Serialize with ensure_ascii=False: the default escapes non-ASCII text to
    # \uXXXX sequences, which would make the "☤" leak check below pass even if
    # the raw value were present in the metadata.
    serialized = json.dumps(metadata, ensure_ascii=False)
    assert "secret-token-value" not in serialized
    assert "☤" not in serialized


def test_default_config_is_soft_warning_only_with_hard_stop_disabled():
    """A default config enables warnings, disables hard stops, and uses the expected thresholds."""
    cfg = ToolCallGuardrailConfig()

    assert cfg.warnings_enabled is True
    assert cfg.hard_stop_enabled is False
    assert cfg.exact_failure_warn_after == 2
    assert cfg.same_tool_failure_warn_after == 3
    assert cfg.no_progress_warn_after == 2
    assert cfg.exact_failure_block_after == 5
    assert cfg.same_tool_failure_halt_after == 8
    assert cfg.no_progress_block_after == 5


def test_config_parses_nested_warn_and_hard_stop_thresholds():
    """``from_mapping`` maps nested ``warn_after`` / ``hard_stop_after`` keys onto config fields."""
    cfg = ToolCallGuardrailConfig.from_mapping(
        {
            "warnings_enabled": False,
            "hard_stop_enabled": True,
            "warn_after": {
                "exact_failure": 3,
                "same_tool_failure": 4,
                "idempotent_no_progress": 5,
            },
            "hard_stop_after": {
                "exact_failure": 6,
                "same_tool_failure": 7,
                "idempotent_no_progress": 8,
            },
        }
    )

    assert cfg.warnings_enabled is False
    assert cfg.hard_stop_enabled is True
    assert cfg.exact_failure_warn_after == 3
    assert cfg.same_tool_failure_warn_after == 4
    assert cfg.no_progress_warn_after == 5
    assert cfg.exact_failure_block_after == 6
    assert cfg.same_tool_failure_halt_after == 7
    assert cfg.no_progress_block_after == 8


def test_default_repeated_identical_failed_call_warns_without_blocking():
    """With defaults, five identical failures warn from the second one on but never block or halt."""
    controller = ToolCallGuardrailController()
    args = {"query": "same"}

    decisions = []
    for _ in range(5):
        assert controller.before_call("web_search", args).action == "allow"
        decisions.append(
            controller.after_call("web_search", args, '{"error":"boom"}', failed=True)
        )

    assert decisions[0].action == "allow"
    assert [d.action for d in decisions[1:]] == ["warn", "warn", "warn", "warn"]
    assert {d.code for d in decisions[1:]} == {"repeated_exact_failure_warning"}
    assert controller.before_call("web_search", args).action == "allow"
    assert controller.halt_decision is None


def test_hard_stop_enabled_blocks_repeated_exact_failure_before_next_execution():
    """With hard stops on, the call after the second identical failure is blocked in ``before_call``."""
    controller = ToolCallGuardrailController(
        ToolCallGuardrailConfig(
            hard_stop_enabled=True,
            exact_failure_warn_after=2,
            exact_failure_block_after=2,
            same_tool_failure_halt_after=99,
        )
    )
    args = {"query": "same"}

    assert controller.before_call("web_search", args).action == "allow"
    first = controller.after_call("web_search", args, '{"error":"boom"}', failed=True)
    assert first.action == "allow"

    assert controller.before_call("web_search", args).action == "allow"
    second = controller.after_call("web_search", args, '{"error":"boom"}', failed=True)
    assert second.action == "warn"
    assert second.code == "repeated_exact_failure_warning"

    blocked = controller.before_call("web_search", args)
    assert blocked.action == "block"
    assert blocked.code == "repeated_exact_failure_block"
    assert blocked.count == 2


def test_success_resets_exact_signature_failure_streak():
    """A successful call between two identical failures keeps the next call from being blocked."""
    controller = ToolCallGuardrailController(
        ToolCallGuardrailConfig(hard_stop_enabled=True, exact_failure_block_after=2, same_tool_failure_halt_after=99)
    )
    args = {"query": "same"}

    controller.after_call("web_search", args, '{"error":"boom"}', failed=True)
    controller.after_call("web_search", args, '{"ok":true}', failed=False)

    assert controller.before_call("web_search", args).action == "allow"
    controller.after_call("web_search", args, '{"error":"boom"}', failed=True)
    assert controller.before_call("web_search", args).action == "allow"


def test_same_tool_varying_args_warns_by_default_without_halting():
    """Without hard stops, same-tool failures with differing args warn but never set a halt decision."""
    controller = ToolCallGuardrailController(
        ToolCallGuardrailConfig(same_tool_failure_warn_after=2, same_tool_failure_halt_after=3)
    )

    first = controller.after_call("terminal", {"command": "cmd-1"}, '{"exit_code":1}', failed=True)
    second = controller.after_call("terminal", {"command": "cmd-2"}, '{"exit_code":1}', failed=True)
    third = controller.after_call("terminal", {"command": "cmd-3"}, '{"exit_code":1}', failed=True)
    fourth = controller.after_call("terminal", {"command": "cmd-4"}, '{"exit_code":1}', failed=True)

    assert first.action == "allow"
    assert [second.action, third.action, fourth.action] == ["warn", "warn", "warn"]
    assert {second.code, third.code, fourth.code} == {"same_tool_failure_warning"}
    assert controller.halt_decision is None


def test_hard_stop_enabled_halts_same_tool_varying_args_failure_streak():
    """With hard stops on, the third consecutive same-tool failure returns a halt decision."""
    controller = ToolCallGuardrailController(
        ToolCallGuardrailConfig(
            hard_stop_enabled=True,
            exact_failure_block_after=99,
            same_tool_failure_warn_after=2,
            same_tool_failure_halt_after=3,
        )
    )

    first = controller.after_call("terminal", {"command": "cmd-1"}, '{"exit_code":1}', failed=True)
    assert first.action == "allow"
    second = controller.after_call("terminal", {"command": "cmd-2"}, '{"exit_code":1}', failed=True)
    assert second.action == "warn"
    assert second.code == "same_tool_failure_warning"
    third = controller.after_call("terminal", {"command": "cmd-3"}, '{"exit_code":1}', failed=True)
    assert third.action == "halt"
    assert third.code == "same_tool_failure_halt"
    assert third.count == 3


def test_idempotent_no_progress_repeated_result_warns_without_blocking_by_default():
    """Without hard stops, repeated identical ``read_file`` results warn but are never blocked."""
    controller = ToolCallGuardrailController(
        ToolCallGuardrailConfig(no_progress_warn_after=2, no_progress_block_after=2)
    )
    args = {"path": "/tmp/same.txt"}
    result = "same file contents"

    for _ in range(4):
        assert controller.before_call("read_file", args).action == "allow"
        decision = controller.after_call("read_file", args, result, failed=False)

    # Only the decision from the final (fourth) repetition is inspected here.
    assert decision.action == "warn"
    assert decision.code == "idempotent_no_progress_warning"
    assert controller.before_call("read_file", args).action == "allow"
    assert controller.halt_decision is None


def test_hard_stop_enabled_blocks_idempotent_no_progress_future_repeat():
    """With hard stops on, a third identical ``read_file`` call is blocked after the warning."""
    controller = ToolCallGuardrailController(
        ToolCallGuardrailConfig(
            hard_stop_enabled=True,
            no_progress_warn_after=2,
            no_progress_block_after=2,
        )
    )
    args = {"path": "/tmp/same.txt"}
    result = "same file contents"

    assert controller.before_call("read_file", args).action == "allow"
    assert controller.after_call("read_file", args, result, failed=False).action == "allow"
    assert controller.before_call("read_file", args).action == "allow"
    warn = controller.after_call("read_file", args, result, failed=False)
    assert warn.action == "warn"
    assert warn.code == "idempotent_no_progress_warning"

    blocked = controller.before_call("read_file", args)
    assert blocked.action == "block"
    assert blocked.code == "idempotent_no_progress_block"


def test_mutating_or_unknown_tools_are_not_blocked_for_repeated_identical_success_output_by_default():
    """Repeated identical successes from ``write_file`` and ``custom_tool`` are always allowed."""
    controller = ToolCallGuardrailController(
        ToolCallGuardrailConfig(no_progress_warn_after=2, no_progress_block_after=2)
    )

    for _ in range(3):
        assert controller.before_call("write_file", {"path": "/tmp/x", "content": "x"}).action == "allow"
        assert controller.after_call("write_file", {"path": "/tmp/x", "content": "x"}, "ok", failed=False).action == "allow"
        assert controller.before_call("custom_tool", {"x": 1}).action == "allow"
        assert controller.after_call("custom_tool", {"x": 1}, "ok", failed=False).action == "allow"


def test_reset_for_turn_clears_bounded_guardrail_state():
    """``reset_for_turn`` turns previously blocked calls back into allowed ones."""
    controller = ToolCallGuardrailController(
        ToolCallGuardrailConfig(hard_stop_enabled=True, exact_failure_block_after=2, no_progress_block_after=2)
    )
    controller.after_call("web_search", {"query": "same"}, '{"error":"boom"}', failed=True)
    controller.after_call("web_search", {"query": "same"}, '{"error":"boom"}', failed=True)
    controller.after_call("read_file", {"path": "/tmp/x"}, "same", failed=False)
    controller.after_call("read_file", {"path": "/tmp/x"}, "same", failed=False)

    assert controller.before_call("web_search", {"query": "same"}).action == "block"
    assert controller.before_call("read_file", {"path": "/tmp/x"}).action == "block"

    controller.reset_for_turn()

    assert controller.before_call("web_search", {"query": "same"}).action == "allow"
    assert controller.before_call("read_file", {"path": "/tmp/x"}).action == "allow"