/ tests / tools / test_file_operations.py
test_file_operations.py
  1  """Tests for tools/file_operations.py — deny list, result dataclasses, helpers."""
  2  
  3  import os
  4  import pytest
  5  from pathlib import Path
  6  from unittest.mock import MagicMock
  7  
  8  from tools.file_operations import (
  9      _is_write_denied,
 10      WRITE_DENIED_PATHS,
 11      WRITE_DENIED_PREFIXES,
 12      ReadResult,
 13      WriteResult,
 14      PatchResult,
 15      SearchResult,
 16      SearchMatch,
 17      LintResult,
 18      ShellFileOperations,
 19      BINARY_EXTENSIONS,
 20      IMAGE_EXTENSIONS,
 21      MAX_LINE_LENGTH,
 22      normalize_read_pagination,
 23      normalize_search_pagination,
 24  )
 25  
 26  
 27  # =========================================================================
 28  # Write deny list
 29  # =========================================================================
 30  
 31  class TestIsWriteDenied:
 32      def test_ssh_authorized_keys_denied(self):
 33          path = os.path.join(str(Path.home()), ".ssh", "authorized_keys")
 34          assert _is_write_denied(path) is True
 35  
 36      def test_ssh_id_rsa_denied(self):
 37          path = os.path.join(str(Path.home()), ".ssh", "id_rsa")
 38          assert _is_write_denied(path) is True
 39  
 40      def test_netrc_denied(self):
 41          path = os.path.join(str(Path.home()), ".netrc")
 42          assert _is_write_denied(path) is True
 43  
 44      def test_aws_prefix_denied(self):
 45          path = os.path.join(str(Path.home()), ".aws", "credentials")
 46          assert _is_write_denied(path) is True
 47  
 48      def test_kube_prefix_denied(self):
 49          path = os.path.join(str(Path.home()), ".kube", "config")
 50          assert _is_write_denied(path) is True
 51  
 52      def test_normal_file_allowed(self, tmp_path):
 53          path = str(tmp_path / "safe_file.txt")
 54          assert _is_write_denied(path) is False
 55  
 56      def test_project_file_allowed(self):
 57          assert _is_write_denied("/tmp/project/main.py") is False
 58  
 59      def test_tilde_expansion(self):
 60          assert _is_write_denied("~/.ssh/authorized_keys") is True
 61  
 62  
 63  
 64  # =========================================================================
 65  # Result dataclasses
 66  # =========================================================================
 67  
 68  class TestReadResult:
 69      def test_to_dict_omits_defaults(self):
 70          r = ReadResult()
 71          d = r.to_dict()
 72          assert "error" not in d    # None omitted
 73          assert "similar_files" not in d  # empty list omitted
 74  
 75      def test_to_dict_preserves_empty_content(self):
 76          """Empty file should still have content key in the dict."""
 77          r = ReadResult(content="", total_lines=0, file_size=0)
 78          d = r.to_dict()
 79          assert "content" in d
 80          assert d["content"] == ""
 81          assert d["total_lines"] == 0
 82          assert d["file_size"] == 0
 83  
 84      def test_to_dict_includes_values(self):
 85          r = ReadResult(content="hello", total_lines=10, file_size=50, truncated=True)
 86          d = r.to_dict()
 87          assert d["content"] == "hello"
 88          assert d["total_lines"] == 10
 89          assert d["truncated"] is True
 90  
 91      def test_binary_fields(self):
 92          r = ReadResult(is_binary=True, is_image=True, mime_type="image/png")
 93          d = r.to_dict()
 94          assert d["is_binary"] is True
 95          assert d["is_image"] is True
 96          assert d["mime_type"] == "image/png"
 97  
 98  
 99  class TestWriteResult:
