test_proxy_renew_coordinator.py
# Copyright 2026 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for ``ProxyRenewCoordinator.schedule`` and ``RenewIntentConsumer`` internals."""

import asyncio
from unittest.mock import MagicMock

import pytest

from opensandbox_server.config import AppConfig, RenewIntentConfig, RuntimeConfig, ServerConfig
from opensandbox_server.integrations.renew_intent.consumer import RenewIntentConsumer, RenewWorkItem
from opensandbox_server.integrations.renew_intent.logutil import RENEW_SOURCE_SERVER_PROXY
from opensandbox_server.integrations.renew_intent.proxy_renew import ProxyRenewCoordinator


def _app_config(*, renew_enabled: bool = True, min_interval: int = 60) -> AppConfig:
    """Build an ``AppConfig`` for the tests.

    Args:
        renew_enabled: Value passed as ``RenewIntentConfig.enabled``.
        min_interval: Value passed as ``RenewIntentConfig.min_interval_seconds``.

    Returns:
        A config with a default ``ServerConfig`` and a docker ``RuntimeConfig``.
    """
    return AppConfig(
        server=ServerConfig(),
        renew_intent=RenewIntentConfig(
            enabled=renew_enabled,
            min_interval_seconds=min_interval,
        ),
        runtime=RuntimeConfig(type="docker", execd_image="opensandbox/execd:latest"),
    )


def _consumer(cfg: AppConfig) -> RenewIntentConsumer:
    """Return a ``RenewIntentConsumer`` built from mock collaborators and no Redis client."""
    return RenewIntentConsumer(cfg, MagicMock(), MagicMock(), redis_client=None)


async def _attempts_after_two_passes(monkeypatch, clock_readings: list[float]) -> int:
    """Process the same proxy work item twice and count renew attempts.

    The consumer's ``_controller.attempt_renew_sync`` is replaced by a counting
    stub that always returns ``True``, and ``time.monotonic`` (as reached through
    the consumer module) is replaced by a fake clock.

    Args:
        monkeypatch: The pytest ``monkeypatch`` fixture of the calling test.
        clock_readings: Values the fake clock returns, in order; once they are
            exhausted every further call returns ``999.0``.

    Returns:
        How many times the counting stub was invoked.
    """
    cfg = _app_config(renew_enabled=True, min_interval=60)
    consumer = _consumer(cfg)
    attempts = 0

    def attempt(_sid: str, *, source: str) -> bool:
        nonlocal attempts
        attempts += 1
        return True

    consumer._controller.attempt_renew_sync = attempt  # type: ignore[method-assign]

    readings = iter(clock_readings)

    def mono() -> float:
        return next(readings, 999.0)

    # NOTE(review): the dotted target ends in ``consumer.time.monotonic``; if
    # ``consumer.time`` is the stdlib ``time`` module, this patch is visible to
    # every caller of ``time.monotonic`` for the duration of the test — confirm.
    monkeypatch.setattr(
        "opensandbox_server.integrations.renew_intent.consumer.time.monotonic",
        mono,
    )

    work = RenewWorkItem(
        source=RENEW_SOURCE_SERVER_PROXY,
        sandbox_id="sbx-1",
        observed_at=MagicMock(),
    )
    await consumer._process_work(work)
    await consumer._process_work(work)
    return attempts


@pytest.mark.asyncio
async def test_proxy_schedule_noop_when_disabled(monkeypatch):
    """``schedule`` must not call ``asyncio.create_task`` when renew intent is disabled."""
    cfg = _app_config(renew_enabled=False)
    coord = ProxyRenewCoordinator(cfg, _consumer(cfg))
    created: list[asyncio.Task[None]] = []

    def capture_task(coro, **kwargs):
        # Go through the running loop directly so the patched
        # ``asyncio.create_task`` is not re-entered; forward every keyword
        # argument so callers passing ``name`` or ``context`` keep working.
        task = asyncio.get_running_loop().create_task(coro, **kwargs)
        created.append(task)
        return task

    monkeypatch.setattr(asyncio, "create_task", capture_task)
    coord.schedule("sbx-1")
    # Yield once to the event loop before checking what was recorded.
    await asyncio.sleep(0)
    assert created == []


@pytest.mark.asyncio
async def test_proxy_schedule_noop_when_consumer_none():
    """``schedule`` with a ``None`` consumer must return without raising."""
    cfg = _app_config(renew_enabled=True)
    coord = ProxyRenewCoordinator(cfg, None)
    coord.schedule("sbx-1")


@pytest.mark.asyncio
async def test_proxy_min_interval_skips_second_attempt(monkeypatch):
    """With a 60 s minimum interval, a fake clock that only reaches 100.5 allows one attempt."""
    attempts = await _attempts_after_two_passes(monkeypatch, [100.0, 100.0, 100.5])
    assert attempts == 1


@pytest.mark.asyncio
async def test_proxy_second_attempt_after_cooldown_window(monkeypatch):
    """With a 60 s minimum interval, a fake clock jumping from 100.0 to 200.0 allows two attempts."""
    attempts = await _attempts_after_two_passes(monkeypatch, [100.0, 100.0, 200.0, 200.0])
    assert attempts == 2


def test_consumer_mem_lru_drops_oldest_unlocked_entries():
    """With ``_max_tracked`` set to 2, adding a third id leaves only the two newest ids."""
    cfg = _app_config(renew_enabled=True)
    consumer = _consumer(cfg)
    consumer._max_tracked = 2

    consumer._ensure_mru_mem("a")
    consumer._ensure_mru_mem("b")
    assert set(consumer._mem_states) == {"a", "b"}

    # "a" was registered first, so it is the entry expected to be dropped.
    consumer._ensure_mru_mem("c")
    assert set(consumer._mem_states) == {"b", "c"}