# test_file_operations.py
"""Tests for tools/file_operations.py — deny list, result dataclasses, helpers."""

import os
import pytest
from pathlib import Path
from unittest.mock import MagicMock

from tools.file_operations import (
    _is_write_denied,
    WRITE_DENIED_PATHS,
    WRITE_DENIED_PREFIXES,
    ReadResult,
    WriteResult,
    PatchResult,
    SearchResult,
    SearchMatch,
    LintResult,
    ShellFileOperations,
    BINARY_EXTENSIONS,
    IMAGE_EXTENSIONS,
    MAX_LINE_LENGTH,
    normalize_read_pagination,
    normalize_search_pagination,
)


# =========================================================================
# Write deny list
# =========================================================================

class TestIsWriteDenied:
    """Checks which paths ``_is_write_denied`` rejects and which it accepts."""

    @staticmethod
    def _home_path(*parts):
        """Return *parts* joined onto the current user's home directory."""
        return os.path.join(str(Path.home()), *parts)

    def test_ssh_authorized_keys_denied(self):
        target = self._home_path(".ssh", "authorized_keys")
        assert _is_write_denied(target) is True

    def test_ssh_id_rsa_denied(self):
        target = self._home_path(".ssh", "id_rsa")
        assert _is_write_denied(target) is True

    def test_netrc_denied(self):
        target = self._home_path(".netrc")
        assert _is_write_denied(target) is True

    def test_aws_prefix_denied(self):
        target = self._home_path(".aws", "credentials")
        assert _is_write_denied(target) is True

    def test_kube_prefix_denied(self):
        target = self._home_path(".kube", "config")
        assert _is_write_denied(target) is True

    def test_normal_file_allowed(self, tmp_path):
        target = str(tmp_path / "safe_file.txt")
        assert _is_write_denied(target) is False

    def test_project_file_allowed(self):
        assert _is_write_denied("/tmp/project/main.py") is False

    def test_tilde_expansion(self):
        # A "~"-prefixed path must be denied just like the absolute form.
        assert _is_write_denied("~/.ssh/authorized_keys") is True


# =========================================================================
# Result dataclasses
# =========================================================================

class TestReadResult:
    """Serialization checks for ``ReadResult.to_dict()``."""

    def test_to_dict_omits_defaults(self):
        r = ReadResult()
        d = r.to_dict()
        assert "error" not in d  # None omitted
        assert "similar_files" not in d  # empty list omitted

    def test_to_dict_preserves_empty_content(self):
        """Empty file should still have content key in the dict."""
        r = ReadResult(content="", total_lines=0, file_size=0)
        d = r.to_dict()
        assert "content" in d
        assert d["content"] == ""
        assert d["total_lines"] == 0
        assert d["file_size"] == 0

    def test_to_dict_includes_values(self):
        r = ReadResult(content="hello", total_lines=10, file_size=50, truncated=True)
        d = r.to_dict()
        assert d["content"] == "hello"
        assert d["total_lines"] == 10
        assert d["truncated"] is True

    def test_binary_fields(self):
        r = ReadResult(is_binary=True, is_image=True, mime_type="image/png")
        d = r.to_dict()
        assert d["is_binary"] is True
        assert d["is_image"] is True
        assert d["mime_type"] == "image/png"


class TestWriteResult:
    """Serialization checks for ``WriteResult.to_dict()``."""

    def test_to_dict_omits_none(self):
        r = WriteResult(bytes_written=100)
        d = r.to_dict()
        assert d["bytes_written"] == 100
        assert "error" not in d
        assert "warning" not in d

    def test_to_dict_includes_error(self):
        r = WriteResult(error="Permission denied")
        d = r.to_dict()
        assert d["error"] == "Permission denied"


class TestPatchResult:
    """Serialization checks for ``PatchResult.to_dict()``."""

    def test_to_dict_success(self):
        r = PatchResult(success=True, diff="--- a\n+++ b", files_modified=["a.py"])
        d = r.to_dict()
        assert d["success"] is True
        assert d["diff"] == "--- a\n+++ b"
        assert d["files_modified"] == ["a.py"]

    def test_to_dict_error(self):
        r = PatchResult(error="File not found")
        d = r.to_dict()
        assert d["success"] is False
        assert d["error"] == "File not found"


