/ server / tests / test_pool_api.py
test_pool_api.py
  1  # Copyright 2025 Alibaba Group Holding Ltd.
  2  #
  3  # Licensed under the Apache License, Version 2.0 (the "License");
  4  # you may not use this file except in compliance with the License.
  5  # You may obtain a copy of the License at
  6  #
  7  #     http://www.apache.org/licenses/LICENSE-2.0
  8  #
  9  # Unless required by applicable law or agreed to in writing, software
 10  # distributed under the License is distributed on an "AS IS" BASIS,
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  # See the License for the specific language governing permissions and
 13  # limitations under the License.
 14  
 15  """
 16  Integration-style tests for Pool API routes (opensandbox_server/api/pool.py).
 17  
 18  Routes are exercised via FastAPI TestClient.  The K8s PoolService is patched
 19  so no real cluster connection is needed.
 20  """
 21  
 22  from unittest.mock import MagicMock, patch
 23  from fastapi.testclient import TestClient
 24  from fastapi import HTTPException, status as http_status
 25  
 26  from opensandbox_server.api.schema import (
 27      CreatePoolRequest,
 28      ListPoolsResponse,
 29      PoolCapacitySpec,
 30      PoolResponse,
 31      PoolStatus,
 32      UpdatePoolRequest,
 33  )
 34  from opensandbox_server.services.constants import SandboxErrorCodes
 35  
 36  
# Dotted path handed to ``unittest.mock.patch``: the pool-service factory as it
# is looked up inside the pool API module.  Tests replace it with a mock so the
# routes never need a real cluster connection.
_POOL_SERVICE_PATCH = "opensandbox_server.api.pool._get_pool_service"
 38  
 39  
 40  def _cap(buffer_max=3, buffer_min=1, pool_max=10, pool_min=0) -> PoolCapacitySpec:
 41      return PoolCapacitySpec(
 42          bufferMax=buffer_max,
 43          bufferMin=buffer_min,
 44          poolMax=pool_max,
 45          poolMin=pool_min,
 46      )
 47  
 48  
 49  def _pool_response(
 50      name: str = "test-pool",
 51      buffer_max: int = 3,
 52      pool_max: int = 10,
 53      total: int = 2,
 54      allocated: int = 1,
 55      available: int = 1,
 56  ) -> PoolResponse:
 57      return PoolResponse(
 58          name=name,
 59          capacitySpec=_cap(buffer_max=buffer_max, pool_max=pool_max),
 60          status=PoolStatus(
 61              total=total,
 62              allocated=allocated,
 63              available=available,
 64              revision="rev-1",
 65          ),
 66      )
 67  
 68  
 69  def _create_request_body(name: str = "test-pool") -> dict:
 70      return {
 71          "name": name,
 72          "template": {
 73              "spec": {
 74                  "containers": [
 75                      {
 76                          "name": "sandbox",
 77                          "image": "python:3.11",
 78                          "command": ["tail", "-f", "/dev/null"],
 79                      }
 80                  ]
 81              }
 82          },
 83          "capacitySpec": {
 84              "bufferMax": 3,
 85              "bufferMin": 1,
 86              "poolMax": 10,
 87              "poolMin": 0,
 88          },
 89      }
 90  
 91  
 92  class TestPoolAuthentication:
 93      def test_list_pools_without_api_key_returns_401(self, client: TestClient):
 94          response = client.get("/pools")
 95          assert response.status_code == 401
 96  
 97      def test_create_pool_without_api_key_returns_401(self, client: TestClient):
 98          response = client.post("/pools", json=_create_request_body())
 99          assert response.status_code == 401
