consumer.py
  1  # Copyright 2026 Alibaba Group Holding Ltd.
  2  #
  3  # Licensed under the Apache License, Version 2.0 (the "License");
  4  # you may not use this file except in compliance with the License.
  5  # You may obtain a copy of the License at
  6  #
  7  #     http://www.apache.org/licenses/LICENSE-2.0
  8  #
  9  # Unless required by applicable law or agreed to in writing, software
 10  # distributed under the License is distributed on an "AS IS" BASIS,
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  # See the License for the specific language governing permissions and
 13  # limitations under the License.
 14  
 15  """Single renew-intent pipeline: Redis BRPOP feeders + proxy submits → one asyncio queue → processors."""
 16  
 17  from __future__ import annotations
 18  
 19  import asyncio
 20  import logging
 21  import time
 22  from collections import OrderedDict
 23  from dataclasses import dataclass
 24  from datetime import datetime, timezone
 25  from functools import partial
 26  from typing import TYPE_CHECKING, Optional
 27  
 28  from redis.exceptions import RedisError
 29  
 30  from opensandbox_server.config import AppConfig
 31  from opensandbox_server.integrations.renew_intent.constants import (
 32      BRPOP_TIMEOUT_SECONDS,
 33      INTENT_MAX_AGE_SECONDS,
 34      PROXY_RENEW_MAX_TRACKED_SANDBOXES,
 35  )
 36  from opensandbox_server.integrations.renew_intent.controller import AccessRenewController
 37  from opensandbox_server.integrations.renew_intent.intent import parse_renew_intent_json
 38  from opensandbox_server.integrations.renew_intent.logutil import (
 39      RENEW_EVENT_WORKERS_NOT_STARTED,
 40      RENEW_EVENT_WORKERS_STARTED,
 41      RENEW_SOURCE_REDIS_QUEUE,
 42      RENEW_SOURCE_SERVER_PROXY,
 43      renew_bundle,
 44  )
 45  from opensandbox_server.integrations.renew_intent.redis_client import connect_renew_intent_redis_from_config
 46  from opensandbox_server.services.extension_service import ExtensionService, require_extension_service
 47  from opensandbox_server.services.factory import create_sandbox_service
 48  from opensandbox_server.services.sandbox_service import SandboxService
 49  
 50  if TYPE_CHECKING:
 51      from redis.asyncio import Redis
 52  
 53  logger = logging.getLogger(__name__)
 54  
 55  
@dataclass(frozen=True)
class RenewWorkItem:
    """One unit of work for the shared renew pipeline.

    Created by the Redis BRPOP feeders and by the server-proxy submit path, then consumed by the
    processor tasks. The dataclass is frozen, so an item cannot be mutated once enqueued.
    """

    # Which producer created the item (the consumer uses the RENEW_SOURCE_* logutil constants).
    source: str
    # Identifier of the sandbox this renew request applies to.
    sandbox_id: str
    # When the access was observed. Staleness checks subtract it from the current UTC time, so it
    # is expected to be timezone-aware.
    observed_at: datetime
 63  
 64  
@dataclass
class _MemSandboxState:
    """In-memory per-sandbox bookkeeping kept in the consumer's bounded LRU map."""

    # Serializes renew work for one sandbox; entries whose lock is held are rotated, not evicted.
    lock: asyncio.Lock
    # ``time.monotonic()`` of the last successful proxy-only renew, or ``None`` if there was none yet;
    # read for ``min_interval`` throttling when no Redis client is configured.
    last_success_monotonic: float | None = None
 69  
 70  
 71  class RenewIntentConsumer:
 72      """
 73      Feeds renew work from Redis BRPOP (optional) and server-proxy ``schedule`` into one queue.
 74      Per-sandbox ``asyncio.Lock`` serializes work; without Redis, ``min_interval`` throttles proxy
 75      renews (ingress throttling is producer-side).
 76      """
 77  
 78      def __init__(
 79          self,
 80          app_config: AppConfig,
 81          sandbox_service: SandboxService,
 82          extension_service: ExtensionService,
 83          redis_client: Optional["Redis"],
 84      ) -> None:
 85          self._app_config = app_config
 86          self._redis = redis_client
 87          ri = app_config.renew_intent
 88          self._queue_key = ri.redis.queue_key
 89          self._feeder_count = ri.redis.consumer_concurrency if redis_client else 0
 90          self._processor_count = max(1, ri.redis.consumer_concurrency)
 91          self._min_interval = float(ri.min_interval_seconds)
 92          self._controller = AccessRenewController(sandbox_service, extension_service)
 93          self._work_queue: asyncio.Queue[RenewWorkItem] = asyncio.Queue()
 94          self._stop = asyncio.Event()
 95          self._tasks: list[asyncio.Task[None]] = []
 96          self._mem_states: OrderedDict[str, _MemSandboxState] = OrderedDict()
 97          self._max_tracked = PROXY_RENEW_MAX_TRACKED_SANDBOXES
 98  
 99      @classmethod