class TestSearchResult:
    """Serialization checks for ``SearchResult.to_dict()`` in each result mode."""

    def test_to_dict_with_matches(self):
        m = SearchMatch(path="a.py", line_number=10, content="hello")
        r = SearchResult(matches=[m], total_count=1)
        d = r.to_dict()
        assert d["total_count"] == 1
        assert len(d["matches"]) == 1
        assert d["matches"][0]["path"] == "a.py"

    def test_to_dict_empty(self):
        r = SearchResult()
        d = r.to_dict()
        assert d["total_count"] == 0
        assert "matches" not in d

    def test_to_dict_files_mode(self):
        r = SearchResult(files=["a.py", "b.py"], total_count=2)
        d = r.to_dict()
        assert d["files"] == ["a.py", "b.py"]

    def test_to_dict_count_mode(self):
        r = SearchResult(counts={"a.py": 3, "b.py": 1}, total_count=4)
        d = r.to_dict()
        assert d["counts"]["a.py"] == 3

    def test_truncated_flag(self):
        r = SearchResult(total_count=100, truncated=True)
        d = r.to_dict()
        assert d["truncated"] is True


class TestLintResult:
    """Checks the ``status`` value ``LintResult.to_dict()`` reports per outcome."""

    def test_skipped(self):
        r = LintResult(skipped=True, message="No linter for .md files")
        d = r.to_dict()
        assert d["status"] == "skipped"
        assert d["message"] == "No linter for .md files"

    def test_success(self):
        r = LintResult(success=True, output="")
        d = r.to_dict()
        assert d["status"] == "ok"

    def test_error(self):
        r = LintResult(success=False, output="SyntaxError line 5")
        d = r.to_dict()
        assert d["status"] == "error"
        assert "SyntaxError" in d["output"]


# =========================================================================
# ShellFileOperations helpers
# =========================================================================

@pytest.fixture()
def mock_env():
    """Create a mock terminal environment."""
    env = MagicMock()
    env.cwd = "/tmp/test"
    env.execute.return_value = {"output": "", "returncode": 0}
    return env


@pytest.fixture()
def file_ops(mock_env):
    """Return a ``ShellFileOperations`` wired to the mocked environment."""
    return ShellFileOperations(mock_env)