100  
101      def test_get_pool_without_api_key_returns_401(self, client: TestClient):
102          response = client.get("/pools/my-pool")
103          assert response.status_code == 401
104  
105      def test_update_pool_without_api_key_returns_401(self, client: TestClient):
106          response = client.put(
107              "/pools/my-pool",
108              json={"capacitySpec": {"bufferMax": 5, "bufferMin": 1, "poolMax": 10, "poolMin": 0}},
109          )
110          assert response.status_code == 401
111  
112      def test_delete_pool_without_api_key_returns_401(self, client: TestClient):
113          response = client.delete("/pools/my-pool")
114          assert response.status_code == 401
115  
116      def test_pool_routes_exist_on_v1_prefix(self, client: TestClient, auth_headers: dict):
117          """Verify the /v1/pools routes are registered (even if they return 501 on docker runtime)."""
118          with patch(_POOL_SERVICE_PATCH) as mock_svc_factory:
119              mock_svc_factory.side_effect = HTTPException(
120                  status_code=http_status.HTTP_501_NOT_IMPLEMENTED,
121                  detail={"code": "X", "message": "y"},
122              )
123              response = client.get("/v1/pools", headers=auth_headers)
124          assert response.status_code == 501
125  
126  
class TestPoolNotSupportedOnDockerRuntime:
    """Pool endpoints return 501 when PoolService raises 501 (non-k8s runtime)."""

    def _mock_not_supported(self):
        """Return a stand-in for the service factory that raises 501 when called.

        The returned mock replaces the factory itself (not the service), so the
        ``HTTPException`` is raised as soon as a route asks for the service.
        """
        svc = MagicMock()
        svc.side_effect = HTTPException(
            status_code=http_status.HTTP_501_NOT_IMPLEMENTED,
            detail={
                "code": SandboxErrorCodes.K8S_POOL_NOT_SUPPORTED,
                "message": "Pool management is only available when runtime.type is 'kubernetes'.",
            },
        )
        return svc

    def test_list_pools_returns_501(self, client: TestClient, auth_headers: dict):
        """GET /pools surfaces the 501 and its error code."""
        with patch(_POOL_SERVICE_PATCH, new=self._mock_not_supported()):
            response = client.get("/pools", headers=auth_headers)
        assert response.status_code == 501
        assert SandboxErrorCodes.K8S_POOL_NOT_SUPPORTED in response.json()["code"]

    def test_create_pool_returns_501(self, client: TestClient, auth_headers: dict):
        """POST /pools with a valid body surfaces the 501 and its error code."""
        with patch(_POOL_SERVICE_PATCH, new=self._mock_not_supported()):
            response = client.post("/pools", json=_create_request_body(), headers=auth_headers)
        assert response.status_code == 501
        # Same factory failure as the list test, so the same error code is expected.
        assert SandboxErrorCodes.K8S_POOL_NOT_SUPPORTED in response.json()["code"]
