redis_client.py
 1  # Copyright 2026 Alibaba Group Holding Ltd.
 2  #
 3  # Licensed under the Apache License, Version 2.0 (the "License");
 4  # you may not use this file except in compliance with the License.
 5  # You may obtain a copy of the License at
 6  #
 7  #     http://www.apache.org/licenses/LICENSE-2.0
 8  #
 9  # Unless required by applicable law or agreed to in writing, software
10  # distributed under the License is distributed on an "AS IS" BASIS,
11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  # See the License for the specific language governing permissions and
13  # limitations under the License.
14  
15  """Async Redis client for renew-intent queue consumers."""
16  
17  from __future__ import annotations
18  
19  import logging
20  from typing import Optional
21  
22  import redis.asyncio as redis_async
23  from redis.asyncio import Redis
24  
25  from opensandbox_server.config import AppConfig
26  from opensandbox_server.integrations.renew_intent.logutil import (
27      RENEW_EVENT_REDIS_CONNECTED,
28      RENEW_SOURCE_REDIS_QUEUE,
29      renew_bundle,
30  )
31  
32  logger = logging.getLogger(__name__)
33  
34  
35  async def connect_renew_intent_redis_from_config(
36      app_config: AppConfig,
37  ) -> Optional[Redis]:
38      """Connect (with ``PING``) or ``None`` if renew-intent Redis is disabled."""
39      ri = app_config.renew_intent
40      if not ri.enabled or not ri.redis.enabled:
41          return None
42  
43      dsn = ri.redis.dsn
44      if dsn is None or not str(dsn).strip():
45          return None
46  
47      client = redis_async.from_url(
48          str(dsn).strip(),
49          decode_responses=True,
50      )
51      await client.ping()
52      line, ex = renew_bundle(
53          event=RENEW_EVENT_REDIS_CONNECTED,
54          source=RENEW_SOURCE_REDIS_QUEUE,
55          queue_key=ri.redis.queue_key,
56          consumer_concurrency=ri.redis.consumer_concurrency,
57      )
58      logger.info(f"renew_intent {line}", extra=ex)
59      return client
60  
61  
62  async def close_renew_intent_redis_client(client: Optional[Redis]) -> None:
63      """Close client; no-op for ``None``."""
64      if client is None:
65          return
66      await client.aclose()