test_pool_api.py
# Copyright 2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Integration-style tests for Pool API routes (opensandbox_server/api/pool.py).

Routes are exercised via FastAPI TestClient. The K8s PoolService is patched
so no real cluster connection is needed.
"""

from unittest.mock import MagicMock, patch

from fastapi import HTTPException, status as http_status
from fastapi.testclient import TestClient

from opensandbox_server.api.schema import (
    CreatePoolRequest,
    ListPoolsResponse,
    PoolCapacitySpec,
    PoolResponse,
    PoolStatus,
    UpdatePoolRequest,
)
from opensandbox_server.services.constants import SandboxErrorCodes


# Dotted path of the service factory that every test replaces with a mock.
_POOL_SERVICE_PATCH = "opensandbox_server.api.pool._get_pool_service"


def _cap(buffer_max=3, buffer_min=1, pool_max=10, pool_min=0) -> PoolCapacitySpec:
    """Build a PoolCapacitySpec from snake_case arguments using the camelCase field names."""
    return PoolCapacitySpec(
        bufferMax=buffer_max,
        bufferMin=buffer_min,
        poolMax=pool_max,
        poolMin=pool_min,
    )


def _pool_response(
    name: str = "test-pool",
    buffer_max: int = 3,
    pool_max: int = 10,
    total: int = 2,
    allocated: int = 1,
    available: int = 1,
) -> PoolResponse:
    """Build a PoolResponse with the given capacity/status values and revision "rev-1"."""
    return PoolResponse(
        name=name,
        capacitySpec=_cap(buffer_max=buffer_max, pool_max=pool_max),
        status=PoolStatus(
            total=total,
            allocated=allocated,
            available=available,
            revision="rev-1",
        ),
    )


def _create_request_body(name: str = "test-pool") -> dict:
    """Return a fresh JSON body for POST /pools; callers may mutate it freely."""
    return {
        "name": name,
        "template": {
            "spec": {
                "containers": [
                    {
                        "name": "sandbox",
                        "image": "python:3.11",
                        "command": ["tail", "-f", "/dev/null"],
                    }
                ]
            }
        },
        "capacitySpec": {
            "bufferMax": 3,
            "bufferMin": 1,
            "poolMax": 10,
            "poolMin": 0,
        },
    }


class TestPoolAuthentication:
    """Requests without auth headers are rejected with 401 on every pool route."""

    def test_list_pools_without_api_key_returns_401(self, client: TestClient):
        response = client.get("/pools")
        assert response.status_code == 401

    def test_create_pool_without_api_key_returns_401(self, client: TestClient):
        response = client.post("/pools", json=_create_request_body())
        assert response.status_code == 401

    def test_get_pool_without_api_key_returns_401(self, client: TestClient):
        response = client.get("/pools/my-pool")
        assert response.status_code == 401

    def test_update_pool_without_api_key_returns_401(self, client: TestClient):
        response = client.put(
            "/pools/my-pool",
            json={"capacitySpec": {"bufferMax": 5, "bufferMin": 1, "poolMax": 10, "poolMin": 0}},
        )
        assert response.status_code == 401

    def test_delete_pool_without_api_key_returns_401(self, client: TestClient):
        response = client.delete("/pools/my-pool")
        assert response.status_code == 401

    def test_pool_routes_exist_on_v1_prefix(self, client: TestClient, auth_headers: dict):
        """Verify the /v1/pools routes are registered (even if they return 501 on docker runtime)."""
        with patch(_POOL_SERVICE_PATCH) as mock_svc_factory:
            mock_svc_factory.side_effect = HTTPException(
                status_code=http_status.HTTP_501_NOT_IMPLEMENTED,
                detail={"code": "X", "message": "y"},
            )
            response = client.get("/v1/pools", headers=auth_headers)
        assert response.status_code == 501


class TestPoolNotSupportedOnDockerRuntime:
    """Pool endpoints return 501 when PoolService raises 501 (non-k8s runtime)."""

    def _mock_not_supported(self):
        """Return a stand-in service factory that raises the 501 error when called."""
        svc = MagicMock()
        svc.side_effect = HTTPException(
            status_code=http_status.HTTP_501_NOT_IMPLEMENTED,
            detail={
                "code": SandboxErrorCodes.K8S_POOL_NOT_SUPPORTED,
                "message": "Pool management is only available when runtime.type is 'kubernetes'.",
            },
        )
        return svc

    def test_list_pools_returns_501(self, client: TestClient, auth_headers: dict):
        # `new=` installs the raising mock in place of the real factory.
        with patch(_POOL_SERVICE_PATCH, new=self._mock_not_supported()):
            response = client.get("/pools", headers=auth_headers)
        assert response.status_code == 501
        assert SandboxErrorCodes.K8S_POOL_NOT_SUPPORTED in response.json()["code"]

    def test_create_pool_returns_501(self, client: TestClient, auth_headers: dict):
        with patch(_POOL_SERVICE_PATCH, new=self._mock_not_supported()):
            response = client.post("/pools", json=_create_request_body(), headers=auth_headers)
        assert response.status_code == 501
        assert SandboxErrorCodes.K8S_POOL_NOT_SUPPORTED in response.json()["code"]


class TestCreatePoolRoute:
    """POST /pools: success, request validation, and service error propagation."""

    def test_create_pool_success_returns_201(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.create_pool.return_value = _pool_response(name="new-pool")

        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.post("/pools", json=_create_request_body("new-pool"), headers=auth_headers)

        assert response.status_code == 201
        body = response.json()
        assert body["name"] == "new-pool"
        assert body["capacitySpec"]["bufferMax"] == 3
        assert body["status"]["total"] == 2

    def test_create_pool_missing_name_returns_422(self, client: TestClient, auth_headers: dict):
        body = _create_request_body()
        del body["name"]
        with patch(_POOL_SERVICE_PATCH, return_value=MagicMock()):
            response = client.post("/pools", json=body, headers=auth_headers)
        assert response.status_code == 422

    def test_create_pool_missing_template_returns_422(self, client: TestClient, auth_headers: dict):
        body = _create_request_body()
        del body["template"]
        with patch(_POOL_SERVICE_PATCH, return_value=MagicMock()):
            response = client.post("/pools", json=body, headers=auth_headers)
        assert response.status_code == 422

    def test_create_pool_missing_capacity_spec_returns_422(self, client: TestClient, auth_headers: dict):
        body = _create_request_body()
        del body["capacitySpec"]
        with patch(_POOL_SERVICE_PATCH, return_value=MagicMock()):
            response = client.post("/pools", json=body, headers=auth_headers)
        assert response.status_code == 422

    def test_create_pool_invalid_name_pattern_returns_422(self, client: TestClient, auth_headers: dict):
        """Pool name must be a valid k8s name (no uppercase, no spaces)."""
        body = _create_request_body("Invalid_Name")
        with patch(_POOL_SERVICE_PATCH, return_value=MagicMock()):
            response = client.post("/pools", json=body, headers=auth_headers)
        assert response.status_code == 422

    def test_create_pool_negative_buffer_max_returns_422(self, client: TestClient, auth_headers: dict):
        body = _create_request_body()
        body["capacitySpec"]["bufferMax"] = -1
        with patch(_POOL_SERVICE_PATCH, return_value=MagicMock()):
            response = client.post("/pools", json=body, headers=auth_headers)
        assert response.status_code == 422

    def test_create_pool_duplicate_returns_409(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.create_pool.side_effect = HTTPException(
            status_code=409,
            detail={
                "code": SandboxErrorCodes.K8S_POOL_ALREADY_EXISTS,
                "message": "Pool 'dup-pool' already exists.",
            },
        )
        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.post("/pools", json=_create_request_body("dup-pool"), headers=auth_headers)

        assert response.status_code == 409
        assert SandboxErrorCodes.K8S_POOL_ALREADY_EXISTS in response.json()["code"]

    def test_create_pool_service_error_returns_500(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.create_pool.side_effect = HTTPException(
            status_code=500,
            detail={
                "code": SandboxErrorCodes.K8S_POOL_API_ERROR,
                "message": "k8s api error",
            },
        )
        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.post("/pools", json=_create_request_body(), headers=auth_headers)

        assert response.status_code == 500

    def test_create_pool_passes_request_to_service(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.create_pool.return_value = _pool_response()

        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            client.post("/pools", json=_create_request_body("my-pool"), headers=auth_headers)

        # Fail with a clear message if the route never reached the service;
        # otherwise call_args would be None and `.args` raises AttributeError.
        mock_svc.create_pool.assert_called_once()
        call_args = mock_svc.create_pool.call_args
        req: CreatePoolRequest = call_args.args[0]
        assert req.name == "my-pool"
        assert req.capacity_spec.buffer_max == 3
        assert req.capacity_spec.pool_max == 10


class TestListPoolsRoute:
    """GET /pools: item listing, empty listing, and service error propagation."""

    def test_list_pools_returns_200_and_items(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.list_pools.return_value = ListPoolsResponse(
            items=[_pool_response("pool-a"), _pool_response("pool-b")]
        )

        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.get("/pools", headers=auth_headers)

        assert response.status_code == 200
        body = response.json()
        assert len(body["items"]) == 2
        names = {p["name"] for p in body["items"]}
        assert names == {"pool-a", "pool-b"}

    def test_list_pools_empty_returns_200_and_empty_list(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.list_pools.return_value = ListPoolsResponse(items=[])

        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.get("/pools", headers=auth_headers)

        assert response.status_code == 200
        assert response.json()["items"] == []

    def test_list_pools_service_error_returns_500(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.list_pools.side_effect = HTTPException(
            status_code=500,
            detail={"code": SandboxErrorCodes.K8S_POOL_API_ERROR, "message": "err"},
        )
        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.get("/pools", headers=auth_headers)

        assert response.status_code == 500

    def test_list_pools_response_has_status_fields(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.list_pools.return_value = ListPoolsResponse(
            items=[_pool_response("p", total=5, allocated=3, available=2)]
        )
        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.get("/pools", headers=auth_headers)

        pool = response.json()["items"][0]
        assert pool["status"]["total"] == 5
        assert pool["status"]["allocated"] == 3
        assert pool["status"]["available"] == 2


class TestGetPoolRoute:
    """GET /pools/{name}: success, argument forwarding, and not-found handling."""

    def test_get_pool_success_returns_200(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.get_pool.return_value = _pool_response(name="my-pool")

        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.get("/pools/my-pool", headers=auth_headers)

        assert response.status_code == 200
        assert response.json()["name"] == "my-pool"

    def test_get_pool_calls_service_with_correct_name(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.get_pool.return_value = _pool_response()

        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            client.get("/pools/target-pool", headers=auth_headers)

        mock_svc.get_pool.assert_called_once_with("target-pool")

    def test_get_pool_not_found_returns_404(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.get_pool.side_effect = HTTPException(
            status_code=404,
            detail={
                "code": SandboxErrorCodes.K8S_POOL_NOT_FOUND,
                "message": "Pool 'ghost' not found.",
            },
        )
        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.get("/pools/ghost", headers=auth_headers)

        assert response.status_code == 404
        assert SandboxErrorCodes.K8S_POOL_NOT_FOUND in response.json()["code"]

    def test_get_pool_response_includes_capacity_spec(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.get_pool.return_value = _pool_response(buffer_max=7, pool_max=50)

        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.get("/pools/p", headers=auth_headers)

        cap = response.json()["capacitySpec"]
        assert cap["bufferMax"] == 7
        assert cap["poolMax"] == 50


class TestUpdatePoolRoute:
    """PUT /pools/{name}: success, argument forwarding, not-found, and validation."""

    def _update_body(self, buffer_max=5, pool_max=20) -> dict:
        """Return a PUT body whose capacitySpec uses the given bufferMax/poolMax."""
        return {
            "capacitySpec": {
                "bufferMax": buffer_max,
                "bufferMin": 1,
                "poolMax": pool_max,
                "poolMin": 0,
            }
        }

    def test_update_pool_success_returns_200(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.update_pool.return_value = _pool_response(buffer_max=5, pool_max=20)

        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.put("/pools/my-pool", json=self._update_body(), headers=auth_headers)

        assert response.status_code == 200
        assert response.json()["capacitySpec"]["bufferMax"] == 5

    def test_update_pool_calls_service_with_name_and_request(
        self, client: TestClient, auth_headers: dict
    ):
        mock_svc = MagicMock()
        mock_svc.update_pool.return_value = _pool_response()

        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            client.put("/pools/target", json=self._update_body(buffer_max=9), headers=auth_headers)

        # Guard the call_args inspection below against an opaque AttributeError
        # when the route rejected the request before reaching the service.
        mock_svc.update_pool.assert_called_once()
        call_args = mock_svc.update_pool.call_args
        assert call_args.args[0] == "target"
        req: UpdatePoolRequest = call_args.args[1]
        assert req.capacity_spec.buffer_max == 9

    def test_update_pool_not_found_returns_404(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.update_pool.side_effect = HTTPException(
            status_code=404,
            detail={
                "code": SandboxErrorCodes.K8S_POOL_NOT_FOUND,
                "message": "Pool 'x' not found.",
            },
        )
        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.put("/pools/x", json=self._update_body(), headers=auth_headers)

        assert response.status_code == 404

    def test_update_pool_missing_capacity_spec_returns_422(
        self, client: TestClient, auth_headers: dict
    ):
        with patch(_POOL_SERVICE_PATCH, return_value=MagicMock()):
            response = client.put("/pools/p", json={}, headers=auth_headers)
        assert response.status_code == 422

    def test_update_pool_negative_pool_max_returns_422(
        self, client: TestClient, auth_headers: dict
    ):
        with patch(_POOL_SERVICE_PATCH, return_value=MagicMock()):
            response = client.put(
                "/pools/p",
                json={"capacitySpec": {"bufferMax": 1, "bufferMin": 0, "poolMax": -5, "poolMin": 0}},
                headers=auth_headers,
            )
        assert response.status_code == 422


class TestDeletePoolRoute:
    """DELETE /pools/{name}: success, argument forwarding, and error propagation."""

    def test_delete_pool_success_returns_204(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.delete_pool.return_value = None

        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.delete("/pools/my-pool", headers=auth_headers)

        assert response.status_code == 204
        assert response.content == b""

    def test_delete_pool_calls_service_with_correct_name(
        self, client: TestClient, auth_headers: dict
    ):
        mock_svc = MagicMock()
        mock_svc.delete_pool.return_value = None

        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            client.delete("/pools/to-remove", headers=auth_headers)

        mock_svc.delete_pool.assert_called_once_with("to-remove")

    def test_delete_pool_not_found_returns_404(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.delete_pool.side_effect = HTTPException(
            status_code=404,
            detail={
                "code": SandboxErrorCodes.K8S_POOL_NOT_FOUND,
                "message": "Pool 'gone' not found.",
            },
        )
        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.delete("/pools/gone", headers=auth_headers)

        assert response.status_code == 404
        assert SandboxErrorCodes.K8S_POOL_NOT_FOUND in response.json()["code"]

    def test_delete_pool_service_error_returns_500(self, client: TestClient, auth_headers: dict):
        mock_svc = MagicMock()
        mock_svc.delete_pool.side_effect = HTTPException(
            status_code=500,
            detail={"code": SandboxErrorCodes.K8S_POOL_API_ERROR, "message": "err"},
        )
        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.delete("/pools/p", headers=auth_headers)

        assert response.status_code == 500