/ kubernetes / hack / pool-perf.py
pool-perf.py
  1  # Copyright 2025 Alibaba Group Holding Ltd.
  2  #
  3  # Licensed under the Apache License, Version 2.0 (the "License");
  4  # you may not use this file except in compliance with the License.
  5  # You may obtain a copy of the License at
  6  #
  7  #     http:#www.apache.org/licenses/LICENSE-2.0
  8  #
  9  # Unless required by applicable law or agreed to in writing, software
 10  # distributed under the License is distributed on an "AS IS" BASIS,
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  # See the License for the specific language governing permissions and
 13  # limitations under the License.
 14  
 15  import asyncio
 16  import time
 17  import uuid
 18  import sys
 19  import argparse
 20  from kubernetes import client, config
 21  from kubernetes.client.rest import ApiException
 22  
 23  # CRD configurations
 24  GROUP = "sandbox.opensandbox.io"
 25  VERSION = "v1alpha1"
 26  POOL_PLURAL = "pools"
 27  BSB_PLURAL = "batchsandboxes"
 28  NAMESPACE = "default"
 29  
 30  class PoolPerformanceTester:
 31      def __init__(self, pool_name, pool_size, replicas_per_bsb, total_bsb_count, timeout, poll_interval=0.00001):
 32          try:
 33              config.load_kube_config()
 34          except Exception:
 35              # Fall back to in-cluster config if kube config is not available
 36              config.load_incluster_config()
 37          self.custom_api = client.CustomObjectsApi()
 38          self.pool_name = pool_name
 39          self.pool_size = pool_size
 40          self.replicas_per_bsb = replicas_per_bsb
 41          self.total_bsb_count = total_bsb_count
 42          self.timeout = timeout
 43          self.poll_interval = poll_interval
 44          self.bsb_names = []
 45          self.results = {}
 46  
 47      def create_pool_manifest(self, size):
 48          return {
 49              "apiVersion": f"{GROUP}/{VERSION}",
 50              "kind": "Pool",
 51              "metadata": {"name": self.pool_name},
 52              "spec": {
 53                  "template": {
 54                      "spec": {
 55                          "containers": [{"name": "nginx", "image": "nginx:alpine"}]
 56                      }
 57                  },
 58                  "capacitySpec": {
 59                      "bufferMin": 5,
 60                      "bufferMax": 10,
 61                      "poolMin": size,
 62                      "poolMax": size + 20
 63                  }
 64              }
 65          }
 66  
 67      def create_bsb_manifest(self, name):
 68          return {
 69              "apiVersion": f"{GROUP}/{VERSION}",
 70              "kind": "BatchSandbox",
 71              "metadata": {"name": name},
 72              "spec": {
 73                  "replicas": self.replicas_per_bsb,
 74                  "poolRef": self.pool_name
 75              }
 76          }
 77  
 78      async def setup_pool(self):
 79          """Create and wait for the resource pool to be ready"""
 80          print(f"๐Ÿš€ Setting up Pool: {self.pool_name} with size {self.pool_size}...")
 81          try:
 82              self.custom_api.delete_namespaced_custom_object(GROUP, VERSION, NAMESPACE, POOL_PLURAL, self.pool_name)
 83              await asyncio.sleep(5)
 84          except ApiException as e:
 85              if e.status != 404:
 86                  print(f"โš ๏ธ  Failed to delete existing Pool: {e}")
 87          except Exception as e:
 88              print(f"โš ๏ธ  Error during Pool deletion: {e}")
 89  
 90          body = self.create_pool_manifest(self.pool_size)
 91          self.custom_api.create_namespaced_custom_object(GROUP, VERSION, NAMESPACE, POOL_PLURAL, body)
 92          
 93          # Wait for Available count to reach target
 94          while True:
 95              try:
 96                  pool = self.custom_api.get_namespaced_custom_object(GROUP, VERSION, NAMESPACE, POOL_PLURAL, self.pool_name)
 97                  available = pool.get("status", {}).get("available", 0)
 98                  if available >= self.pool_size:
 99                      print(f"โœ… Pool is Ready. Available: {available}")
