redis_client.py
# Copyright 2026 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Async Redis client for renew-intent queue consumers."""

from __future__ import annotations

import logging
from typing import Optional

import redis.asyncio as redis_async
from redis.asyncio import Redis

from opensandbox_server.config import AppConfig
from opensandbox_server.integrations.renew_intent.logutil import (
    RENEW_EVENT_REDIS_CONNECTED,
    RENEW_SOURCE_REDIS_QUEUE,
    renew_bundle,
)

logger = logging.getLogger(__name__)


async def connect_renew_intent_redis_from_config(
    app_config: AppConfig,
) -> Optional[Redis]:
    """Connect (with ``PING``) or ``None`` if renew-intent Redis is disabled.

    Args:
        app_config: Application configuration; only ``renew_intent`` and its
            ``redis`` sub-section are read.

    Returns:
        A client created with ``decode_responses=True`` that has answered one
        ``PING``, or ``None`` when ``renew_intent.enabled`` or
        ``renew_intent.redis.enabled`` is falsy, or when the DSN is ``None``
        or blank after stripping whitespace.

    Raises:
        Any exception raised by the ``PING`` round-trip is propagated
        unchanged, after the freshly created client has been closed.
    """
    ri = app_config.renew_intent
    if not ri.enabled or not ri.redis.enabled:
        return None

    dsn = ri.redis.dsn
    if dsn is None or not str(dsn).strip():
        return None

    client = redis_async.from_url(
        str(dsn).strip(),
        decode_responses=True,
    )
    try:
        # Fail fast: verify the server is reachable before handing the client out.
        await client.ping()
    except BaseException:
        # The caller never receives this client, so nobody else can close it.
        # Release it here (BaseException also covers task cancellation) and
        # let the original error propagate.
        await client.aclose()
        raise

    line, ex = renew_bundle(
        event=RENEW_EVENT_REDIS_CONNECTED,
        source=RENEW_SOURCE_REDIS_QUEUE,
        queue_key=ri.redis.queue_key,
        consumer_concurrency=ri.redis.consumer_concurrency,
    )
    logger.info(f"renew_intent {line}", extra=ex)
    return client


async def close_renew_intent_redis_client(client: Optional[Redis]) -> None:
    """Close client; no-op for ``None``.

    Args:
        client: The client to close, or ``None`` when no connection was made
            (for example because renew-intent Redis is disabled).
    """
    if client is None:
        return
    await client.aclose()