100      async def start(
101          cls,
102          app_config: AppConfig,
103          sandbox_service: SandboxService,
104          extension_service: ExtensionService,
105      ) -> Optional["RenewIntentConsumer"]:
106          if not app_config.renew_intent.enabled:
107              return None
108  
109          redis_client: Optional["Redis"] = None
110          if app_config.renew_intent.redis.enabled:
111              try:
112                  redis_client = await connect_renew_intent_redis_from_config(app_config)
113              except (RedisError, OSError, TimeoutError) as exc:
114                  line, ex = renew_bundle(
115                      event=RENEW_EVENT_WORKERS_NOT_STARTED,
116                      source=RENEW_SOURCE_REDIS_QUEUE,
117                      skip_reason="redis_connect_failed",
118                      error_type=type(exc).__name__,
119                  )
120                  logger.error(f"renew_intent {line} error={exc!s}", extra=ex)
121                  redis_client = None
122              if redis_client is None and app_config.renew_intent.redis.enabled:
123                  line, ex = renew_bundle(
124                      event=RENEW_EVENT_WORKERS_NOT_STARTED,
125                      source=RENEW_SOURCE_REDIS_QUEUE,
126                      skip_reason="redis_client_none",
127                  )
128                  logger.warning(
129                      f"renew_intent {line}; continuing with proxy-only renew pipeline",
130                      extra=ex,
131                  )
132  
133          consumer = cls(app_config, sandbox_service, extension_service, redis_client)
134          consumer._spawn_tasks()
135          if redis_client is not None:
136              line, ex = renew_bundle(
137                  event=RENEW_EVENT_WORKERS_STARTED,
138                  source=RENEW_SOURCE_REDIS_QUEUE,
139                  worker_count=consumer._feeder_count + consumer._processor_count,
140                  queue_key=consumer._queue_key,
141              )
142              logger.info(
143                  f"🧪 [EXPERIMENTAL] renew_intent is enabled: Redis BRPOP feeders + "
144                  f"unified processors started ({line})",
145                  extra=ex,
146              )
147          else:
148              logger.info(
149                  "🧪 [EXPERIMENTAL] renew_intent is enabled: unified in-process renew pipeline "
150                  "(proxy path only; no Redis BRPOP)"
151              )
152          return consumer
153  
154      def submit_from_proxy(self, sandbox_id: str) -> None:
155          """Enqueue renew work from ``/sandboxes/.../proxy/...`` (non-blocking)."""
156          if not self._app_config.renew_intent.enabled:
157              return
158          asyncio.create_task(
159              self._enqueue_proxy(sandbox_id),
160              name=f"renew_intent_proxy_enqueue_{sandbox_id}",
161          )
162  
163      async def _enqueue_proxy(self, sandbox_id: str) -> None:
164          await self._work_queue.put(
165              RenewWorkItem(
166                  source=RENEW_SOURCE_SERVER_PROXY,
167                  sandbox_id=sandbox_id,
168                  observed_at=datetime.now(timezone.utc),
169              )
170          )
171  
172      def _spawn_tasks(self) -> None:
173          for i in range(self._processor_count):
174              self._tasks.append(
175                  asyncio.create_task(
176                      self._processor_loop(i),
177                      name=f"renew_intent_processor_{i}",
178                  )
179              )
180          for i in range(self._feeder_count):
181              self._tasks.append(
182                  asyncio.create_task(
183                      self._brpop_feeder_loop(i),
184                      name=f"renew_intent_brpop_{i}",
185                  )
186              )
187  
188      @staticmethod
189      def _is_stale(observed_at: datetime) -> bool:
190          now = datetime.now(timezone.utc)
191          age = (now - observed_at).total_seconds()
192          return age > INTENT_MAX_AGE_SECONDS
193  
194      def _ensure_mru_mem(self, sandbox_id: str) -> _MemSandboxState:
195          if sandbox_id in self._mem_states:
196              st = self._mem_states[sandbox_id]
197              self._mem_states.move_to_end(sandbox_id)
198          else:
199              st = _MemSandboxState(lock=asyncio.Lock())
200              self._mem_states[sandbox_id] = st
201              self._mem_states.move_to_end(sandbox_id)
202          self._evict_mem_lru_unlocked()
203          return st
204  
205      def _evict_mem_lru_unlocked(self) -> None:
206          rotations = 0
207          max_rotations = max(len(self._mem_states), 1)
208          while len(self._mem_states) > self._max_tracked and rotations < max_rotations:
209              k, st = self._mem_states.popitem(last=False)
210              if st.lock.locked():
211                  self._mem_states[k] = st
212                  self._mem_states.move_to_end(k)
213                  rotations += 1
214              else:
215                  rotations = 0
216  
217      async def _brpop_feeder_loop(self, worker_id: int) -> None:
218          assert self._redis is not None
219          while not self._stop.is_set():
220              try:
221                  result = await self._redis.brpop(
222                      self._queue_key,
223                      BRPOP_TIMEOUT_SECONDS,
224                  )
225              except asyncio.CancelledError:
226                  raise
227              except (RedisError, OSError) as exc:
228                  line, ex = renew_bundle(
229                      event="worker_redis_error",
230                      source=RENEW_SOURCE_REDIS_QUEUE,
231                      worker_id=worker_id,
232                      error_type=type(exc).__name__,
233                  )
234                  logger.warning(f"renew_intent {line} error={exc!s}", extra=ex)
235                  await asyncio.sleep(1.0)
236                  continue
237  
238              if result is None:
239                  continue
240              _, payload = result
241              if not isinstance(payload, str):
242                  continue
243              try:
244                  intent = parse_renew_intent_json(payload)
245                  if intent is None:
246                      continue
247                  if self._is_stale(intent.observed_at):
248                      continue
249                  await self._work_queue.put(
250                      RenewWorkItem(
251                          source=RENEW_SOURCE_REDIS_QUEUE,
252                          sandbox_id=intent.sandbox_id,
253                          observed_at=intent.observed_at,
254                      )
255                  )
256              except Exception as exc:
257                  line, ex = renew_bundle(
258                      event="worker_handle_error",
259                      source=RENEW_SOURCE_REDIS_QUEUE,
260                      worker_id=worker_id,
261                      error_type=type(exc).__name__,
262                  )
263                  logger.exception(f"renew_intent {line} error={exc!s}", extra=ex)
264  
265      async def _processor_loop(self, worker_id: int) -> None:
266          while not self._stop.is_set():
267              try:
268                  work = await asyncio.wait_for(self._work_queue.get(), timeout=1.0)
269              except asyncio.TimeoutError:
270                  continue
271              except asyncio.CancelledError:
272                  raise
273              try:
274                  await self._process_work(work)
275              except Exception as exc:
276                  line, ex = renew_bundle(
277                      event="processor_error",
278                      source=work.source,
279                      sandbox_id=work.sandbox_id,
280                      worker_id=worker_id,
281                      error_type=type(exc).__name__,
282                  )
283                  logger.exception(f"renew_intent {line} error={exc!s}", extra=ex)
284              finally:
285                  self._work_queue.task_done()
286  
287      async def _process_work(self, work: RenewWorkItem) -> None:
288          if self._redis is None and work.source != RENEW_SOURCE_SERVER_PROXY:
289              return
290  
291          st = self._ensure_mru_mem(work.sandbox_id)
292          async with st.lock:
293              if self._redis is not None:
294                  await self._controller.renew_after_gates(work.sandbox_id, source=work.source)
295                  return
296  
297              now = time.monotonic()
298              if (
299                  st.last_success_monotonic is not None
300                  and (now - st.last_success_monotonic) < self._min_interval
301              ):
302                  return
303              ok = await asyncio.to_thread(
304                  partial(
305                      self._controller.attempt_renew_sync,
306                      work.sandbox_id,
307                      source=work.source,
308                  )
309              )
310              if ok:
311                  st.last_success_monotonic = time.monotonic()
312  
313      async def stop(self) -> None:
314          self._stop.set()
315          for t in self._tasks:
316              t.cancel()
317          await asyncio.gather(*self._tasks, return_exceptions=True)
318          self._tasks.clear()
319          if self._redis is not None:
320              try:
321                  await self._redis.aclose()
322              except Exception as exc:
323                  logger.debug(f"renew_intent redis_close error={exc!s}")
324  
325  
async def start_renew_intent_consumer(
    app_config: AppConfig,
    sandbox_service: SandboxService | None = None,
    extension_service: ExtensionService | None = None,
) -> Optional[RenewIntentConsumer]:
    """Start consumer or ``None`` when ``renew_intent.enabled`` is false.

    Args:
        app_config: Application configuration; ``renew_intent`` decides whether and how the
            consumer runs.
        sandbox_service: Service used for renewals; built from ``app_config`` when omitted.
        extension_service: Extension service; obtained from ``sandbox_service`` via
            ``require_extension_service`` when omitted.

    Returns:
        The running consumer, or ``None`` when the feature is disabled.
    """
    # Check the flag before building services: when disabled, ``RenewIntentConsumer.start`` would
    # return ``None`` anyway, so creating (or failing to create) the services is wasted work.
    if not app_config.renew_intent.enabled:
        return None
    if sandbox_service is None:
        sandbox_service = create_sandbox_service(config=app_config)
    if extension_service is None:
        extension_service = require_extension_service(sandbox_service)
    return await RenewIntentConsumer.start(app_config, sandbox_service, extension_service)
336      return await RenewIntentConsumer.start(app_config, sandbox_service, extension_service)