100                      break
101                  print(f"Waiting for Pool Ready... Available: {available}")
102              except Exception as e:
103                  print(f"Waiting for Pool to be created... {e}")
104              await asyncio.sleep(2)
105  
106      async def create_bsb(self, index):
107          """Create BatchSandboxes concurrently"""
108          name = f"perf-test-{uuid.uuid4().hex[:8]}"
109          self.bsb_names.append(name)
110          body = self.create_bsb_manifest(name)
111          
112          start_time = time.time()
113          try:
114              self.custom_api.create_namespaced_custom_object(GROUP, VERSION, NAMESPACE, BSB_PLURAL, body)
115              self.results[name] = {"create_time": time.time() - start_time, "allocated_time": None}
116          except ApiException as e:
117              print(f"โŒ Failed to create {name}: {e}")
118  
119      async def wait_for_allocation(self, name):
120          """Poll for allocation completion"""
121          start_polling = time.time()
122          while True:
123              try:
124                  bsb = self.custom_api.get_namespaced_custom_object(GROUP, VERSION, NAMESPACE, BSB_PLURAL, name)
125                  status = bsb.get("status", {})
126                  allocated = status.get("allocated", 0)
127                  
128                  if allocated >= self.replicas_per_bsb:
129                      print("{0}, endpoint {1}".format(name, bsb.get("metadata", {}).get("annotations", {}).get("sandbox.opensandbox.io/endpoints", "")))
130                      self.results[name]["allocated_time"] = time.time() - start_polling
131                      break
132              except Exception as e:
133                  pass
134              
135              await asyncio.sleep(self.poll_interval)
136              if time.time() - start_polling > self.timeout:
137                  print(f"โฐ Timeout waiting for {name}")
138                  break
139  
140      async def run(self):
141          await self.setup_pool()
142          
143          print(f"๐Ÿ”ฅ Starting concurrent allocation test: {self.total_bsb_count} BatchSandboxes...")
144          start_all = time.time()
145          
146          # Concurrent creation
147          await asyncio.gather(*(self.create_bsb(i) for i in range(self.total_bsb_count)))
148          
149          # Concurrent wait for allocation
150          await asyncio.gather(*(self.wait_for_allocation(name) for name in self.bsb_names))
151          
152          total_duration = time.time() - start_all
153          self.print_report(total_duration)
154  
155      def print_report(self, total_duration):
156          print("\n" + "="*40)
157          print("๐Ÿ“Š PERFORMANCE REPORT")
158          print("="*40)
159          durations = [r["allocated_time"] for r in self.results.values() if r.get("allocated_time") is not None]
160          
161          if durations:
162              avg_lat = sum(durations) / len(durations)
163              max_lat = max(durations)
164              p95 = sorted(durations)[int(len(durations) * 0.95)]
165              
166              print(f"Total BSB:      {self.total_bsb_count}")
167              print(f"Total Duration: {total_duration:.2f}s")
168              print(f"Throughput:     {len(durations)/total_duration:.2f} sandbox/s")
169              print(f"Avg Latency:    {avg_lat:.2f}s")
170              print(f"Max Latency:    {max_lat:.2f}s")
171              print(f"P95 Latency:    {p95:.2f}s")
172              print(f"Success Rate:   {len(durations)/self.total_bsb_count*100:.1f}%")
173          else:
174              print("No successful allocations recorded.")
175          print("="*40)
176  
177      def cleanup(self):
178          print("๐Ÿงน Cleaning up...")
179          for name in self.bsb_names:
180              try:
181                  self.custom_api.delete_namespaced_custom_object(GROUP, VERSION, NAMESPACE, BSB_PLURAL, name)
182              except Exception as e:
183                  # Silently ignore deletion errors during cleanup
184                  pass
185          try:
186              self.custom_api.delete_namespaced_custom_object(GROUP, VERSION, NAMESPACE, POOL_PLURAL, self.pool_name)
187          except Exception as e:
188              # Silently ignore deletion errors during cleanup
189              pass
190  
191  if __name__ == "__main__":
192      parser = argparse.ArgumentParser(description="Pool Performance Tester")
193      parser.add_argument("--pool-name", type=str, default="perf-pool", help="Pool name (default: perf-pool)")
194      parser.add_argument("--pool-size", type=int, default=50, help="Pool size (default: 50)")
195      parser.add_argument("--replicas", type=int, default=1, help="Replicas per BatchSandbox (default: 1)")
196      parser.add_argument("--bsb-count", type=int, default=50, help="Number of BatchSandboxes to create concurrently (default: 50)")
197      parser.add_argument("--namespace", type=str, default="default", help="Kubernetes namespace (default: default)")
198      parser.add_argument("--timeout", type=int, default=120, help="Timeout in seconds for each BatchSandbox allocation (default: 120)")
199      parser.add_argument("--poll-interval", type=float, default=0.00001, help="Poll interval in seconds for checking BatchSandbox status (default: 0.00001)")
200      
201      args = parser.parse_args()
202      
203      # Update global namespace
204      NAMESPACE = args.namespace
205      
206      print(f"๐Ÿ”ง Test Configuration:")
207      print(f"   Pool Name:    {args.pool_name}")
208      print(f"   Pool Size:    {args.pool_size}")
209      print(f"   Replicas:     {args.replicas}")
210      print(f"   BSB Count:    {args.bsb_count}")
211      print(f"   Namespace:    {args.namespace}")
212      print(f"   Timeout:      {args.timeout}s")
213      print(f"   Poll Interval: {args.poll_interval}s")
214      print()
215      
216      tester = PoolPerformanceTester(
217          pool_name=args.pool_name,
218          pool_size=args.pool_size,
219          replicas_per_bsb=args.replicas,
220          total_bsb_count=args.bsb_count,
221          timeout=args.timeout,
222          poll_interval=args.poll_interval
223      )
224      try:
225          asyncio.run(tester.run())
226      except KeyboardInterrupt:
227          print("\nInterrupted by user")
228      finally:
229          tester.cleanup()