test_mcp_server.py
"""E2E tests for the MCP server authorization.

The module-scoped ``mcp_server`` fixture launches a local OIDC bridge and an
MCP server wired to it; the tests then exercise the MCP server's public and
protected HTTP endpoints.
"""

import json
import os
import shutil
import socket
import subprocess
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Optional

import pytest


def _find_free_port() -> int:
    """Return a TCP port on 127.0.0.1 that was free when this was called.

    The probing socket is closed before returning, so the port is not
    reserved: another process may claim it before the caller binds it.
    """
    with socket.socket() as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def _wait_for_port(
    port: int,
    timeout: float = 10.0,
    proc: Optional[subprocess.Popen] = None,
) -> bool:
    """Poll until something accepts TCP connections on 127.0.0.1:``port``.

    Args:
        port: Port to probe.
        timeout: Maximum number of seconds to keep polling.
        proc: Optional child process expected to open the port. When given
            and the process has already exited, polling stops immediately
            instead of burning the remaining timeout.

    Returns:
        True once a connection succeeds, False on timeout or early exit of
        ``proc``.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection(("127.0.0.1", port), timeout=1):
                return True
        except OSError:
            # A dead child will never open the port; fail fast.
            if proc is not None and proc.poll() is not None:
                return False
            time.sleep(0.2)
    return False


def _find_binary(env_var: str, name: str) -> Optional[Path]:
    """Resolve a binary from env var, PATH, or target/debug.

    Args:
        env_var: Environment variable that may hold an explicit path.
        name: Executable name to look up on PATH and under ``target/debug``.

    Returns:
        The first existing candidate, or None when nothing was found.
    """
    if path := os.environ.get(env_var):
        p = Path(path)
        # Require a regular file: a directory can never be executed.
        if p.is_file():
            return p

    if found := shutil.which(name):
        return Path(found)

    workspace_root = Path(__file__).resolve().parent.parent.parent
    debug_path = workspace_root / "target" / "debug" / name
    if debug_path.is_file():
        return debug_path

    return None


def _spawn(binary, env: dict, log_path: Path) -> subprocess.Popen:
    """Start ``binary`` with ``env``, sending stdout and stderr to ``log_path``.

    Output goes to a file rather than a pipe because nothing drains the pipe
    while tests run; a full pipe buffer would block the child.
    """
    with log_path.open("wb") as log_file:
        # The child keeps its own duplicate of the descriptor, so the parent
        # handle can be closed as soon as Popen returns.
        return subprocess.Popen(
            [str(binary)],
            env=env,
            stdout=log_file,
            stderr=subprocess.STDOUT,
        )


def _stop_process(proc: subprocess.Popen, timeout: float = 5.0) -> None:
    """Terminate ``proc``, escalating to kill on timeout, and reap it."""
    if proc.poll() is not None:
        return
    proc.terminate()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()


def _tool_call_request(
    base_url: str,
    tool: str,
    arguments: dict,
    token: Optional[str] = None,
) -> urllib.request.Request:
    """Build the JSON POST request for ``/mcp/tools/<tool>``.

    When ``token`` is given it is sent as a Bearer ``Authorization`` header;
    otherwise no ``Authorization`` header is set.
    """
    headers = {"Content-Type": "application/json"}
    if token is not None:
        headers["Authorization"] = f"Bearer {token}"
    return urllib.request.Request(
        f"{base_url}/mcp/tools/{tool}",
        data=json.dumps(arguments).encode(),
        headers=headers,
        method="POST",
    )


def _http_status(req, timeout: float = 5.0) -> int:
    """Send ``req`` and return the HTTP status code.

    HTTP error responses (4xx/5xx) are returned as status codes instead of
    being raised; transport-level failures still propagate. The response is
    always closed.
    """
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        err.close()
        return err.code


def _get_json(url: str, timeout: float = 5.0):
    """GET ``url`` and return ``(status, decoded JSON body)``."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.status, json.loads(resp.read())


@pytest.fixture(scope="module")
def auths_mcp_server_bin():
    """Path to the `auths-mcp-server` binary."""
    path = _find_binary("AUTHS_MCP_SERVER_BIN", "auths-mcp-server")
    if path is None:
        pytest.skip("auths-mcp-server binary not found")
    return path


