# test_copilot_acp_client.py
1 """Focused regressions for the Copilot ACP shim safety layer.""" 2 3 from __future__ import annotations 4 5 import io 6 import json 7 import os 8 import tempfile 9 import unittest 10 from pathlib import Path 11 from unittest.mock import patch 12 13 from agent.copilot_acp_client import CopilotACPClient 14 15 16 class _FakeProcess: 17 def __init__(self) -> None: 18 self.stdin = io.StringIO() 19 20 21 class CopilotACPClientSafetyTests(unittest.TestCase): 22 def setUp(self) -> None: 23 self.client = CopilotACPClient(acp_cwd="/tmp") 24 25 def _dispatch(self, message: dict, *, cwd: str) -> dict: 26 process = _FakeProcess() 27 handled = self.client._handle_server_message( 28 message, 29 process=process, 30 cwd=cwd, 31 text_parts=[], 32 reasoning_parts=[], 33 ) 34 self.assertTrue(handled) 35 payload = process.stdin.getvalue().strip() 36 self.assertTrue(payload) 37 return json.loads(payload) 38 39 def test_request_permission_is_not_auto_allowed(self) -> None: 40 response = self._dispatch( 41 { 42 "jsonrpc": "2.0", 43 "id": 1, 44 "method": "session/request_permission", 45 "params": {}, 46 }, 47 cwd="/tmp", 48 ) 49 50 outcome = (((response.get("result") or {}).get("outcome") or {}).get("outcome")) 51 self.assertEqual(outcome, "cancelled") 52 53 def test_read_text_file_blocks_internal_hermes_hub_files(self) -> None: 54 with tempfile.TemporaryDirectory() as tmpdir: 55 home = Path(tmpdir) / "home" 56 blocked = home / ".hermes" / "skills" / ".hub" / "index-cache" / "entry.json" 57 blocked.parent.mkdir(parents=True, exist_ok=True) 58 blocked.write_text('{"token":"sk-test-secret-1234567890"}') 59 60 with patch.dict( 61 os.environ, 62 {"HOME": str(home), "HERMES_HOME": str(home / ".hermes")}, 63 clear=False, 64 ): 65 response = self._dispatch( 66 { 67 "jsonrpc": "2.0", 68 "id": 2, 69 "method": "fs/read_text_file", 70 "params": {"path": str(blocked)}, 71 }, 72 cwd=str(home), 73 ) 74 75 self.assertIn("error", response) 76 77 def test_read_text_file_redacts_sensitive_content(self) -> 
None: 78 with tempfile.TemporaryDirectory() as tmpdir: 79 root = Path(tmpdir) 80 secret_file = root / "config.env" 81 secret_file.write_text("OPENAI_API_KEY=sk-proj-abc123def456ghi789jkl012") 82 83 # agent.redact snapshots HERMES_REDACT_SECRETS at import time into 84 # _REDACT_ENABLED, so patching os.environ is a no-op. Flip the 85 # module-level constant directly for the duration of the call. 86 with patch("agent.redact._REDACT_ENABLED", True): 87 response = self._dispatch( 88 { 89 "jsonrpc": "2.0", 90 "id": 3, 91 "method": "fs/read_text_file", 92 "params": {"path": str(secret_file)}, 93 }, 94 cwd=str(root), 95 ) 96 97 content = ((response.get("result") or {}).get("content") or "") 98 self.assertNotIn("abc123def456", content) 99 self.assertIn("OPENAI_API_KEY=", content) 100 101 def test_write_text_file_reuses_write_denylist(self) -> None: 102 with tempfile.TemporaryDirectory() as tmpdir: 103 home = Path(tmpdir) / "home" 104 target = home / ".ssh" / "id_rsa" 105 target.parent.mkdir(parents=True, exist_ok=True) 106 107 with patch("agent.copilot_acp_client.is_write_denied", return_value=True, create=True): 108 response = self._dispatch( 109 { 110 "jsonrpc": "2.0", 111 "id": 4, 112 "method": "fs/write_text_file", 113 "params": { 114 "path": str(target), 115 "content": "fake-private-key", 116 }, 117 }, 118 cwd=str(home), 119 ) 120 121 self.assertIn("error", response) 122 self.assertFalse(target.exists()) 123 124 def test_write_text_file_respects_safe_root(self) -> None: 125 with tempfile.TemporaryDirectory() as tmpdir: 126 root = Path(tmpdir) 127 safe_root = root / "workspace" 128 safe_root.mkdir() 129 outside = root / "outside.txt" 130 131 with patch.dict(os.environ, {"HERMES_WRITE_SAFE_ROOT": str(safe_root)}, clear=False): 132 response = self._dispatch( 133 { 134 "jsonrpc": "2.0", 135 "id": 5, 136 "method": "fs/write_text_file", 137 "params": { 138 "path": str(outside), 139 "content": "should-not-write", 140 }, 141 }, 142 cwd=str(root), 143 ) 144 145 
self.assertIn("error", response) 146 self.assertFalse(outside.exists()) 147 148 149 if __name__ == "__main__": 150 unittest.main() 151 152 153 # ── HOME env propagation tests (from PR #11285) ───────────────────── 154 155 from unittest.mock import patch as _patch 156 import pytest 157 158 159 def _make_home_client(tmp_path): 160 return CopilotACPClient( 161 api_key="copilot-acp", 162 base_url="acp://copilot", 163 acp_command="copilot", 164 acp_args=["--acp", "--stdio"], 165 acp_cwd=str(tmp_path), 166 ) 167 168 169 def _fake_popen_capture(captured): 170 def _fake(cmd, **kwargs): 171 captured["cmd"] = cmd 172 captured["kwargs"] = kwargs 173 raise FileNotFoundError("copilot not found") 174 return _fake 175 176 177 def test_run_prompt_prefers_profile_home_when_available(monkeypatch, tmp_path): 178 hermes_home = tmp_path / "hermes" 179 profile_home = hermes_home / "home" 180 profile_home.mkdir(parents=True) 181 182 monkeypatch.delenv("HOME", raising=False) 183 monkeypatch.setenv("HERMES_HOME", str(hermes_home)) 184 185 captured = {} 186 client = _make_home_client(tmp_path) 187 188 with _patch("agent.copilot_acp_client.subprocess.Popen", side_effect=_fake_popen_capture(captured)): 189 with pytest.raises(RuntimeError, match="Could not start Copilot ACP command"): 190 client._run_prompt("hello", timeout_seconds=1) 191 192 assert captured["kwargs"]["env"]["HOME"] == str(profile_home) 193 194 195 def test_run_prompt_passes_home_when_parent_env_is_clean(monkeypatch, tmp_path): 196 monkeypatch.delenv("HOME", raising=False) 197 monkeypatch.delenv("HERMES_HOME", raising=False) 198 199 captured = {} 200 client = _make_home_client(tmp_path) 201 202 with _patch("agent.copilot_acp_client.subprocess.Popen", side_effect=_fake_popen_capture(captured)): 203 with pytest.raises(RuntimeError, match="Could not start Copilot ACP command"): 204 client._run_prompt("hello", timeout_seconds=1) 205 206 assert "env" in captured["kwargs"] 207 assert captured["kwargs"]["env"]["HOME"]