100      def test_to_dict_omits_none(self):
101          r = WriteResult(bytes_written=100)
102          d = r.to_dict()
103          assert d["bytes_written"] == 100
104          assert "error" not in d
105          assert "warning" not in d
106  
107      def test_to_dict_includes_error(self):
108          r = WriteResult(error="Permission denied")
109          d = r.to_dict()
110          assert d["error"] == "Permission denied"
111  
112  
class TestPatchResult:
    """Checks on the dictionary produced by `PatchResult.to_dict()`."""

    def test_to_dict_success(self):
        """Success flag, diff text and modified files are all serialized."""
        patched = PatchResult(
            success=True,
            diff="--- a\n+++ b",
            files_modified=["a.py"],
        )
        payload = patched.to_dict()
        assert payload["success"] is True
        assert payload["diff"] == "--- a\n+++ b"
        assert payload["files_modified"] == ["a.py"]

    def test_to_dict_error(self):
        """A result built with only an error serializes `success` as False."""
        payload = PatchResult(error="File not found").to_dict()
        assert payload["success"] is False
        assert payload["error"] == "File not found"
126  
127  
class TestSearchResult:
    """Checks on the dictionary produced by `SearchResult.to_dict()`."""

    def test_to_dict_with_matches(self):
        """A single match is serialized as a one-element list of dicts."""
        hit = SearchMatch(path="a.py", line_number=10, content="hello")
        payload = SearchResult(matches=[hit], total_count=1).to_dict()
        assert payload["total_count"] == 1
        assert len(payload["matches"]) == 1
        assert payload["matches"][0]["path"] == "a.py"

    def test_to_dict_empty(self):
        """A default result reports zero hits and has no `matches` key."""
        payload = SearchResult().to_dict()
        assert payload["total_count"] == 0
        assert "matches" not in payload

    def test_to_dict_files_mode(self):
        """A list of file names is emitted under `files`."""
        names = ["a.py", "b.py"]
        payload = SearchResult(files=names, total_count=2).to_dict()
        assert payload["files"] == ["a.py", "b.py"]

    def test_to_dict_count_mode(self):
        """Per-file counts are emitted under `counts`."""
        per_file = {"a.py": 3, "b.py": 1}
        payload = SearchResult(counts=per_file, total_count=4).to_dict()
        assert payload["counts"]["a.py"] == 3

    def test_truncated_flag(self):
        """The truncated flag is present in the dict when set."""
        payload = SearchResult(total_count=100, truncated=True).to_dict()
        assert payload["truncated"] is True
157  
158  
class TestLintResult:
    """Checks on the `status` reported by `LintResult.to_dict()`."""

    def test_skipped(self):
        """A skipped lint reports status `skipped` and keeps its message."""
        payload = LintResult(
            skipped=True, message="No linter for .md files"
        ).to_dict()
        assert payload["status"] == "skipped"
        assert payload["message"] == "No linter for .md files"

    def test_success(self):
        """A successful lint reports status `ok`."""
        payload = LintResult(success=True, output="").to_dict()
        assert payload["status"] == "ok"

    def test_error(self):
        """A failed lint reports status `error` and keeps the linter output."""
        payload = LintResult(success=False, output="SyntaxError line 5").to_dict()
        assert payload["status"] == "error"
        assert "SyntaxError" in payload["output"]
176  
177  
178  # =========================================================================
179  # ShellFileOperations helpers
180  # =========================================================================
181  
@pytest.fixture()
def mock_env():
    """Provide a MagicMock standing in for the terminal environment.

    The mock exposes ``cwd`` as ``/tmp/test`` and its ``execute`` method
    returns empty output with return code 0 unless a test overrides it.
    """
    fake_env = MagicMock(cwd="/tmp/test")
    fake_env.execute.return_value = {"output": "", "returncode": 0}
    return fake_env
189  
190  
@pytest.fixture()
def file_ops(mock_env):
    """Provide a ShellFileOperations built on the `mock_env` fixture."""
    operations = ShellFileOperations(mock_env)
    return operations
