/ server / tests / test_proxy_renew_coordinator.py
test_proxy_renew_coordinator.py
  1  # Copyright 2026 Alibaba Group Holding Ltd.
  2  #
  3  # Licensed under the Apache License, Version 2.0 (the "License");
  4  # you may not use this file except in compliance with the License.
  5  # You may obtain a copy of the License at
  6  #
  7  #     http://www.apache.org/licenses/LICENSE-2.0
  8  #
  9  # Unless required by applicable law or agreed to in writing, software
 10  # distributed under the License is distributed on an "AS IS" BASIS,
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  # See the License for the specific language governing permissions and
 13  # limitations under the License.
 14  
 15  import asyncio
 16  from unittest.mock import MagicMock
 17  
 18  import pytest
 19  
 20  from opensandbox_server.config import AppConfig, RenewIntentConfig, RuntimeConfig, ServerConfig
 21  from opensandbox_server.integrations.renew_intent.consumer import RenewIntentConsumer, RenewWorkItem
 22  from opensandbox_server.integrations.renew_intent.logutil import RENEW_SOURCE_SERVER_PROXY
 23  from opensandbox_server.integrations.renew_intent.proxy_renew import ProxyRenewCoordinator
 24  
 25  
 26  def _app_config(*, renew_enabled: bool = True, min_interval: int = 60) -> AppConfig:
 27      return AppConfig(
 28          server=ServerConfig(),
 29          renew_intent=RenewIntentConfig(
 30              enabled=renew_enabled,
 31              min_interval_seconds=min_interval,
 32          ),
 33          runtime=RuntimeConfig(type="docker", execd_image="opensandbox/execd:latest"),
 34      )
 35  
 36  
 37  def _consumer(cfg: AppConfig) -> RenewIntentConsumer:
 38      return RenewIntentConsumer(cfg, MagicMock(), MagicMock(), redis_client=None)
 39  
 40  
 41  @pytest.mark.asyncio
 42  async def test_proxy_schedule_noop_when_disabled(monkeypatch):
 43      cfg = _app_config(renew_enabled=False)
 44      coord = ProxyRenewCoordinator(cfg, _consumer(cfg))
 45      created: list[asyncio.Task[None]] = []
 46  
 47      def capture_task(coro, *, name=None):
 48          t = asyncio.get_event_loop().create_task(coro, name=name)
 49          created.append(t)
 50          return t
 51  
 52      monkeypatch.setattr(asyncio, "create_task", capture_task)
 53      coord.schedule("sbx-1")
 54      await asyncio.sleep(0)
 55      assert created == []
 56  
 57  
 58  @pytest.mark.asyncio
 59  async def test_proxy_schedule_noop_when_consumer_none():
 60      cfg = _app_config(renew_enabled=True)
 61      coord = ProxyRenewCoordinator(cfg, None)
 62      coord.schedule("sbx-1")
 63  
 64  
 65  @pytest.mark.asyncio
 66  async def test_proxy_min_interval_skips_second_attempt(monkeypatch):
 67      cfg = _app_config(renew_enabled=True, min_interval=60)
 68      consumer = _consumer(cfg)
 69      attempts = {"n": 0}
 70  
 71      def attempt(_sid: str, *, source: str) -> bool:
 72          attempts["n"] += 1
 73          return True
 74  
 75      consumer._controller.attempt_renew_sync = attempt  # type: ignore[method-assign]
 76  
 77      seq = iter([100.0, 100.0, 100.5])
 78  
 79      def mono():
 80          return next(seq, 999.0)
 81  
 82      monkeypatch.setattr(
 83          "opensandbox_server.integrations.renew_intent.consumer.time.monotonic",
 84          mono,
 85      )
 86  
 87      work = RenewWorkItem(
 88          source=RENEW_SOURCE_SERVER_PROXY,
 89          sandbox_id="sbx-1",
 90          observed_at=MagicMock(),
 91      )
 92      await consumer._process_work(work)
 93      await consumer._process_work(work)
 94      assert attempts["n"] == 1
 95  
 96  
 97  @pytest.mark.asyncio
 98  async def test_proxy_second_attempt_after_cooldown_window(monkeypatch):
 99      cfg = _app_config(renew_enabled=True, min_interval=60)
100      consumer = _consumer(cfg)
101      attempts = {"n": 0}
102  
103      def attempt(_sid: str, *, source: str) -> bool:
104          attempts["n"] += 1
105          return True
106  
107      consumer._controller.attempt_renew_sync = attempt  # type: ignore[method-assign]
108  
109      seq = iter([100.0, 100.0, 200.0, 200.0])
110  
111      def mono():
112          return next(seq, 999.0)
113  
114      monkeypatch.setattr(
115          "opensandbox_server.integrations.renew_intent.consumer.time.monotonic",
116          mono,
117      )
118  
119      work = RenewWorkItem(
120          source=RENEW_SOURCE_SERVER_PROXY,
121          sandbox_id="sbx-1",
122          observed_at=MagicMock(),
123      )
124      await consumer._process_work(work)
125      await consumer._process_work(work)
126      assert attempts["n"] == 2
127  
128  
def test_consumer_mem_lru_drops_oldest_unlocked_entries():
    """With ``_max_tracked`` set to 2, ensuring a third id leaves only the two newest."""
    consumer = _consumer(_app_config(renew_enabled=True))
    consumer._max_tracked = 2

    # Fill the in-memory state up to the cap.
    for sandbox_id in ("a", "b"):
        consumer._ensure_mru_mem(sandbox_id)
    assert set(consumer._mem_states) == {"a", "b"}

    # One more id: "a" (the first one ensured) is expected to be gone.
    consumer._ensure_mru_mem("c")
    assert set(consumer._mem_states) == {"b", "c"}