/ tests / e2e / test_mcp_server.py
test_mcp_server.py
  1  """E2E tests for the MCP server authorization."""
  2  
  3  import json
  4  import os
  5  import shutil
  6  import socket
  7  import subprocess
  8  import time
  9  import urllib.error
 10  import urllib.request
 11  
 12  import pytest
 13  
 14  
 15  def _find_free_port() -> int:
 16      with socket.socket() as s:
 17          s.bind(("127.0.0.1", 0))
 18          return s.getsockname()[1]
 19  
 20  
 21  def _wait_for_port(port: int, timeout: float = 10.0) -> bool:
 22      deadline = time.monotonic() + timeout
 23      while time.monotonic() < deadline:
 24          try:
 25              with socket.create_connection(("127.0.0.1", port), timeout=1):
 26                  return True
 27          except OSError:
 28              time.sleep(0.2)
 29      return False
 30  
 31  
 32  def _find_binary(env_var: str, name: str):
 33      """Resolve a binary from env var, PATH, or target/debug."""
 34      from pathlib import Path
 35  
 36      if path := os.environ.get(env_var):
 37          p = Path(path)
 38          if p.exists():
 39              return p
 40  
 41      if found := shutil.which(name):
 42          return Path(found)
 43  
 44      workspace_root = Path(__file__).resolve().parent.parent.parent
 45      debug_path = workspace_root / "target" / "debug" / name
 46      if debug_path.exists():
 47          return debug_path
 48  
 49      return None
 50  
 51  
 52  @pytest.fixture(scope="module")
 53  def auths_mcp_server_bin():
 54      """Path to the `auths-mcp-server` binary."""
 55      path = _find_binary("AUTHS_MCP_SERVER_BIN", "auths-mcp-server")
 56      if path is None:
 57          pytest.skip("auths-mcp-server binary not found")
 58      return path
 59  
 60  
 61  @pytest.fixture(scope="module")
 62  def mcp_server(tmp_path_factory, auths_mcp_server_bin):
 63      """Spawn the MCP server for the test module.
 64  
 65      Requires an OIDC bridge to be running for JWKS validation.
 66      For now, the MCP server starts without a live bridge; tests that
 67      need token validation will need the bridge fixture too.
 68      """
 69      if not shutil.which("openssl"):
 70          pytest.skip("openssl CLI not found")
 71  
 72      # Find the OIDC bridge binary
 73      oidc_bin = _find_binary("AUTHS_OIDC_BRIDGE_BIN", "auths-oidc-bridge")
 74      if oidc_bin is None:
 75          pytest.skip("auths-oidc-bridge binary not found")
 76  
 77      work_dir = tmp_path_factory.mktemp("mcp")
 78      key_path = work_dir / "signing_key.pem"
 79  
 80      subprocess.run(
 81          ["openssl", "genrsa", "-out", str(key_path), "2048"],
 82          check=True,
 83          capture_output=True,
 84      )
 85  
 86      # Start the OIDC bridge
 87      bridge_port = _find_free_port()
 88      bridge_env = {
 89          "PATH": os.environ.get("PATH", "/usr/bin:/bin"),
 90          "AUTHS_OIDC_BIND_ADDR": f"127.0.0.1:{bridge_port}",
 91          "AUTHS_OIDC_SIGNING_KEY_PATH": str(key_path),
 92          "AUTHS_OIDC_ISSUER_URL": f"http://127.0.0.1:{bridge_port}",
 93          "AUTHS_OIDC_AUDIENCE": "auths-mcp-server",
 94          "RUST_LOG": "warn",
 95      }
 96  
 97      bridge_proc = subprocess.Popen(
 98          [str(oidc_bin)],
 99          env=bridge_env,
100          stdout=subprocess.PIPE,
101          stderr=subprocess.PIPE,
102      )
103  
104      if not _wait_for_port(bridge_port, timeout=15):
105          bridge_proc.terminate()
106          bridge_proc.wait(timeout=5)
107          pytest.skip("OIDC bridge failed to start")
108  
109      # Start the MCP server
110      mcp_port = _find_free_port()
111      mcp_env = {
112          "PATH": os.environ.get("PATH", "/usr/bin:/bin"),
113          "AUTHS_MCP_BIND_ADDR": f"127.0.0.1:{mcp_port}",
114          "AUTHS_MCP_JWKS_URL": f"http://127.0.0.1:{bridge_port}/.well-known/jwks.json",
115          "AUTHS_MCP_EXPECTED_ISSUER": f"http://127.0.0.1:{bridge_port}",
116          "AUTHS_MCP_EXPECTED_AUDIENCE": "auths-mcp-server",
117          "RUST_LOG": "warn",
118      }
119  
120      mcp_proc = subprocess.Popen(
121          [str(auths_mcp_server_bin)],
122          env=mcp_env,
123          stdout=subprocess.PIPE,
124          stderr=subprocess.PIPE,
125      )
126  
127      if not _wait_for_port(mcp_port, timeout=15):
128          mcp_proc.terminate()
129          mcp_proc.wait(timeout=5)
130          bridge_proc.terminate()
131          bridge_proc.wait(timeout=5)
132          pytest.skip("MCP server failed to start")
133  
134      yield {
135          "mcp_proc": mcp_proc,
136          "mcp_port": mcp_port,
137          "mcp_url": f"http://127.0.0.1:{mcp_port}",
138          "bridge_proc": bridge_proc,
139          "bridge_port": bridge_port,
140          "bridge_url": f"http://127.0.0.1:{bridge_port}",
141          "key_path": key_path,
142      }
143  
144      mcp_proc.terminate()
145      bridge_proc.terminate()
146      try:
147          mcp_proc.wait(timeout=5)
148          bridge_proc.wait(timeout=5)
149      except subprocess.TimeoutExpired:
150          mcp_proc.kill()
151          bridge_proc.kill()
152  
153  
@pytest.mark.slow
@pytest.mark.requires_binary
class TestMcpServer:
    """HTTP-level checks against the server provided by ``mcp_server``."""

    @staticmethod
    def _get_json(endpoint):
        """GET *endpoint*, require a 200 status, and return the parsed JSON."""
        with urllib.request.urlopen(endpoint, timeout=5) as reply:
            assert reply.status == 200
            return json.loads(reply.read())

    @staticmethod
    def _read_file_request(base_url, extra_headers=None):
        """Build the JSON POST request used to invoke the read_file tool."""
        headers = {"Content-Type": "application/json"}
        if extra_headers:
            headers.update(extra_headers)
        payload = json.dumps({"path": "/tmp/test.txt"}).encode()
        return urllib.request.Request(
            f"{base_url}/mcp/tools/read_file",
            data=payload,
            headers=headers,
            method="POST",
        )

    @staticmethod
    def _expect_401(request, failure_message):
        """Send *request*; fail with *failure_message* unless it is rejected with 401."""
        try:
            urllib.request.urlopen(request, timeout=5)
        except urllib.error.HTTPError as err:
            assert err.code == 401
        else:
            pytest.fail(failure_message)

    def test_mcp_health(self, mcp_server):
        """The health endpoint answers 200 with status "ok" and no credentials."""
        payload = self._get_json(f"{mcp_server['mcp_url']}/health")
        assert payload["status"] == "ok"

    def test_mcp_protected_resource_metadata(self, mcp_server):
        """The Protected Resource Metadata document is JSON with the expected keys."""
        metadata = self._get_json(
            f"{mcp_server['mcp_url']}/.well-known/oauth-protected-resource"
        )
        assert "authorization_servers" in metadata
        assert "scopes_supported" in metadata
        assert len(metadata["scopes_supported"]) > 0

    def test_mcp_list_tools(self, mcp_server):
        """The tool listing is a list that includes the three expected tools."""
        listing = self._get_json(f"{mcp_server['mcp_url']}/mcp/tools")
        assert isinstance(listing, list)
        names = [entry["name"] for entry in listing]
        for expected in ("read_file", "write_file", "deploy"):
            assert expected in names

    def test_mcp_no_token(self, mcp_server):
        """A tool call carrying no Authorization header is rejected with 401."""
        request = self._read_file_request(mcp_server["mcp_url"])
        self._expect_401(request, "Expected 401 for missing token")

    def test_mcp_invalid_token(self, mcp_server):
        """A tool call carrying a garbage Bearer token is rejected with 401."""
        request = self._read_file_request(
            mcp_server["mcp_url"],
            {"Authorization": "Bearer not-a-real-jwt"},
        )
        self._expect_401(request, "Expected 401 for invalid token")

    def test_mcp_authorized_read(self, mcp_server):
        """Full flow: exchange an attestation for a token, then call a tool.

        GAP: needs attestation chain creation; skipped until the CLI
        integration supports `auths agent provision`.
        """
        reason = (
            "GAP: requires full attestation chain creation via CLI; "
            "will be enabled when agent provisioning is implemented"
        )
        pytest.skip(reason)

    def test_mcp_unauthorized_tool(self, mcp_server):
        """An agent holding only fs:read should get 403 when calling deploy.

        GAP: needs a valid JWT with specific capabilities.
        """
        reason = (
            "GAP: requires valid JWT with scoped capabilities; "
            "will be enabled when token exchange E2E is complete"
        )
        pytest.skip(reason)

    def test_mcp_expired_token(self, mcp_server):
        """An expired JWT should be rejected with 401.

        GAP: needs a way to create a JWT whose expiry is in the past.
        """
        reason = "GAP: requires token with past expiry for testing"
        pytest.skip(reason)
249              "GAP: requires token with past expiry for testing"
250          )