194  
195  
class TestShellFileOpsHelpers:
    """Tests for pagination normalizers and ShellFileOperations helper methods."""

    def test_normalize_read_pagination_clamps_invalid_values(self):
        """Out-of-range or non-numeric read pagination is clamped to expected values."""
        assert normalize_read_pagination(offset=0, limit=0) == (1, 1)
        assert normalize_read_pagination(offset=-10, limit=-5) == (1, 1)
        assert normalize_read_pagination(offset="bad", limit="bad") == (1, 500)
        assert normalize_read_pagination(offset=2, limit=999999) == (2, 2000)

    def test_normalize_search_pagination_clamps_invalid_values(self):
        """Out-of-range or non-numeric search pagination is clamped to expected values."""
        assert normalize_search_pagination(offset=-10, limit=-5) == (0, 1)
        assert normalize_search_pagination(offset="bad", limit="bad") == (0, 50)
        assert normalize_search_pagination(offset=3, limit=0) == (3, 1)

    def test_escape_shell_arg_simple(self, file_ops):
        """A plain word is wrapped in single quotes."""
        assert file_ops._escape_shell_arg("hello") == "'hello'"

    def test_escape_shell_arg_with_quotes(self, file_ops):
        """An embedded single quote is escaped so the shell sees the original text."""
        # Local import: shlex is not among this module's top-level imports.
        import shlex

        result = file_ops._escape_shell_arg("it's")
        assert "'" in result
        # Should be safely escaped
        assert result.count("'") >= 4  # wrapping + escaping
        # Counting quotes alone cannot tell correct escaping from broken
        # escaping; parsing the result with POSIX shell rules must yield
        # exactly one argument equal to the input.
        assert shlex.split(result) == ["it's"]

    def test_is_likely_binary_by_extension(self, file_ops):
        """File names alone classify .png/.db as binary and .py/.md as text."""
        assert file_ops._is_likely_binary("photo.png") is True
        assert file_ops._is_likely_binary("data.db") is True
        assert file_ops._is_likely_binary("code.py") is False
        assert file_ops._is_likely_binary("readme.md") is False

    def test_is_likely_binary_by_content(self, file_ops):
        """With no known extension, the content sample decides the classification."""
        # High ratio of non-printable chars -> binary
        binary_content = "\x00\x01\x02\x03" * 250
        assert file_ops._is_likely_binary("unknown", binary_content) is True

        # Normal text -> not binary
        assert file_ops._is_likely_binary("unknown", "Hello world\nLine 2\n") is False

    def test_is_image(self, file_ops):
        """Image extensions are recognized; .pdf and .py are not images."""
        assert file_ops._is_image("photo.png") is True
        assert file_ops._is_image("pic.jpg") is True
        assert file_ops._is_image("icon.ico") is True
        assert file_ops._is_image("data.pdf") is False
        assert file_ops._is_image("code.py") is False

    def test_add_line_numbers(self, file_ops):
        """Each line gets a right-aligned number followed by `|`, starting at 1."""
        content = "line one\nline two\nline three"
        result = file_ops._add_line_numbers(content)
        assert "     1|line one" in result
        assert "     2|line two" in result
        assert "     3|line three" in result

    def test_add_line_numbers_with_offset(self, file_ops):
        """Numbering begins at the supplied `start_line`."""
        content = "continued\nmore"
        result = file_ops._add_line_numbers(content, start_line=50)
        assert "    50|continued" in result
        assert "    51|more" in result

    def test_add_line_numbers_truncates_long_lines(self, file_ops):
        """A line longer than MAX_LINE_LENGTH is marked with `[truncated]`."""
        long_line = "x" * (MAX_LINE_LENGTH + 100)
        result = file_ops._add_line_numbers(long_line)
        assert "[truncated]" in result

    def test_unified_diff(self, file_ops):
        """The diff shows the removed line, the added line and the file name."""
        old = "line1\nline2\nline3\n"
        new = "line1\nchanged\nline3\n"
        diff = file_ops._unified_diff(old, new, "test.py")
        assert "-line2" in diff
        assert "+changed" in diff
        assert "test.py" in diff

    def test_cwd_from_env(self, mock_env):
        """The environment's `cwd` attribute is exposed as `ops.cwd`."""
        mock_env.cwd = "/custom/path"
        ops = ShellFileOperations(mock_env)
        assert ops.cwd == "/custom/path"

    def test_cwd_fallback_to_slash(self):
        """Without a `cwd` attribute on the environment, `ops.cwd` is `/`."""
        env = MagicMock(spec=[])  # no cwd attribute
        ops = ShellFileOperations(env)
        assert ops.cwd == "/"

    def test_read_file_strips_leaked_terminal_fence_markers(self, mock_env):
        """read_file removes fence markers and terminal escapes from command output."""
        # Simulated `sed` output polluted with fence markers, an OSC title
        # escape (\x1b]0;...) and BEL (\x07) characters around the real line.
        leaked = (
            "'\x07__HERMES_FENCE_a9f7b3__\x1b]0;cat "
            "'/tmp/test/a.py' 2> /dev/null\x07\n"
            "print('ok')\n"
            "__HERMES_FENCE_a9f7b3__\x07'\n"
        )

        def side_effect(command, **kwargs):
            # Only the `sed -n` call returns polluted output; the other
            # commands return clean values.
            if command.startswith("wc -c"):
                return {"output": "12\n", "returncode": 0}
            if command.startswith("head -c"):
                return {"output": "print('ok')\n", "returncode": 0}
            if command.startswith("sed -n"):
                return {"output": leaked, "returncode": 0}
            if command.startswith("wc -l"):
                return {"output": "1\n", "returncode": 0}
            return {"output": "", "returncode": 0}

        mock_env.execute.side_effect = side_effect
        ops = ShellFileOperations(mock_env)
        result = ops.read_file("/tmp/test/a.py")

        assert result.error is None
        assert "HERMES_FENCE" not in result.content
        assert "\x1b]" not in result.content
        assert "\x07" not in result.content
        assert "     1|print('ok')" in result.content

    def test_read_file_raw_strips_leaked_terminal_fence_markers(self, mock_env):
        """read_file_raw returns only the file text when `cat` output is polluted."""
        leaked = (
            "__HERMES_FENCE_a9f7b3__\x07'\n"
            "alpha\n"
            "\x1b]0;cat '/tmp/test/a.txt'\x07__HERMES_FENCE_a9f7b3__\n"
        )

        def side_effect(command, **kwargs):
            # Only the `cat` call returns polluted output.
            if command.startswith("wc -c"):
                return {"output": "6\n", "returncode": 0}
            if command.startswith("head -c"):
                return {"output": "alpha\n", "returncode": 0}
            if command.startswith("cat "):
                return {"output": leaked, "returncode": 0}
            return {"output": "", "returncode": 0}

        mock_env.execute.side_effect = side_effect
        ops = ShellFileOperations(mock_env)
        result = ops.read_file_raw("/tmp/test/a.txt")

        assert result.error is None
        assert result.content == "alpha\n"
