test_sandbox_manager_sync_business_logic.py
1 # 2 # Copyright 2025 Alibaba Group Holding Ltd. 3 # 4 # Licensed under the Apache License, Version 2.0 (the "License"); 5 # you may not use this file except in compliance with the License. 6 # You may obtain a copy of the License at 7 # 8 # http://www.apache.org/licenses/LICENSE-2.0 9 # 10 # Unless required by applicable law or agreed to in writing, software 11 # distributed under the License is distributed on an "AS IS" BASIS, 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 # See the License for the specific language governing permissions and 14 # limitations under the License. 15 # 16 from __future__ import annotations 17 18 from datetime import datetime, timedelta, timezone 19 from uuid import uuid4 20 21 import httpx 22 23 from opensandbox.config.connection_sync import ConnectionConfigSync 24 from opensandbox.sync.manager import SandboxManagerSync 25 26 27 class _SandboxServiceStub: 28 def __init__(self) -> None: 29 self.renew_calls: list[tuple[object, datetime]] = [] 30 31 def list_sandboxes(self, _filter): # pragma: no cover 32 raise RuntimeError("not used") 33 34 def get_sandbox_info(self, _sandbox_id): # pragma: no cover 35 raise RuntimeError("not used") 36 37 def kill_sandbox(self, _sandbox_id): # pragma: no cover 38 raise RuntimeError("not used") 39 40 def renew_sandbox_expiration(self, sandbox_id, new_expiration_time: datetime) -> None: 41 self.renew_calls.append((sandbox_id, new_expiration_time)) 42 43 def pause_sandbox(self, _sandbox_id) -> None: # pragma: no cover 44 raise RuntimeError("not used") 45 46 def resume_sandbox(self, _sandbox_id): # pragma: no cover 47 raise RuntimeError("not used") 48 49 50 def test_sync_manager_renew_uses_utc_datetime() -> None: 51 svc = _SandboxServiceStub() 52 mgr = SandboxManagerSync(svc, ConnectionConfigSync()) 53 54 sid = str(uuid4()) 55 mgr.renew_sandbox(sid, timedelta(seconds=5)) 56 57 assert len(svc.renew_calls) == 1 58 _, dt = svc.renew_calls[0] 59 assert dt.tzinfo is timezone.utc 60 61 62 def test_sync_manager_close_does_not_close_user_transport() -> None: 63 class CustomTransport(httpx.BaseTransport): 64 def __init__(self) -> None: 65 self.closed = False 66 67 def handle_request(self, request: httpx.Request) -> httpx.Response: # pragma: no cover 68 raise RuntimeError("not used") 69 70 def close(self) -> None: 71 self.closed = True 72 73 t = CustomTransport() 74 cfg = ConnectionConfigSync(transport=t) 75 76 mgr = SandboxManagerSync(_SandboxServiceStub(), cfg) 77 mgr.close() 78 assert t.closed is False