157  
158  
class TestCreatePoolRoute:
    """POST /pools: success path, request validation and service-error propagation."""

    def _assert_rejected_with_422(self, client: TestClient, auth_headers: dict, body: dict) -> None:
        """POST ``body`` and check it is rejected with 422 without creating a pool."""
        mock_svc = MagicMock()
        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.post("/pools", json=body, headers=auth_headers)
        assert response.status_code == 422
        # An invalid request must never be forwarded to the service.
        mock_svc.create_pool.assert_not_called()

    def test_create_pool_success_returns_201(self, client: TestClient, auth_headers: dict):
        """A valid request returns 201 with the pool serialized in camelCase."""
        mock_svc = MagicMock()
        mock_svc.create_pool.return_value = _pool_response(name="new-pool")

        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.post("/pools", json=_create_request_body("new-pool"), headers=auth_headers)

        assert response.status_code == 201
        body = response.json()
        assert body["name"] == "new-pool"
        assert body["capacitySpec"]["bufferMax"] == 3
        assert body["status"]["total"] == 2

    def test_create_pool_missing_name_returns_422(self, client: TestClient, auth_headers: dict):
        """The ``name`` field is required."""
        body = _create_request_body()
        del body["name"]
        self._assert_rejected_with_422(client, auth_headers, body)

    def test_create_pool_missing_template_returns_422(self, client: TestClient, auth_headers: dict):
        """The ``template`` field is required."""
        body = _create_request_body()
        del body["template"]
        self._assert_rejected_with_422(client, auth_headers, body)

    def test_create_pool_missing_capacity_spec_returns_422(self, client: TestClient, auth_headers: dict):
        """The ``capacitySpec`` field is required."""
        body = _create_request_body()
        del body["capacitySpec"]
        self._assert_rejected_with_422(client, auth_headers, body)

    def test_create_pool_invalid_name_pattern_returns_422(self, client: TestClient, auth_headers: dict):
        """Pool name must be a valid k8s name (no uppercase, no spaces)."""
        body = _create_request_body("Invalid_Name")
        self._assert_rejected_with_422(client, auth_headers, body)

    def test_create_pool_negative_buffer_max_returns_422(self, client: TestClient, auth_headers: dict):
        """A negative ``bufferMax`` is rejected."""
        body = _create_request_body()
        body["capacitySpec"]["bufferMax"] = -1
        self._assert_rejected_with_422(client, auth_headers, body)

    def test_create_pool_duplicate_returns_409(self, client: TestClient, auth_headers: dict):
        """A 409 raised by the service is returned with its error code."""
        mock_svc = MagicMock()
        mock_svc.create_pool.side_effect = HTTPException(
            status_code=409,
            detail={
                "code": SandboxErrorCodes.K8S_POOL_ALREADY_EXISTS,
                "message": "Pool 'dup-pool' already exists.",
            },
        )
        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.post("/pools", json=_create_request_body("dup-pool"), headers=auth_headers)

        assert response.status_code == 409
        assert SandboxErrorCodes.K8S_POOL_ALREADY_EXISTS in response.json()["code"]

    def test_create_pool_service_error_returns_500(self, client: TestClient, auth_headers: dict):
        """A 500 raised by the service is returned as a 500."""
        mock_svc = MagicMock()
        mock_svc.create_pool.side_effect = HTTPException(
            status_code=500,
            detail={
                "code": SandboxErrorCodes.K8S_POOL_API_ERROR,
                "message": "k8s api error",
            },
        )
        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.post("/pools", json=_create_request_body(), headers=auth_headers)

        assert response.status_code == 500

    def test_create_pool_passes_request_to_service(self, client: TestClient, auth_headers: dict):
        """The parsed request model is handed to ``create_pool`` as its first positional argument."""
        mock_svc = MagicMock()
        mock_svc.create_pool.return_value = _pool_response()

        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            client.post("/pools", json=_create_request_body("my-pool"), headers=auth_headers)

        # Fail with a clear message if the route never reached the service;
        # otherwise ``call_args`` would be None and the lookup below would
        # raise an unrelated AttributeError.
        mock_svc.create_pool.assert_called_once()
        req: CreatePoolRequest = mock_svc.create_pool.call_args.args[0]
        assert req.name == "my-pool"
        assert req.capacity_spec.buffer_max == 3
        assert req.capacity_spec.pool_max == 10
249  
250  
class TestListPoolsRoute:
    """GET /pools against a mocked pool service."""

    def test_list_pools_returns_200_and_items(self, client: TestClient, auth_headers: dict):
        """Both pools returned by the service appear in the response."""
        service = MagicMock()
        pools = [_pool_response("pool-a"), _pool_response("pool-b")]
        service.list_pools.return_value = ListPoolsResponse(items=pools)

        with patch(_POOL_SERVICE_PATCH, return_value=service):
            resp = client.get("/pools", headers=auth_headers)

        assert resp.status_code == 200
        items = resp.json()["items"]
        assert len(items) == 2
        assert {item["name"] for item in items} == {"pool-a", "pool-b"}

    def test_list_pools_empty_returns_200_and_empty_list(self, client: TestClient, auth_headers: dict):
        """An empty listing is still a 200 with an empty ``items`` array."""
        service = MagicMock()
        service.list_pools.return_value = ListPoolsResponse(items=[])

        with patch(_POOL_SERVICE_PATCH, return_value=service):
            resp = client.get("/pools", headers=auth_headers)

        assert resp.status_code == 200
        assert resp.json()["items"] == []

    def test_list_pools_service_error_returns_500(self, client: TestClient, auth_headers: dict):
        """A 500 raised by the service is returned as a 500."""
        api_error = HTTPException(
            status_code=500,
            detail={"code": SandboxErrorCodes.K8S_POOL_API_ERROR, "message": "err"},
        )
        service = MagicMock()
        service.list_pools.side_effect = api_error

        with patch(_POOL_SERVICE_PATCH, return_value=service):
            resp = client.get("/pools", headers=auth_headers)

        assert resp.status_code == 500

    def test_list_pools_response_has_status_fields(self, client: TestClient, auth_headers: dict):
        """Status counters supplied by the service are serialized unchanged."""
        service = MagicMock()
        only_pool = _pool_response("p", total=5, allocated=3, available=2)
        service.list_pools.return_value = ListPoolsResponse(items=[only_pool])

        with patch(_POOL_SERVICE_PATCH, return_value=service):
            resp = client.get("/pools", headers=auth_headers)

        reported = resp.json()["items"][0]["status"]
        assert reported["total"] == 5
        assert reported["allocated"] == 3
        assert reported["available"] == 2