325  
326  
class TestSearchPathValidation:
    """Test how search() reports missing paths and search-command failures."""

    @staticmethod
    def _fake_execute(path_exists, search_returncode=0):
        """Build an ``execute`` side effect for the search tests.

        Args:
            path_exists: Answer given to the ``test -e`` existence probe.
            search_returncode: Return code for every command that is neither
                the existence probe nor a ``command -v`` lookup (i.e. the
                search command itself); its output is always empty.

        Returns:
            A callable suitable for ``mock_env.execute.side_effect``.
        """
        def side_effect(command, **kwargs):
            if "test -e" in command:
                if path_exists:
                    return {"output": "exists", "returncode": 0}
                return {"output": "not_found", "returncode": 1}
            if "command -v" in command:
                # Any tool-availability lookup reports the tool as present.
                return {"output": "yes", "returncode": 0}
            return {"output": "", "returncode": search_returncode}

        return side_effect

    def test_search_nonexistent_path_returns_error(self, mock_env):
        """search() should return an error when the path doesn't exist."""
        mock_env.execute.side_effect = self._fake_execute(path_exists=False)
        ops = ShellFileOperations(mock_env)
        result = ops.search("pattern", path="/nonexistent/path")
        assert result.error is not None
        # Case-insensitive check; also matches "Path not found".
        assert "not found" in result.error.lower()

    def test_search_nonexistent_path_files_mode(self, mock_env):
        """search(target='files') should also return error for bad paths."""
        mock_env.execute.side_effect = self._fake_execute(path_exists=False)
        ops = ShellFileOperations(mock_env)
        result = ops.search("*.py", path="/nonexistent/path", target="files")
        assert result.error is not None
        # Case-insensitive check; also matches "Path not found".
        assert "not found" in result.error.lower()

    def test_search_existing_path_proceeds(self, mock_env):
        """search() should proceed normally when the path exists."""
        # rg returns exit 1 (no matches) with empty output
        mock_env.execute.side_effect = self._fake_execute(
            path_exists=True, search_returncode=1
        )
        ops = ShellFileOperations(mock_env)
        result = ops.search("pattern", path="/existing/path")
        assert result.error is None
        assert result.total_count == 0  # No matches but no error

    def test_search_rg_error_exit_code(self, mock_env):
        """search() should report error when rg returns exit code 2."""
        # rg returns exit 2 (error) with empty output
        mock_env.execute.side_effect = self._fake_execute(
            path_exists=True, search_returncode=2
        )
        ops = ShellFileOperations(mock_env)
        result = ops.search("pattern", path="/some/path")
        assert result.error is not None
        assert "search failed" in result.error.lower() or "Search error" in result.error
