test_sandbox_manager_business_logic.py
#
# Copyright 2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Tests for ``SandboxManager`` driven through an in-memory service stub."""

from __future__ import annotations

from datetime import datetime, timedelta, timezone
from uuid import uuid4

import httpx
import pytest

from opensandbox.config import ConnectionConfig
from opensandbox.manager import SandboxManager


class _SandboxServiceStub:
    """Sandbox service double that records renew and pause invocations.

    Every other service method raises ``RuntimeError("not used")`` so that an
    unexpected call from the code under test fails loudly.
    """

    def __init__(self) -> None:
        # One (sandbox_id, new_expiration_time) tuple per renew call.
        self.renew_calls: list[tuple[object, datetime]] = []
        # One sandbox_id per pause call.
        self.pause_calls: list[object] = []

    async def list_sandboxes(self, _filter):  # pragma: no cover
        """Not exercised by these tests; always raises."""
        raise RuntimeError("not used")

    async def get_sandbox_info(self, _sandbox_id):  # pragma: no cover
        """Not exercised by these tests; always raises."""
        raise RuntimeError("not used")

    async def kill_sandbox(self, _sandbox_id):  # pragma: no cover
        """Not exercised by these tests; always raises."""
        raise RuntimeError("not used")

    async def renew_sandbox_expiration(self, sandbox_id, new_expiration_time: datetime) -> None:
        """Record the arguments of a renew request."""
        recorded = (sandbox_id, new_expiration_time)
        self.renew_calls.append(recorded)

    async def pause_sandbox(self, sandbox_id) -> None:
        """Record the id of a pause request."""
        self.pause_calls.append(sandbox_id)

    async def resume_sandbox(self, _sandbox_id):  # pragma: no cover
        """Not exercised by these tests; always raises."""
        raise RuntimeError("not used")


@pytest.mark.asyncio
async def test_manager_renew_uses_utc_datetime() -> None:
    """The expiration forwarded to the service by ``renew_sandbox`` is UTC."""
    service = _SandboxServiceStub()
    manager = SandboxManager(service, ConnectionConfig())

    sandbox_id = str(uuid4())
    await manager.renew_sandbox(sandbox_id, timedelta(seconds=5))

    assert len(service.renew_calls) == 1
    expiration = service.renew_calls[0][1]
    # Identity check: the tzinfo must be the ``timezone.utc`` singleton itself.
    assert expiration.tzinfo is timezone.utc


@pytest.mark.asyncio
async def test_manager_close_does_not_close_user_transport() -> None:
    """Closing the manager leaves a caller-supplied transport open."""

    class _TrackingTransport(httpx.AsyncBaseTransport):
        """Transport that only remembers whether ``aclose`` was awaited."""

        def __init__(self) -> None:
            self.closed = False

        async def handle_async_request(self, request: httpx.Request) -> httpx.Response:  # pragma: no cover
            raise RuntimeError("not used")

        async def aclose(self) -> None:
            self.closed = True

    transport = _TrackingTransport()
    config = ConnectionConfig(transport=transport)

    manager = SandboxManager(_SandboxServiceStub(), config)
    await manager.close()

    assert transport.closed is False