# consumer.py
# Copyright 2026 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Single renew-intent pipeline: Redis BRPOP feeders + proxy submits → one asyncio queue → processors."""

from __future__ import annotations

import asyncio
import logging
import time
from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime, timezone
from functools import partial
from typing import TYPE_CHECKING, Optional

from redis.exceptions import RedisError

from opensandbox_server.config import AppConfig
from opensandbox_server.integrations.renew_intent.constants import (
    BRPOP_TIMEOUT_SECONDS,
    INTENT_MAX_AGE_SECONDS,
    PROXY_RENEW_MAX_TRACKED_SANDBOXES,
)
from opensandbox_server.integrations.renew_intent.controller import AccessRenewController
from opensandbox_server.integrations.renew_intent.intent import parse_renew_intent_json
from opensandbox_server.integrations.renew_intent.logutil import (
    RENEW_EVENT_WORKERS_NOT_STARTED,
    RENEW_EVENT_WORKERS_STARTED,
    RENEW_SOURCE_REDIS_QUEUE,
    RENEW_SOURCE_SERVER_PROXY,
    renew_bundle,
)
from opensandbox_server.integrations.renew_intent.redis_client import connect_renew_intent_redis_from_config
from opensandbox_server.services.extension_service import ExtensionService, require_extension_service
from opensandbox_server.services.factory import create_sandbox_service
from opensandbox_server.services.sandbox_service import SandboxService

if TYPE_CHECKING:
    from redis.asyncio import Redis

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class RenewWorkItem:
    """One unit of work for the shared renew pipeline."""

    # Origin of the work (RENEW_SOURCE_REDIS_QUEUE or RENEW_SOURCE_SERVER_PROXY); used for
    # gating in ``_process_work`` and for log fields.
    source: str
    sandbox_id: str
    # When the access was observed; Redis intents older than INTENT_MAX_AGE_SECONDS are dropped.
    observed_at: datetime


@dataclass
class _MemSandboxState:
    """In-memory per-sandbox state tracked by the consumer's LRU map."""

    # Serializes renew work for one sandbox across processor tasks.
    lock: asyncio.Lock
    # time.monotonic() of the last successful proxy-only renew; None until the first success.
    last_success_monotonic: float | None = None