389  
390  
class TestShellFileOpsWriteDenied:
    """Mutating operations on deny-listed paths must return a denial error."""

    @staticmethod
    def _assert_denied(outcome):
        """Assert that *outcome* carries an error mentioning 'denied'."""
        assert outcome.error is not None
        assert "denied" in outcome.error.lower()

    def test_write_file_denied_path(self, file_ops):
        """Writing to authorized_keys is refused."""
        self._assert_denied(
            file_ops.write_file("~/.ssh/authorized_keys", "evil key")
        )

    def test_patch_replace_denied_path(self, file_ops):
        """Patching authorized_keys is refused."""
        self._assert_denied(
            file_ops.patch_replace("~/.ssh/authorized_keys", "old", "new")
        )

    def test_delete_file_denied_path(self, file_ops):
        """Deleting authorized_keys is refused."""
        self._assert_denied(file_ops.delete_file("~/.ssh/authorized_keys"))

    def test_move_file_src_denied(self, file_ops):
        """Moving a deny-listed source file is refused."""
        self._assert_denied(file_ops.move_file("~/.ssh/id_rsa", "/tmp/dest.txt"))

    def test_move_file_dst_denied(self, file_ops):
        """Moving onto a deny-listed destination is refused."""
        self._assert_denied(
            file_ops.move_file("/tmp/src.txt", "~/.aws/credentials")
        )

    def test_move_file_failure_path(self, mock_env):
        """A non-zero exit from the underlying command surfaces as a move error."""
        failing_reply = {"output": "No such file or directory", "returncode": 1}
        mock_env.execute.return_value = failing_reply
        outcome = ShellFileOperations(mock_env).move_file(
            "/tmp/nonexistent.txt", "/tmp/dest.txt"
        )
        assert outcome.error is not None
        assert "Failed to move" in outcome.error