class TestShellFileOpsHelpers:
    """Tests for pagination normalizers and ``ShellFileOperations`` helpers."""

    def test_normalize_read_pagination_clamps_invalid_values(self):
        assert normalize_read_pagination(offset=0, limit=0) == (1, 1)
        assert normalize_read_pagination(offset=-10, limit=-5) == (1, 1)
        assert normalize_read_pagination(offset="bad", limit="bad") == (1, 500)
        assert normalize_read_pagination(offset=2, limit=999999) == (2, 2000)

    def test_normalize_search_pagination_clamps_invalid_values(self):
        assert normalize_search_pagination(offset=-10, limit=-5) == (0, 1)
        assert normalize_search_pagination(offset="bad", limit="bad") == (0, 50)
        assert normalize_search_pagination(offset=3, limit=0) == (3, 1)

    def test_escape_shell_arg_simple(self, file_ops):
        assert file_ops._escape_shell_arg("hello") == "'hello'"

    def test_escape_shell_arg_with_quotes(self, file_ops):
        result = file_ops._escape_shell_arg("it's")
        assert "'" in result
        # Should be safely escaped
        assert result.count("'") >= 4  # wrapping + escaping

    def test_is_likely_binary_by_extension(self, file_ops):
        assert file_ops._is_likely_binary("photo.png") is True
        assert file_ops._is_likely_binary("data.db") is True
        assert file_ops._is_likely_binary("code.py") is False
        assert file_ops._is_likely_binary("readme.md") is False

    def test_is_likely_binary_by_content(self, file_ops):
        # High ratio of non-printable chars -> binary
        binary_content = "\x00\x01\x02\x03" * 250
        assert file_ops._is_likely_binary("unknown", binary_content) is True

        # Normal text -> not binary
        assert file_ops._is_likely_binary("unknown", "Hello world\nLine 2\n") is False

    def test_is_image(self, file_ops):
        assert file_ops._is_image("photo.png") is True
        assert file_ops._is_image("pic.jpg") is True
        assert file_ops._is_image("icon.ico") is True
        assert file_ops._is_image("data.pdf") is False
        assert file_ops._is_image("code.py") is False

    def test_add_line_numbers(self, file_ops):
        content = "line one\nline two\nline three"
        result = file_ops._add_line_numbers(content)
        assert " 1|line one" in result
        assert " 2|line two" in result
        assert " 3|line three" in result

    def test_add_line_numbers_with_offset(self, file_ops):
        content = "continued\nmore"
        result = file_ops._add_line_numbers(content, start_line=50)
        assert " 50|continued" in result
        assert " 51|more" in result

    def test_add_line_numbers_truncates_long_lines(self, file_ops):
        long_line = "x" * (MAX_LINE_LENGTH + 100)
        result = file_ops._add_line_numbers(long_line)
        assert "[truncated]" in result

    def test_unified_diff(self, file_ops):
        old = "line1\nline2\nline3\n"
        new = "line1\nchanged\nline3\n"
        diff = file_ops._unified_diff(old, new, "test.py")
        assert "-line2" in diff
        assert "+changed" in diff
        assert "test.py" in diff

    def test_cwd_from_env(self, mock_env):
        mock_env.cwd = "/custom/path"
        ops = ShellFileOperations(mock_env)
        assert ops.cwd == "/custom/path"

    def test_cwd_fallback_to_slash(self):
        env = MagicMock(spec=[])  # no cwd attribute
        ops = ShellFileOperations(env)
        assert ops.cwd == "/"

    def test_read_file_strips_leaked_terminal_fence_markers(self, mock_env):
        """read_file output must not contain fence markers or escape bytes."""
        leaked = (
            "'\x07__HERMES_FENCE_a9f7b3__\x1b]0;cat "
            "'/tmp/test/a.py' 2> /dev/null\x07\n"
            "print('ok')\n"
            "__HERMES_FENCE_a9f7b3__\x07'\n"
        )

        def side_effect(command, **kwargs):
            if command.startswith("wc -c"):
                return {"output": "12\n", "returncode": 0}
            if command.startswith("head -c"):
                return {"output": "print('ok')\n", "returncode": 0}
            if command.startswith("sed -n"):
                return {"output": leaked, "returncode": 0}
            if command.startswith("wc -l"):
                return {"output": "1\n", "returncode": 0}
            return {"output": "", "returncode": 0}

        mock_env.execute.side_effect = side_effect
        ops = ShellFileOperations(mock_env)
        result = ops.read_file("/tmp/test/a.py")

        assert result.error is None
        assert "HERMES_FENCE" not in result.content
        assert "\x1b]" not in result.content
        assert "\x07" not in result.content
        assert " 1|print('ok')" in result.content

    def test_read_file_raw_strips_leaked_terminal_fence_markers(self, mock_env):
        """read_file_raw must return only the file text when markers leak."""
        leaked = (
            "__HERMES_FENCE_a9f7b3__\x07'\n"
            "alpha\n"
            "\x1b]0;cat '/tmp/test/a.txt'\x07__HERMES_FENCE_a9f7b3__\n"
        )

        def side_effect(command, **kwargs):
            if command.startswith("wc -c"):
                return {"output": "6\n", "returncode": 0}
            if command.startswith("head -c"):
                return {"output": "alpha\n", "returncode": 0}
            if command.startswith("cat "):
                return {"output": leaked, "returncode": 0}
            return {"output": "", "returncode": 0}

        mock_env.execute.side_effect = side_effect
        ops = ShellFileOperations(mock_env)
        result = ops.read_file_raw("/tmp/test/a.txt")

        assert result.error is None
        assert result.content == "alpha\n"