class RenewIntentConsumer:
    """
    Feeds renew work from Redis BRPOP (optional) and server-proxy ``schedule`` into one queue.
    Per-sandbox ``asyncio.Lock`` serializes work; without Redis, ``min_interval`` throttles proxy
    renews (ingress throttling is producer-side).

    Per-sandbox state lives in an LRU map bounded by ``PROXY_RENEW_MAX_TRACKED_SANDBOXES``;
    entries whose lock is currently held are never evicted.
    """

    def __init__(
        self,
        app_config: AppConfig,
        sandbox_service: SandboxService,
        extension_service: ExtensionService,
        redis_client: Optional["Redis"],
    ) -> None:
        """Wire up configuration and internal state; no tasks are started here (see ``start``)."""
        self._app_config = app_config
        self._redis = redis_client
        ri = app_config.renew_intent
        self._queue_key = ri.redis.queue_key
        # BRPOP feeders only exist when a Redis client is available.
        self._feeder_count = ri.redis.consumer_concurrency if redis_client else 0
        # At least one processor always runs so proxy submissions are drained.
        self._processor_count = max(1, ri.redis.consumer_concurrency)
        self._min_interval = float(ri.min_interval_seconds)
        self._controller = AccessRenewController(sandbox_service, extension_service)
        # Unbounded: ``put``/``put_nowait`` never block or raise QueueFull.
        self._work_queue: asyncio.Queue[RenewWorkItem] = asyncio.Queue()
        self._stop = asyncio.Event()
        self._tasks: list[asyncio.Task[None]] = []
        # Insertion order == recency order (front is least recently used).
        self._mem_states: OrderedDict[str, _MemSandboxState] = OrderedDict()
        self._max_tracked = PROXY_RENEW_MAX_TRACKED_SANDBOXES

    @classmethod
    async def start(
        cls,
        app_config: AppConfig,
        sandbox_service: SandboxService,
        extension_service: ExtensionService,
    ) -> Optional["RenewIntentConsumer"]:
        """Build the consumer and spawn its worker tasks.

        Returns ``None`` when ``renew_intent.enabled`` is false. If Redis is enabled but the
        connection fails (``RedisError``/``OSError``/``TimeoutError``) or yields no client, the
        failure is logged and the consumer runs the proxy-only pipeline instead of raising.
        """
        if not app_config.renew_intent.enabled:
            return None

        redis_client: Optional["Redis"] = None
        if app_config.renew_intent.redis.enabled:
            try:
                redis_client = await connect_renew_intent_redis_from_config(app_config)
            except (RedisError, OSError, TimeoutError) as exc:
                line, ex = renew_bundle(
                    event=RENEW_EVENT_WORKERS_NOT_STARTED,
                    source=RENEW_SOURCE_REDIS_QUEUE,
                    skip_reason="redis_connect_failed",
                    error_type=type(exc).__name__,
                )
                logger.error(f"renew_intent {line} error={exc!s}", extra=ex)
                redis_client = None
        if redis_client is None and app_config.renew_intent.redis.enabled:
            line, ex = renew_bundle(
                event=RENEW_EVENT_WORKERS_NOT_STARTED,
                source=RENEW_SOURCE_REDIS_QUEUE,
                skip_reason="redis_client_none",
            )
            logger.warning(
                f"renew_intent {line}; continuing with proxy-only renew pipeline",
                extra=ex,
            )

        consumer = cls(app_config, sandbox_service, extension_service, redis_client)
        consumer._spawn_tasks()
        if redis_client is not None:
            line, ex = renew_bundle(
                event=RENEW_EVENT_WORKERS_STARTED,
                source=RENEW_SOURCE_REDIS_QUEUE,
                worker_count=consumer._feeder_count + consumer._processor_count,
                queue_key=consumer._queue_key,
            )
            logger.info(
                f"🧪 [EXPERIMENTAL] renew_intent is enabled: Redis BRPOP feeders + "
                f"unified processors started ({line})",
                extra=ex,
            )
        else:
            logger.info(
                "🧪 [EXPERIMENTAL] renew_intent is enabled: unified in-process renew pipeline "
                "(proxy path only; no Redis BRPOP)"
            )
        return consumer

    @staticmethod
    def _proxy_work_item(sandbox_id: str) -> RenewWorkItem:
        """Build a proxy-sourced work item stamped with the current UTC time."""
        return RenewWorkItem(
            source=RENEW_SOURCE_SERVER_PROXY,
            sandbox_id=sandbox_id,
            observed_at=datetime.now(timezone.utc),
        )

    def submit_from_proxy(self, sandbox_id: str) -> None:
        """Enqueue renew work from ``/sandboxes/.../proxy/...`` (non-blocking).

        The work queue is unbounded, so ``put_nowait`` never raises ``QueueFull`` and the item is
        enqueued synchronously. This avoids spawning an untracked task per proxied request: the
        event loop keeps only weak references to tasks, so an unreferenced task can be
        garbage-collected before it runs. Submissions after :meth:`stop` are dropped because no
        processor is left to drain them.
        """
        if not self._app_config.renew_intent.enabled or self._stop.is_set():
            return
        self._work_queue.put_nowait(self._proxy_work_item(sandbox_id))

    async def _enqueue_proxy(self, sandbox_id: str) -> None:
        """Async variant of :meth:`submit_from_proxy` (kept for existing callers)."""
        await self._work_queue.put(self._proxy_work_item(sandbox_id))

    def _spawn_tasks(self) -> None:
        """Start processor tasks, then BRPOP feeder tasks (feeders only when Redis is present)."""
        for i in range(self._processor_count):
            self._tasks.append(
                asyncio.create_task(
                    self._processor_loop(i),
                    name=f"renew_intent_processor_{i}",
                )
            )
        for i in range(self._feeder_count):
            self._tasks.append(
                asyncio.create_task(
                    self._brpop_feeder_loop(i),
                    name=f"renew_intent_brpop_{i}",
                )
            )

    @staticmethod
    def _is_stale(observed_at: datetime) -> bool:
        """Return True when ``observed_at`` is more than INTENT_MAX_AGE_SECONDS in the past.

        NOTE(review): assumes ``observed_at`` is timezone-aware; a naive datetime would make the
        subtraction raise ``TypeError`` (caught and logged by the feeder loop).
        """
        now = datetime.now(timezone.utc)
        age = (now - observed_at).total_seconds()
        return age > INTENT_MAX_AGE_SECONDS

    def _ensure_mru_mem(self, sandbox_id: str) -> _MemSandboxState:
        """Return (creating if needed) the state for ``sandbox_id`` and mark it most recently used.

        The returned entry is shielded from the eviction pass this call triggers, so the caller
        always locks a state object that remains tracked in ``_mem_states``.
        """
        st = self._mem_states.get(sandbox_id)
        if st is None:
            st = _MemSandboxState(lock=asyncio.Lock())
            self._mem_states[sandbox_id] = st
        self._mem_states.move_to_end(sandbox_id)
        self._evict_mem_lru_unlocked(protect=sandbox_id)
        return st

    def _evict_mem_lru_unlocked(self, protect: str | None = None) -> None:
        """Evict least-recently-used entries until the map is within ``_max_tracked``.

        Entries whose lock is held, and the ``protect`` key, are not evicted; they are re-inserted
        at the most-recently-used end instead. After ``len(_mem_states)`` consecutive
        non-evictable entries the pass gives up, so the map may temporarily exceed its bound.
        """
        rotations = 0
        max_rotations = max(len(self._mem_states), 1)
        while len(self._mem_states) > self._max_tracked and rotations < max_rotations:
            key, state = self._mem_states.popitem(last=False)
            if key == protect or state.lock.locked():
                # Re-inserting a popped key appends it at the MRU end.
                self._mem_states[key] = state
                rotations += 1
            else:
                rotations = 0

    async def _brpop_feeder_loop(self, worker_id: int) -> None:
        """Move renew intents from the Redis list into the shared work queue until stopped.

        Redis errors are logged and retried after 1 second. Unparseable (``None``) or stale
        intents are skipped, and any other per-payload error is logged without stopping the loop.
        """
        assert self._redis is not None
        while not self._stop.is_set():
            try:
                result = await self._redis.brpop(
                    self._queue_key,
                    BRPOP_TIMEOUT_SECONDS,
                )
            except asyncio.CancelledError:
                raise
            except (RedisError, OSError) as exc:
                line, ex = renew_bundle(
                    event="worker_redis_error",
                    source=RENEW_SOURCE_REDIS_QUEUE,
                    worker_id=worker_id,
                    error_type=type(exc).__name__,
                )
                logger.warning(f"renew_intent {line} error={exc!s}", extra=ex)
                await asyncio.sleep(1.0)
                continue

            if result is None:
                # BRPOP timed out with nothing queued; loop to re-check the stop flag.
                continue
            _, payload = result
            try:
                if isinstance(payload, (bytes, bytearray)):
                    # Clients created without decode_responses return bytes; decode instead of
                    # silently dropping every intent (a decode failure is logged below).
                    payload = payload.decode("utf-8")
                if not isinstance(payload, str):
                    continue
                intent = parse_renew_intent_json(payload)
                if intent is None:
                    continue
                if self._is_stale(intent.observed_at):
                    continue
                await self._work_queue.put(
                    RenewWorkItem(
                        source=RENEW_SOURCE_REDIS_QUEUE,
                        sandbox_id=intent.sandbox_id,
                        observed_at=intent.observed_at,
                    )
                )
            except Exception as exc:
                line, ex = renew_bundle(
                    event="worker_handle_error",
                    source=RENEW_SOURCE_REDIS_QUEUE,
                    worker_id=worker_id,
                    error_type=type(exc).__name__,
                )
                logger.exception(f"renew_intent {line} error={exc!s}", extra=ex)

    async def _processor_loop(self, worker_id: int) -> None:
        """Drain the shared work queue until stopped, logging (not propagating) per-item errors."""
        while not self._stop.is_set():
            try:
                # Bounded wait so the stop flag is re-checked at least once per second.
                work = await asyncio.wait_for(self._work_queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                raise
            try:
                await self._process_work(work)
            except Exception as exc:
                line, ex = renew_bundle(
                    event="processor_error",
                    source=work.source,
                    sandbox_id=work.sandbox_id,
                    worker_id=worker_id,
                    error_type=type(exc).__name__,
                )
                logger.exception(f"renew_intent {line} error={exc!s}", extra=ex)
            finally:
                self._work_queue.task_done()

    async def _process_work(self, work: RenewWorkItem) -> None:
        """Renew one sandbox while holding its per-sandbox lock.

        With Redis, every item goes through ``renew_after_gates``. Ingress throttling is
        producer-side, so no local throttling is applied. Without Redis, only proxy items are
        handled: they are throttled by ``min_interval`` since the last successful renew, and the
        synchronous ``attempt_renew_sync`` runs in a worker thread.
        """
        if self._redis is None and work.source != RENEW_SOURCE_SERVER_PROXY:
            return

        st = self._ensure_mru_mem(work.sandbox_id)
        async with st.lock:
            if self._redis is not None:
                await self._controller.renew_after_gates(work.sandbox_id, source=work.source)
                return

            now = time.monotonic()
            if (
                st.last_success_monotonic is not None
                and (now - st.last_success_monotonic) < self._min_interval
            ):
                return
            ok = await asyncio.to_thread(
                partial(
                    self._controller.attempt_renew_sync,
                    work.sandbox_id,
                    source=work.source,
                )
            )
            if ok:
                st.last_success_monotonic = time.monotonic()

    async def stop(self) -> None:
        """Signal stop, cancel and await all worker tasks, then close Redis (best effort)."""
        self._stop.set()
        for t in self._tasks:
            t.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks.clear()
        if self._redis is not None:
            try:
                await self._redis.aclose()
            except Exception as exc:
                logger.debug(f"renew_intent redis_close error={exc!s}")


async def start_renew_intent_consumer(
    app_config: AppConfig,
    sandbox_service: SandboxService | None = None,
    extension_service: ExtensionService | None = None,
) -> Optional[RenewIntentConsumer]:
    """Start consumer or ``None`` when ``renew_intent.enabled`` is false.

    Missing services are built from ``app_config`` via ``create_sandbox_service`` and
    ``require_extension_service``.
    """
    if sandbox_service is None:
        sandbox_service = create_sandbox_service(config=app_config)
    if extension_service is None:
        extension_service = require_extension_service(sandbox_service)
    return await RenewIntentConsumer.start(app_config, sandbox_service, extension_service)