423  
424  
class TestPatchReplacePostWriteVerification:
    """Tests for the post-write verification step of patch_replace.

    A write whose command reports success while the content read back
    afterwards does not match the new content must be reported as an
    error rather than as a successful patch.
    """

    def test_patch_replace_fails_when_file_not_persisted(self, mock_env):
        """The write appears to succeed but reads keep returning the old
        content, so patch_replace must return an error."""
        disk = {"/tmp/test/a.py": "hello world\n"}

        def stored_text(command):
            # Content of the first known path mentioned in the command, or
            # None when the command names no known path.
            return next(
                (text for path, text in disk.items() if path in command),
                None,
            )

        def side_effect(command, **kwargs):
            succeeded = {"output": "", "returncode": 0}
            # Any command starting with "cat " is answered from `disk`; this
            # covers both the initial read and the verification read.
            if command.startswith("cat "):
                text = stored_text(command)
                if text is None:
                    return {"output": "", "returncode": 1}
                return {"output": text, "returncode": 0}
            # Parent-directory creation
            if command.startswith("mkdir "):
                return succeeded
            # Byte count of the stored (never updated) content
            if command.startswith("wc -c"):
                text = stored_text(command)
                if text is None:
                    return {"output": "0", "returncode": 0}
                return {"output": str(len(text.encode())), "returncode": 0}
            # Every other command reports success without touching `disk`,
            # which is what simulates the silent persistence failure.
            return succeeded

        mock_env.execute.side_effect = side_effect
        outcome = ShellFileOperations(mock_env).patch_replace(
            "/tmp/test/a.py", "hello", "hi"
        )
        assert outcome.error is not None, (
            "Silent persistence failure must surface as error, got: "
            f"success={outcome.success}, diff={outcome.diff}"
        )
        lowered = outcome.error.lower()
        assert "verification failed" in lowered
        assert "did not persist" in lowered

    def test_patch_replace_succeeds_when_file_persisted(self, mock_env):
        """Happy path: the write is stored and the verify read returns it."""
        disk = {"content": "hello world\n"}

        def side_effect(command, stdin_data=None, **kwargs):
            # A write is `cat > path`; it must be checked before the plain
            # `cat ` read because it shares that prefix.
            if command.startswith("cat >"):
                if stdin_data is not None:
                    disk["content"] = stdin_data
                return {"output": "", "returncode": 0}
            if command.startswith("cat "):  # read
                return {"output": disk["content"], "returncode": 0}
            if command.startswith("wc -c"):
                size = len(disk["content"].encode())
                return {"output": str(size), "returncode": 0}
            # mkdir and anything else simply succeed with no output
            return {"output": "", "returncode": 0}

        mock_env.execute.side_effect = side_effect
        outcome = ShellFileOperations(mock_env).patch_replace(
            "/tmp/test/a.py", "hello", "hi"
        )
        assert outcome.error is None, f"Unexpected error: {outcome.error}"
        assert outcome.success is True
        assert disk["content"] == "hi world\n", f"File not actually updated: {disk['content']!r}"

    def test_patch_replace_fails_when_verify_read_errors(self, mock_env):
        """If the verify-read step itself fails (exit code != 0), return an error."""
        disk = {"content": "hello world\n"}
        reads_seen = 0

        def side_effect(command, stdin_data=None, **kwargs):
            nonlocal reads_seen
            if command.startswith("cat >"):  # write
                if stdin_data is not None:
                    disk["content"] = stdin_data
                return {"output": "", "returncode": 0}
            if command.startswith("cat "):  # read
                reads_seen += 1
                # Only the first read (the initial fetch) succeeds; every
                # later read, including the verification read, fails.
                if reads_seen != 1:
                    return {"output": "", "returncode": 1}
                return {"output": disk["content"], "returncode": 0}
            if command.startswith("wc -c"):
                size = len(disk["content"].encode())
                return {"output": str(size), "returncode": 0}
            # mkdir and anything else simply succeed with no output
            return {"output": "", "returncode": 0}

        mock_env.execute.side_effect = side_effect
        outcome = ShellFileOperations(mock_env).patch_replace(
            "/tmp/test/a.py", "hello", "hi"
        )
        assert outcome.error is not None
        assert "could not re-read" in outcome.error.lower()
519          assert result.error is not None
520          assert "could not re-read" in result.error.lower()