def _search_env_side_effect(*, path_exists, fallback_returncode=0):
    """Build a fake ``env.execute`` for the search() tests.

    The returned callable answers ``test -e`` probes according to
    *path_exists*, reports every ``command -v`` probe as found, and answers
    any other command with empty output and *fallback_returncode*.
    """
    def side_effect(command, **kwargs):
        if "test -e" in command:
            if path_exists:
                return {"output": "exists", "returncode": 0}
            return {"output": "not_found", "returncode": 1}
        if "command -v" in command:
            return {"output": "yes", "returncode": 0}
        return {"output": "", "returncode": fallback_returncode}

    return side_effect


class TestSearchPathValidation:
    """Test that search() returns an error for non-existent paths."""

    def test_search_nonexistent_path_returns_error(self, mock_env):
        """search() should return an error when the path doesn't exist."""
        mock_env.execute.side_effect = _search_env_side_effect(path_exists=False)
        ops = ShellFileOperations(mock_env)
        result = ops.search("pattern", path="/nonexistent/path")
        assert result.error is not None
        # Case-insensitive check; it also covers a "Path not found" message.
        assert "not found" in result.error.lower()

    def test_search_nonexistent_path_files_mode(self, mock_env):
        """search(target='files') should also return error for bad paths."""
        mock_env.execute.side_effect = _search_env_side_effect(path_exists=False)
        ops = ShellFileOperations(mock_env)
        result = ops.search("*.py", path="/nonexistent/path", target="files")
        assert result.error is not None
        # Case-insensitive check; it also covers a "Path not found" message.
        assert "not found" in result.error.lower()

    def test_search_existing_path_proceeds(self, mock_env):
        """search() should proceed normally when the path exists."""
        # rg returns exit 1 (no matches) with empty output
        mock_env.execute.side_effect = _search_env_side_effect(
            path_exists=True, fallback_returncode=1
        )
        ops = ShellFileOperations(mock_env)
        result = ops.search("pattern", path="/existing/path")
        assert result.error is None
        assert result.total_count == 0  # No matches but no error

    def test_search_rg_error_exit_code(self, mock_env):
        """search() should report error when rg returns exit code 2."""
        # rg returns exit 2 (error) with empty output
        mock_env.execute.side_effect = _search_env_side_effect(
            path_exists=True, fallback_returncode=2
        )
        ops = ShellFileOperations(mock_env)
        result = ops.search("pattern", path="/some/path")
        assert result.error is not None
        assert "search failed" in result.error.lower() or "Search error" in result.error


class TestShellFileOpsWriteDenied:
    """Mutating operations must return a 'denied' error for protected paths."""

    def test_write_file_denied_path(self, file_ops):
        result = file_ops.write_file("~/.ssh/authorized_keys", "evil key")
        assert result.error is not None
        assert "denied" in result.error.lower()

    def test_patch_replace_denied_path(self, file_ops):
        result = file_ops.patch_replace("~/.ssh/authorized_keys", "old", "new")
        assert result.error is not None
        assert "denied" in result.error.lower()

    def test_delete_file_denied_path(self, file_ops):
        result = file_ops.delete_file("~/.ssh/authorized_keys")
        assert result.error is not None
        assert "denied" in result.error.lower()

    def test_move_file_src_denied(self, file_ops):
        result = file_ops.move_file("~/.ssh/id_rsa", "/tmp/dest.txt")
        assert result.error is not None
        assert "denied" in result.error.lower()

    def test_move_file_dst_denied(self, file_ops):
        result = file_ops.move_file("/tmp/src.txt", "~/.aws/credentials")
        assert result.error is not None
        assert "denied" in result.error.lower()

    def test_move_file_failure_path(self, mock_env):
        """A non-zero exit from the environment surfaces as a move error."""
        mock_env.execute.return_value = {"output": "No such file or directory", "returncode": 1}
        ops = ShellFileOperations(mock_env)
        result = ops.move_file("/tmp/nonexistent.txt", "/tmp/dest.txt")
        assert result.error is not None
        assert "Failed to move" in result.error


