test_rate_limiter.py
1 # Copyright 2025 Alibaba Group Holding Ltd. 2 # 3 # Licensed under the Apache License, Version 2.0 (the "License"); 4 # you may not use this file except in compliance with the License. 5 # You may obtain a copy of the License at 6 # 7 # http://www.apache.org/licenses/LICENSE-2.0 8 # 9 # Unless required by applicable law or agreed to in writing, software 10 # distributed under the License is distributed on an "AS IS" BASIS, 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 # See the License for the specific language governing permissions and 13 # limitations under the License. 14 15 import time 16 import threading 17 from unittest.mock import patch 18 19 import pytest 20 21 from opensandbox_server.services.k8s.rate_limiter import TokenBucketRateLimiter 22 23 class TestTokenBucketRateLimiter: 24 25 def test_invalid_qps_raises_value_error(self): 26 """qps <= 0 must raise ValueError.""" 27 with pytest.raises(ValueError, match="qps must be > 0"): 28 TokenBucketRateLimiter(qps=0) 29 30 def test_negative_qps_raises_value_error(self): 31 """Negative qps must raise ValueError.""" 32 with pytest.raises(ValueError): 33 TokenBucketRateLimiter(qps=-1.0) 34 35 def test_burst_defaults_to_qps_when_zero(self): 36 """burst=0 means the bucket capacity equals qps (minimum 1).""" 37 limiter = TokenBucketRateLimiter(qps=5.0, burst=0) 38 assert limiter._burst == 5.0 39 40 def test_explicit_burst_is_respected(self): 41 """Explicit burst value sets bucket capacity independently from qps.""" 42 limiter = TokenBucketRateLimiter(qps=5.0, burst=20) 43 assert limiter._burst == 20.0 44 45 def test_burst_minimum_is_one_when_qps_below_one(self): 46 """burst is clamped to 1 when qps < 1 and burst is not set.""" 47 limiter = TokenBucketRateLimiter(qps=0.5) 48 assert limiter._burst == 1.0 49 50 def test_low_qps_limiter_can_acquire(self): 51 """A limiter with qps < 1 and default burst must be able to issue a token.""" 52 limiter = TokenBucketRateLimiter(qps=0.5) 53 assert limiter.try_acquire() is True 54 55 def test_try_acquire_succeeds_when_bucket_full(self): 56 """try_acquire returns True when tokens are available.""" 57 limiter = TokenBucketRateLimiter(qps=10.0, burst=10) 58 assert limiter.try_acquire() is True 59 60 def test_try_acquire_fails_when_bucket_empty(self): 61 """try_acquire returns False after exhausting all tokens.""" 62 limiter = TokenBucketRateLimiter(qps=1.0, burst=1) 63 limiter.try_acquire() # consume the only token 64 assert limiter.try_acquire() is False 65 66 def test_try_acquire_consumes_token(self): 67 """Each successful try_acquire reduces available tokens by one.""" 68 limiter = TokenBucketRateLimiter(qps=10.0, burst=3) 69 assert limiter.try_acquire() is True 70 assert limiter.try_acquire() is True 71 assert limiter.try_acquire() is True 72 assert limiter.try_acquire() is False 73 74 def test_acquire_succeeds_immediately_when_tokens_available(self): 75 """acquire completes without sleeping when the bucket has tokens.""" 76 limiter = TokenBucketRateLimiter(qps=100.0, burst=10) 77 start = time.monotonic() 78 limiter.acquire() 79 elapsed = time.monotonic() - start 80 assert elapsed < 0.1 # should be essentially instant 81 82 def test_acquire_blocks_until_token_available(self): 83 """acquire blocks and returns only after a token refills.""" 84 limiter = TokenBucketRateLimiter(qps=10.0, burst=1) 85 limiter.try_acquire() # drain the bucket 86 87 start = time.monotonic() 88 limiter.acquire() # should wait ~0.1s for next token 89 elapsed = time.monotonic() - start 90 91 assert elapsed >= 0.05 # some delay occurred 92 93 def test_acquire_minimum_sleep_prevents_busy_loop(self): 94 """acquire sleeps at least 1 ms even when wait is near-zero.""" 95 limiter = TokenBucketRateLimiter(qps=1.0, burst=1) 96 # Manually set tokens to just below 1 to produce a near-zero wait 97 with limiter._lock: 98 limiter._tokens = 1.0 - 1e-10 99 100 with patch("opensandbox_server.services.k8s.rate_limiter.time.sleep") as mock_sleep: 101 # _try_acquire will succeed on first or second call; we only care 102 # that if sleep is called, the argument is >= 0.001. 103 limiter.acquire() 104 for call in mock_sleep.call_args_list: 105 assert call.args[0] >= 0.001 106 107 def test_tokens_refill_over_time(self): 108 """Tokens are replenished proportional to elapsed time.""" 109 limiter = TokenBucketRateLimiter(qps=100.0, burst=10) 110 # Drain all tokens 111 for _ in range(10): 112 limiter.try_acquire() 113 assert limiter.try_acquire() is False 114 115 time.sleep(0.05) # wait for ~5 tokens to refill at 100 qps 116 117 assert limiter.try_acquire() is True 118 119 def test_tokens_capped_at_burst(self): 120 """Token count never exceeds burst capacity.""" 121 limiter = TokenBucketRateLimiter(qps=10.0, burst=5) 122 time.sleep(0.5) # wait long enough to overflow if cap not applied 123 # Force a refill by calling _try_acquire internals 124 with limiter._lock: 125 limiter._refill() 126 assert limiter._tokens <= 5.0 127 128 def test_concurrent_acquires_do_not_exceed_burst(self): 129 """Concurrent threads must not collectively acquire more than burst tokens.""" 130 burst = 5 131 limiter = TokenBucketRateLimiter(qps=1000.0, burst=burst) 132 successes = [] 133 lock = threading.Lock() 134 135 # Freeze time so _refill() never adds extra tokens during the test 136 fixed_time = limiter._last_refill 137 138 def worker(): 139 with patch("opensandbox_server.services.k8s.rate_limiter.time.monotonic", return_value=fixed_time): 140 if limiter.try_acquire(): 141 with lock: 142 successes.append(1) 143 144 threads = [threading.Thread(target=worker) for _ in range(20)] 145 for t in threads: 146 t.start() 147 for t in threads: 148 t.join() 149 150 assert len(successes) <= burst