@pytest.fixture(scope="module")
def mcp_server(tmp_path_factory, auths_mcp_server_bin):
    """Spawn an OIDC bridge and an MCP server for the test module.

    A fresh RSA signing key is generated with the ``openssl`` CLI, the OIDC
    bridge is started with it, and the MCP server is pointed at the bridge's
    JWKS endpoint. The module is skipped when ``openssl`` or either binary is
    unavailable, or when either process fails to open its port.

    Yields:
        A dict with ``mcp_proc``, ``mcp_port``, ``mcp_url``, ``bridge_proc``,
        ``bridge_port``, ``bridge_url`` and ``key_path``.

    Process output is written to ``oidc_bridge.log`` and ``mcp_server.log``
    in the fixture's temporary directory. Both processes are always stopped
    on exit, including on skip or startup failure.
    """
    if not shutil.which("openssl"):
        pytest.skip("openssl CLI not found")

    # Find the OIDC bridge binary
    oidc_bin = _find_binary("AUTHS_OIDC_BRIDGE_BIN", "auths-oidc-bridge")
    if oidc_bin is None:
        pytest.skip("auths-oidc-bridge binary not found")

    work_dir = tmp_path_factory.mktemp("mcp")
    key_path = work_dir / "signing_key.pem"

    subprocess.run(
        ["openssl", "genrsa", "-out", str(key_path), "2048"],
        check=True,
        capture_output=True,
    )

    # Start the OIDC bridge
    bridge_port = _find_free_port()
    bridge_env = {
        "PATH": os.environ.get("PATH", "/usr/bin:/bin"),
        "AUTHS_OIDC_BIND_ADDR": f"127.0.0.1:{bridge_port}",
        "AUTHS_OIDC_SIGNING_KEY_PATH": str(key_path),
        "AUTHS_OIDC_ISSUER_URL": f"http://127.0.0.1:{bridge_port}",
        "AUTHS_OIDC_AUDIENCE": "auths-mcp-server",
        "RUST_LOG": "warn",
    }
    bridge_proc = _spawn(oidc_bin, bridge_env, work_dir / "oidc_bridge.log")
    mcp_proc = None

    # Everything from here on runs under try/finally so that no child process
    # outlives the fixture, whatever path is taken out of it.
    try:
        if not _wait_for_port(bridge_port, timeout=15, proc=bridge_proc):
            pytest.skip("OIDC bridge failed to start")

        # Start the MCP server
        mcp_port = _find_free_port()
        mcp_env = {
            "PATH": os.environ.get("PATH", "/usr/bin:/bin"),
            "AUTHS_MCP_BIND_ADDR": f"127.0.0.1:{mcp_port}",
            "AUTHS_MCP_JWKS_URL": (
                f"http://127.0.0.1:{bridge_port}/.well-known/jwks.json"
            ),
            "AUTHS_MCP_EXPECTED_ISSUER": f"http://127.0.0.1:{bridge_port}",
            "AUTHS_MCP_EXPECTED_AUDIENCE": "auths-mcp-server",
            "RUST_LOG": "warn",
        }
        mcp_proc = _spawn(
            auths_mcp_server_bin, mcp_env, work_dir / "mcp_server.log"
        )

        if not _wait_for_port(mcp_port, timeout=15, proc=mcp_proc):
            pytest.skip("MCP server failed to start")

        yield {
            "mcp_proc": mcp_proc,
            "mcp_port": mcp_port,
            "mcp_url": f"http://127.0.0.1:{mcp_port}",
            "bridge_proc": bridge_proc,
            "bridge_port": bridge_port,
            "bridge_url": f"http://127.0.0.1:{bridge_port}",
            "key_path": key_path,
        }
    finally:
        # Stop the dependent server first, then the bridge it relies on.
        if mcp_proc is not None:
            _stop_process(mcp_proc)
        _stop_process(bridge_proc)


@pytest.mark.slow
@pytest.mark.requires_binary
class TestMcpServer:
    """HTTP-level checks against a live MCP server."""

    def test_mcp_health(self, mcp_server):
        """Health endpoint should return 200 without authentication."""
        status, body = _get_json(f"{mcp_server['mcp_url']}/health")
        assert status == 200
        assert body["status"] == "ok"

    def test_mcp_protected_resource_metadata(self, mcp_server):
        """Protected Resource Metadata should return valid JSON."""
        url = f"{mcp_server['mcp_url']}/.well-known/oauth-protected-resource"
        status, body = _get_json(url)
        assert status == 200
        assert "authorization_servers" in body
        assert "scopes_supported" in body
        assert len(body["scopes_supported"]) > 0

    def test_mcp_list_tools(self, mcp_server):
        """Tool listing endpoint should return registered tools."""
        status, tools = _get_json(f"{mcp_server['mcp_url']}/mcp/tools")
        assert status == 200
        assert isinstance(tools, list)
        tool_names = [t["name"] for t in tools]
        assert "read_file" in tool_names
        assert "write_file" in tool_names
        assert "deploy" in tool_names

    def test_mcp_no_token(self, mcp_server):
        """Tool call without Authorization header should return 401."""
        req = _tool_call_request(
            mcp_server["mcp_url"], "read_file", {"path": "/tmp/test.txt"}
        )
        assert _http_status(req) == 401, "Expected 401 for missing token"

    def test_mcp_invalid_token(self, mcp_server):
        """Tool call with garbage Bearer token should return 401."""
        req = _tool_call_request(
            mcp_server["mcp_url"],
            "read_file",
            {"path": "/tmp/test.txt"},
            token="not-a-real-jwt",
        )
        assert _http_status(req) == 401, "Expected 401 for invalid token"

    def test_mcp_authorized_read(self, mcp_server):
        """Full flow: exchange attestation for token, call tool.

        GAP: requires attestation chain creation; skipped until CLI
        integration supports `auths agent provision`.
        """
        pytest.skip(
            "GAP: requires full attestation chain creation via CLI; "
            "will be enabled when agent provisioning is implemented"
        )

    def test_mcp_unauthorized_tool(self, mcp_server):
        """Agent with fs:read should get 403 when calling deploy.

        GAP: requires valid JWT with specific capabilities.
        """
        pytest.skip(
            "GAP: requires valid JWT with scoped capabilities; "
            "will be enabled when token exchange E2E is complete"
        )

    def test_mcp_expired_token(self, mcp_server):
        """Expired JWT should return 401.

        GAP: requires ability to create a JWT with past expiry.
        """
        pytest.skip(
            "GAP: requires token with past expiry for testing"
        )