class TestPatchReplacePostWriteVerification:
    """Tests for the post-write verification added in patch_replace.

    Confirms that a silent persistence failure (where write_file's command
    appears to succeed but the bytes on disk don't match new_content) is
    surfaced as an error instead of being reported as a successful patch.
    """

    def test_patch_replace_fails_when_file_not_persisted(self, mock_env):
        """write_file reports success but the re-read returns old content:
        patch_replace must return an error, not success-with-diff."""
        file_contents = {"/tmp/test/a.py": "hello world\n"}

        def side_effect(command, **kwargs):
            # cat reads the file — both the initial read and the verify read
            if command.startswith("cat "):
                # Find the known path by substring match within the command
                for path in file_contents:
                    if path in command:
                        return {"output": file_contents[path], "returncode": 0}
                return {"output": "", "returncode": 1}
            # mkdir for parent dir
            if command.startswith("mkdir "):
                return {"output": "", "returncode": 0}
            # wc -c for byte count after write
            if command.startswith("wc -c"):
                for path in file_contents:
                    if path in command:
                        return {"output": str(len(file_contents[path].encode())), "returncode": 0}
                return {"output": "0", "returncode": 0}
            # Everything else (including the write itself) pretends to succeed
            # but DOESN'T update file_contents — simulates silent failure
            return {"output": "", "returncode": 0}

        mock_env.execute.side_effect = side_effect
        ops = ShellFileOperations(mock_env)
        result = ops.patch_replace("/tmp/test/a.py", "hello", "hi")
        assert result.error is not None, (
            "Silent persistence failure must surface as error, got: "
            f"success={result.success}, diff={result.diff}"
        )
        assert "verification failed" in result.error.lower()
        assert "did not persist" in result.error.lower()

    def test_patch_replace_succeeds_when_file_persisted(self, mock_env):
        """Normal success path: write persists, verify read returns new bytes."""
        state = {"content": "hello world\n"}

        def side_effect(command, stdin_data=None, **kwargs):
            # Write is `cat > path` — detect by the `>` redirect, NOT just `cat `
            if command.startswith("cat >"):
                if stdin_data is not None:
                    state["content"] = stdin_data
                return {"output": "", "returncode": 0}
            if command.startswith("cat "):  # read
                return {"output": state["content"], "returncode": 0}
            if command.startswith("mkdir "):
                return {"output": "", "returncode": 0}
            if command.startswith("wc -c"):
                return {"output": str(len(state["content"].encode())), "returncode": 0}
            return {"output": "", "returncode": 0}

        mock_env.execute.side_effect = side_effect
        ops = ShellFileOperations(mock_env)
        result = ops.patch_replace("/tmp/test/a.py", "hello", "hi")
        assert result.error is None, f"Unexpected error: {result.error}"
        assert result.success is True
        assert state["content"] == "hi world\n", f"File not actually updated: {state['content']!r}"

    def test_patch_replace_fails_when_verify_read_errors(self, mock_env):
        """If the verify-read step itself fails (exit code != 0), return an error."""
        call_count = {"cat": 0}
        state = {"content": "hello world\n"}

        def side_effect(command, stdin_data=None, **kwargs):
            if command.startswith("cat >"):  # write
                if stdin_data is not None:
                    state["content"] = stdin_data
                return {"output": "", "returncode": 0}
            if command.startswith("cat "):  # read
                call_count["cat"] += 1
                # First read (initial fetch) succeeds; second read (verify) fails
                if call_count["cat"] == 1:
                    return {"output": state["content"], "returncode": 0}
                return {"output": "", "returncode": 1}
            if command.startswith("mkdir "):
                return {"output": "", "returncode": 0}
            if command.startswith("wc -c"):
                return {"output": str(len(state["content"].encode())), "returncode": 0}
            return {"output": "", "returncode": 0}

        mock_env.execute.side_effect = side_effect
        ops = ShellFileOperations(mock_env)
        result = ops.patch_replace("/tmp/test/a.py", "hello", "hi")
        assert result.error is not None
        assert "could not re-read" in result.error.lower()