300  
301  
class TestGetPoolRoute:
    """GET /pools/{name} against a mocked pool service."""

    def test_get_pool_success_returns_200(self, client: TestClient, auth_headers: dict):
        """The pool returned by the service is sent back with a 200."""
        service = MagicMock()
        service.get_pool.return_value = _pool_response(name="my-pool")

        with patch(_POOL_SERVICE_PATCH, return_value=service):
            resp = client.get("/pools/my-pool", headers=auth_headers)

        assert resp.status_code == 200
        assert resp.json()["name"] == "my-pool"

    def test_get_pool_calls_service_with_correct_name(self, client: TestClient, auth_headers: dict):
        """The path segment is forwarded to ``get_pool`` as its only argument."""
        service = MagicMock()
        service.get_pool.return_value = _pool_response()

        with patch(_POOL_SERVICE_PATCH, return_value=service):
            client.get("/pools/target-pool", headers=auth_headers)

        service.get_pool.assert_called_once_with("target-pool")

    def test_get_pool_not_found_returns_404(self, client: TestClient, auth_headers: dict):
        """A 404 raised by the service is returned with its error code."""
        not_found = HTTPException(
            status_code=404,
            detail={
                "code": SandboxErrorCodes.K8S_POOL_NOT_FOUND,
                "message": "Pool 'ghost' not found.",
            },
        )
        service = MagicMock()
        service.get_pool.side_effect = not_found

        with patch(_POOL_SERVICE_PATCH, return_value=service):
            resp = client.get("/pools/ghost", headers=auth_headers)

        assert resp.status_code == 404
        assert SandboxErrorCodes.K8S_POOL_NOT_FOUND in resp.json()["code"]

    def test_get_pool_response_includes_capacity_spec(self, client: TestClient, auth_headers: dict):
        """Capacity limits supplied by the service are serialized in camelCase."""
        service = MagicMock()
        service.get_pool.return_value = _pool_response(buffer_max=7, pool_max=50)

        with patch(_POOL_SERVICE_PATCH, return_value=service):
            resp = client.get("/pools/p", headers=auth_headers)

        capacity = resp.json()["capacitySpec"]
        assert capacity["bufferMax"] == 7
        assert capacity["poolMax"] == 50
