/ server / tests / k8s / test_rate_limiter.py
test_rate_limiter.py
  1  # Copyright 2025 Alibaba Group Holding Ltd.
  2  #
  3  # Licensed under the Apache License, Version 2.0 (the "License");
  4  # you may not use this file except in compliance with the License.
  5  # You may obtain a copy of the License at
  6  #
  7  #     http://www.apache.org/licenses/LICENSE-2.0
  8  #
  9  # Unless required by applicable law or agreed to in writing, software
 10  # distributed under the License is distributed on an "AS IS" BASIS,
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  # See the License for the specific language governing permissions and
 13  # limitations under the License.
 14  
 15  import time
 16  import threading
 17  from unittest.mock import patch
 18  
 19  import pytest
 20  
 21  from opensandbox_server.services.k8s.rate_limiter import TokenBucketRateLimiter
 22  
 23  class TestTokenBucketRateLimiter:
 24  
 25      def test_invalid_qps_raises_value_error(self):
 26          """qps <= 0 must raise ValueError."""
 27          with pytest.raises(ValueError, match="qps must be > 0"):
 28              TokenBucketRateLimiter(qps=0)
 29  
 30      def test_negative_qps_raises_value_error(self):
 31          """Negative qps must raise ValueError."""
 32          with pytest.raises(ValueError):
 33              TokenBucketRateLimiter(qps=-1.0)
 34  
 35      def test_burst_defaults_to_qps_when_zero(self):
 36          """burst=0 means the bucket capacity equals qps (minimum 1)."""
 37          limiter = TokenBucketRateLimiter(qps=5.0, burst=0)
 38          assert limiter._burst == 5.0
 39  
 40      def test_explicit_burst_is_respected(self):
 41          """Explicit burst value sets bucket capacity independently from qps."""
 42          limiter = TokenBucketRateLimiter(qps=5.0, burst=20)
 43          assert limiter._burst == 20.0
 44  
 45      def test_burst_minimum_is_one_when_qps_below_one(self):
 46          """burst is clamped to 1 when qps < 1 and burst is not set."""
 47          limiter = TokenBucketRateLimiter(qps=0.5)
 48          assert limiter._burst == 1.0
 49  
 50      def test_low_qps_limiter_can_acquire(self):
 51          """A limiter with qps < 1 and default burst must be able to issue a token."""
 52          limiter = TokenBucketRateLimiter(qps=0.5)
 53          assert limiter.try_acquire() is True
 54  
 55      def test_try_acquire_succeeds_when_bucket_full(self):
 56          """try_acquire returns True when tokens are available."""
 57          limiter = TokenBucketRateLimiter(qps=10.0, burst=10)
 58          assert limiter.try_acquire() is True
 59  
 60      def test_try_acquire_fails_when_bucket_empty(self):
 61          """try_acquire returns False after exhausting all tokens."""
 62          limiter = TokenBucketRateLimiter(qps=1.0, burst=1)
 63          limiter.try_acquire()  # consume the only token
 64          assert limiter.try_acquire() is False
 65  
 66      def test_try_acquire_consumes_token(self):
 67          """Each successful try_acquire reduces available tokens by one."""
 68          limiter = TokenBucketRateLimiter(qps=10.0, burst=3)
 69          assert limiter.try_acquire() is True
 70          assert limiter.try_acquire() is True
 71          assert limiter.try_acquire() is True
 72          assert limiter.try_acquire() is False
 73  
 74      def test_acquire_succeeds_immediately_when_tokens_available(self):
 75          """acquire completes without sleeping when the bucket has tokens."""
 76          limiter = TokenBucketRateLimiter(qps=100.0, burst=10)
 77          start = time.monotonic()
 78          limiter.acquire()
 79          elapsed = time.monotonic() - start
 80          assert elapsed < 0.1  # should be essentially instant
 81  
 82      def test_acquire_blocks_until_token_available(self):
 83          """acquire blocks and returns only after a token refills."""
 84          limiter = TokenBucketRateLimiter(qps=10.0, burst=1)
 85          limiter.try_acquire()  # drain the bucket
 86  
 87          start = time.monotonic()
 88          limiter.acquire()  # should wait ~0.1s for next token
 89          elapsed = time.monotonic() - start
 90  
 91          assert elapsed >= 0.05  # some delay occurred
 92  
 93      def test_acquire_minimum_sleep_prevents_busy_loop(self):
 94          """acquire sleeps at least 1 ms even when wait is near-zero."""
 95          limiter = TokenBucketRateLimiter(qps=1.0, burst=1)
 96          # Manually set tokens to just below 1 to produce a near-zero wait
 97          with limiter._lock:
 98              limiter._tokens = 1.0 - 1e-10
 99  
100          with patch("opensandbox_server.services.k8s.rate_limiter.time.sleep") as mock_sleep:
101              # _try_acquire will succeed on first or second call; we only care
102              # that if sleep is called, the argument is >= 0.001.
103              limiter.acquire()
104              for call in mock_sleep.call_args_list:
105                  assert call.args[0] >= 0.001
106  
107      def test_tokens_refill_over_time(self):
108          """Tokens are replenished proportional to elapsed time."""
109          limiter = TokenBucketRateLimiter(qps=100.0, burst=10)
110          # Drain all tokens
111          for _ in range(10):
112              limiter.try_acquire()
113          assert limiter.try_acquire() is False
114  
115          time.sleep(0.05)  # wait for ~5 tokens to refill at 100 qps
116  
117          assert limiter.try_acquire() is True
118  
119      def test_tokens_capped_at_burst(self):
120          """Token count never exceeds burst capacity."""
121          limiter = TokenBucketRateLimiter(qps=10.0, burst=5)
122          time.sleep(0.5)  # wait long enough to overflow if cap not applied
123          # Force a refill by calling _try_acquire internals
124          with limiter._lock:
125              limiter._refill()
126          assert limiter._tokens <= 5.0
127  
128      def test_concurrent_acquires_do_not_exceed_burst(self):
129          """Concurrent threads must not collectively acquire more than burst tokens."""
130          burst = 5
131          limiter = TokenBucketRateLimiter(qps=1000.0, burst=burst)
132          successes = []
133          lock = threading.Lock()
134  
135          # Freeze time so _refill() never adds extra tokens during the test
136          fixed_time = limiter._last_refill
137  
138          def worker():
139              with patch("opensandbox_server.services.k8s.rate_limiter.time.monotonic", return_value=fixed_time):
140                  if limiter.try_acquire():
141                      with lock:
142                          successes.append(1)
143  
144          threads = [threading.Thread(target=worker) for _ in range(20)]
145          for t in threads:
146              t.start()
147          for t in threads:
148              t.join()
149  
150          assert len(successes) <= burst