# controller.py
# Copyright 2026 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Renew access path: eligibility checks then ``renew_expiration``."""

from __future__ import annotations

import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING

from fastapi import HTTPException

from opensandbox_server.api.schema import RenewSandboxExpirationRequest
from opensandbox_server.integrations.renew_intent.intent import RenewIntent
from opensandbox_server.integrations.renew_intent.logutil import (
    RENEW_EVENT_FAILED,
    RENEW_EVENT_SUCCEEDED,
    RENEW_SOURCE_REDIS_QUEUE,
    RENEW_SOURCE_SERVER_PROXY,
    renew_bundle,
)

if TYPE_CHECKING:
    from opensandbox_server.services.extension_service import ExtensionService
    from opensandbox_server.services.sandbox_service import SandboxService

logger = logging.getLogger(__name__)


def _http_detail_str(detail: object) -> str:
    """Return a printable string for an ``HTTPException.detail`` payload.

    When *detail* is a dict, its ``"message"`` entry is used if present,
    otherwise the whole dict is stringified. Any other value is passed
    through ``str()``.
    """
    if isinstance(detail, dict):
        return str(detail.get("message", detail))
    return str(detail)


class AccessRenewController:
    """Eligibility gates and ``renew_expiration``; rate limiting is expected upstream (ingress / proxy)."""

    def __init__(
        self,
        sandbox_service: "SandboxService",
        extension_service: "ExtensionService",
    ) -> None:
        """Store the services used to look up, gate and renew sandboxes."""
        self._sandbox_service = sandbox_service
        self._extension_service = extension_service

    def _try_renew_sync(self, sandbox_id: str, *, source: str) -> bool:
        """Check the eligibility gates and, if they pass, renew the sandbox.

        The attempt is silently skipped (returns ``False`` without logging)
        when any of the following holds:

        * ``get_sandbox`` raises ``HTTPException``;
        * the sandbox state, lower-cased, is not ``"running"``;
        * the sandbox has no ``expires_at``;
        * the extension service returns ``None`` for the extend seconds.

        Otherwise ``renew_expiration`` is called; any exception it raises is
        logged and converted into ``False``.

        Args:
            sandbox_id: Identifier of the sandbox to renew.
            source: Label passed through to ``renew_bundle`` for log records.

        Returns:
            ``True`` only when ``renew_expiration`` returned without raising.

        Raises:
            Exception: Anything other than ``HTTPException`` raised by
                ``get_sandbox``, and anything raised by
                ``get_access_renew_extend_seconds``, is not caught here.
        """
        try:
            sandbox = self._sandbox_service.get_sandbox(sandbox_id)
        except HTTPException:
            return False

        if sandbox.status.state.lower() != "running":
            return False

        if sandbox.expires_at is None:
            return False

        extend = self._extension_service.get_access_renew_extend_seconds(sandbox_id)
        if extend is None:
            return False

        now = datetime.now(timezone.utc)
        current = sandbox.expires_at
        if current.tzinfo is None:
            # A naive timestamp is treated as UTC so it can be compared with
            # the timezone-aware ``now`` below.
            current = current.replace(tzinfo=timezone.utc)

        # Never move the expiration earlier than the value already stored.
        candidate = now + timedelta(seconds=extend)
        new_expires = max(candidate, current)

        req = RenewSandboxExpirationRequest(expires_at=new_expires)
        try:
            self._sandbox_service.renew_expiration(sandbox_id, req)
        except HTTPException as exc:
            detail_s = _http_detail_str(exc.detail)
            line, log_extra = renew_bundle(
                event=RENEW_EVENT_FAILED,
                source=source,
                sandbox_id=sandbox_id,
                skip_reason="renew_expiration_rejected",
                http_detail=detail_s,
                http_status=getattr(exc, "status_code", None),
            )
            # Lazy %-style arguments: the message is only rendered if a
            # handler actually emits the record.
            logger.warning("renew_intent %s detail=%s", line, detail_s, extra=log_extra)
            return False
        except Exception as exc:
            # Best-effort path: an unexpected failure is logged with its
            # traceback and reported to the caller as "not renewed".
            line, log_extra = renew_bundle(
                event=RENEW_EVENT_FAILED,
                source=source,
                sandbox_id=sandbox_id,
                skip_reason="renew_expiration_error",
                error_type=type(exc).__name__,
            )
            logger.exception("renew_intent %s", line, extra=log_extra)
            return False

        new_expires_iso = new_expires.isoformat()
        line, log_extra = renew_bundle(
            event=RENEW_EVENT_SUCCEEDED,
            source=source,
            sandbox_id=sandbox_id,
            new_expires_at=new_expires_iso,
        )
        logger.info("renew_intent %s", line, extra=log_extra)
        return True

    def attempt_renew_sync(
        self,
        sandbox_id: str,
        *,
        source: str = RENEW_SOURCE_SERVER_PROXY,
    ) -> bool:
        """Run gates + renew (sync).

        Returns:
            ``True`` when the sandbox expiration was renewed, else ``False``.
        """
        return self._try_renew_sync(sandbox_id, source=source)

    async def renew_after_gates(self, sandbox_id: str, *, source: str) -> None:
        """Run renew in a worker thread (caller holds per-sandbox serialization).

        The boolean outcome of the renew attempt is discarded.
        """
        await asyncio.to_thread(self._try_renew_sync, sandbox_id, source=source)

    async def process_intent_after_lock(self, intent: RenewIntent) -> None:
        """Renew ``intent.sandbox_id`` with ``RENEW_SOURCE_REDIS_QUEUE`` as the source."""
        await self.renew_after_gates(intent.sandbox_id, source=RENEW_SOURCE_REDIS_QUEUE)