347  
348  
class TestUpdatePoolRoute:
    """PUT /pools/{name}: success path, request validation and error propagation."""

    def _update_body(self, buffer_max=5, pool_max=20) -> dict:
        """Return a fresh update payload; only ``bufferMax`` and ``poolMax`` vary."""
        return {
            "capacitySpec": {
                "bufferMax": buffer_max,
                "bufferMin": 1,
                "poolMax": pool_max,
                "poolMin": 0,
            }
        }

    def _assert_rejected_with_422(self, client: TestClient, auth_headers: dict, body: dict) -> None:
        """PUT ``body`` and check it is rejected with 422 without updating a pool."""
        mock_svc = MagicMock()
        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.put("/pools/p", json=body, headers=auth_headers)
        assert response.status_code == 422
        # An invalid request must never be forwarded to the service.
        mock_svc.update_pool.assert_not_called()

    def test_update_pool_success_returns_200(self, client: TestClient, auth_headers: dict):
        """A valid update returns 200 with the pool returned by the service."""
        mock_svc = MagicMock()
        mock_svc.update_pool.return_value = _pool_response(buffer_max=5, pool_max=20)

        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.put("/pools/my-pool", json=self._update_body(), headers=auth_headers)

        assert response.status_code == 200
        assert response.json()["capacitySpec"]["bufferMax"] == 5

    def test_update_pool_calls_service_with_name_and_request(
        self, client: TestClient, auth_headers: dict
    ):
        """``update_pool`` receives the path name and the parsed request, positionally."""
        mock_svc = MagicMock()
        mock_svc.update_pool.return_value = _pool_response()

        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            client.put("/pools/target", json=self._update_body(buffer_max=9), headers=auth_headers)

        # Fail with a clear message if the route never reached the service;
        # otherwise ``call_args`` would be None and the lookups below would
        # raise an unrelated AttributeError.
        mock_svc.update_pool.assert_called_once()
        call_args = mock_svc.update_pool.call_args
        assert call_args.args[0] == "target"
        req: UpdatePoolRequest = call_args.args[1]
        assert req.capacity_spec.buffer_max == 9

    def test_update_pool_not_found_returns_404(self, client: TestClient, auth_headers: dict):
        """A 404 raised by the service is returned as a 404."""
        mock_svc = MagicMock()
        mock_svc.update_pool.side_effect = HTTPException(
            status_code=404,
            detail={
                "code": SandboxErrorCodes.K8S_POOL_NOT_FOUND,
                "message": "Pool 'x' not found.",
            },
        )
        with patch(_POOL_SERVICE_PATCH, return_value=mock_svc):
            response = client.put("/pools/x", json=self._update_body(), headers=auth_headers)

        assert response.status_code == 404

    def test_update_pool_missing_capacity_spec_returns_422(
        self, client: TestClient, auth_headers: dict
    ):
        """The ``capacitySpec`` field is required."""
        self._assert_rejected_with_422(client, auth_headers, {})

    def test_update_pool_negative_pool_max_returns_422(
        self, client: TestClient, auth_headers: dict
    ):
        """A negative ``poolMax`` is rejected."""
        body = {"capacitySpec": {"bufferMax": 1, "bufferMin": 0, "poolMax": -5, "poolMin": 0}}
        self._assert_rejected_with_422(client, auth_headers, body)
415  
416  
class TestDeletePoolRoute:
    """DELETE /pools/{name} against a mocked pool service."""

    def test_delete_pool_success_returns_204(self, client: TestClient, auth_headers: dict):
        """A successful delete answers 204 with an empty body."""
        service = MagicMock()
        service.delete_pool.return_value = None

        with patch(_POOL_SERVICE_PATCH, return_value=service):
            resp = client.delete("/pools/my-pool", headers=auth_headers)

        assert resp.status_code == 204
        assert resp.content == b""

    def test_delete_pool_calls_service_with_correct_name(
        self, client: TestClient, auth_headers: dict
    ):
        """The path segment is forwarded to ``delete_pool`` as its only argument."""
        service = MagicMock()
        service.delete_pool.return_value = None

        with patch(_POOL_SERVICE_PATCH, return_value=service):
            client.delete("/pools/to-remove", headers=auth_headers)

        service.delete_pool.assert_called_once_with("to-remove")

    def test_delete_pool_not_found_returns_404(self, client: TestClient, auth_headers: dict):
        """A 404 raised by the service is returned with its error code."""
        not_found = HTTPException(
            status_code=404,
            detail={
                "code": SandboxErrorCodes.K8S_POOL_NOT_FOUND,
                "message": "Pool 'gone' not found.",
            },
        )
        service = MagicMock()
        service.delete_pool.side_effect = not_found

        with patch(_POOL_SERVICE_PATCH, return_value=service):
            resp = client.delete("/pools/gone", headers=auth_headers)

        assert resp.status_code == 404
        assert SandboxErrorCodes.K8S_POOL_NOT_FOUND in resp.json()["code"]

    def test_delete_pool_service_error_returns_500(self, client: TestClient, auth_headers: dict):
        """A 500 raised by the service is returned as a 500."""
        api_error = HTTPException(
            status_code=500,
            detail={"code": SandboxErrorCodes.K8S_POOL_API_ERROR, "message": "err"},
        )
        service = MagicMock()
        service.delete_pool.side_effect = api_error

        with patch(_POOL_SERVICE_PATCH, return_value=service):
            resp = client.delete("/pools/p", headers=auth_headers)

        assert resp.status_code == 500
462  
463          assert response.status_code == 500