# Copyright 2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""pool-perf.py: measure BatchSandbox allocation performance from a Pool.

The script (re)creates a ``Pool`` custom resource, waits until its
``status.available`` reaches the requested size, then creates a burst of
``BatchSandbox`` objects that reference the pool and polls each one until its
``status.allocated`` reaches the requested replica count. A latency /
throughput report is printed at the end and all created objects are deleted.
"""

import argparse
import asyncio
import sys
import time
import uuid

from kubernetes import client, config
from kubernetes.client.rest import ApiException

# CRD configurations
GROUP = "sandbox.opensandbox.io"
VERSION = "v1alpha1"
POOL_PLURAL = "pools"
BSB_PLURAL = "batchsandboxes"
NAMESPACE = "default"

# Annotation whose value is printed once a BatchSandbox is allocated.
ENDPOINTS_ANNOTATION = "sandbox.opensandbox.io/endpoints"


class PoolPerformanceTester:
    """Drive a Pool / BatchSandbox allocation benchmark and report latencies.

    All Kubernetes calls use the synchronous ``client.CustomObjectsApi``.
    Inside coroutines they are dispatched to the event loop's default
    thread-pool executor (see ``_run_blocking``); calling them directly would
    block the loop and make the ``asyncio.gather`` fan-outs in ``run`` issue
    one request at a time.
    """

    def __init__(self, pool_name, pool_size, replicas_per_bsb, total_bsb_count,
                 timeout, poll_interval=0.00001, namespace=None):
        """Load cluster credentials and store the test parameters.

        Args:
            pool_name: Name of the Pool custom resource to (re)create.
            pool_size: Available members to wait for before the test starts;
                also used as the Pool's ``poolMin``.
            replicas_per_bsb: ``spec.replicas`` of every BatchSandbox.
            total_bsb_count: Number of BatchSandboxes created concurrently.
            timeout: Seconds to poll each BatchSandbox before giving up; also
                bounds the wait for a previous Pool to finish deleting.
            poll_interval: Seconds to sleep between status polls.
            namespace: Namespace for every object; defaults to the module-level
                ``NAMESPACE`` at construction time.
        """
        try:
            config.load_kube_config()
        except Exception:
            # Fall back to in-cluster config if kube config is not available
            config.load_incluster_config()
        self.custom_api = client.CustomObjectsApi()
        self.pool_name = pool_name
        self.pool_size = pool_size
        self.replicas_per_bsb = replicas_per_bsb
        self.total_bsb_count = total_bsb_count
        self.timeout = timeout
        self.poll_interval = poll_interval
        self.namespace = NAMESPACE if namespace is None else namespace
        # Every BatchSandbox name a create was attempted for (used by cleanup).
        self.bsb_names = []
        # name -> {"create_time": seconds, "allocated_time": seconds or None};
        # only successfully created BatchSandboxes get an entry.
        self.results = {}

    async def _run_blocking(self, func, *args):
        """Run the synchronous ``func(*args)`` in the default executor and return its result."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    def create_pool_manifest(self, size):
        """Return the Pool manifest with ``poolMin=size`` and ``poolMax=size + 20``."""
        return {
            "apiVersion": f"{GROUP}/{VERSION}",
            "kind": "Pool",
            "metadata": {"name": self.pool_name},
            "spec": {
                "template": {
                    "spec": {
                        "containers": [{"name": "nginx", "image": "nginx:alpine"}]
                    }
                },
                "capacitySpec": {
                    "bufferMin": 5,
                    "bufferMax": 10,
                    "poolMin": size,
                    "poolMax": size + 20,
                },
            },
        }

    def create_bsb_manifest(self, name):
        """Return a BatchSandbox manifest named ``name`` that draws from this Pool."""
        return {
            "apiVersion": f"{GROUP}/{VERSION}",
            "kind": "BatchSandbox",
            "metadata": {"name": name},
            "spec": {
                "replicas": self.replicas_per_bsb,
                "poolRef": self.pool_name,
            },
        }

    async def _wait_for_pool_deleted(self):
        """Poll until the Pool returns 404, giving up after ``self.timeout`` seconds."""
        deadline = time.monotonic() + self.timeout
        while time.monotonic() < deadline:
            try:
                await self._run_blocking(
                    self.custom_api.get_namespaced_custom_object,
                    GROUP, VERSION, self.namespace, POOL_PLURAL, self.pool_name)
            except ApiException as e:
                if e.status == 404:
                    return
            await asyncio.sleep(1)
        print(f"⚠️ Pool {self.pool_name} still exists after {self.timeout}s; creating anyway")

    async def setup_pool(self):
        """Create and wait for the resource pool to be ready.

        Any existing Pool with the same name is deleted first, and creation
        waits until that deletion has completed. Waiting for readiness has no
        time limit; it stops once ``status.available >= pool_size``.
        """
        print(f"🚀 Setting up Pool: {self.pool_name} with size {self.pool_size}...")
        try:
            await self._run_blocking(
                self.custom_api.delete_namespaced_custom_object,
                GROUP, VERSION, self.namespace, POOL_PLURAL, self.pool_name)
        except ApiException as e:
            if e.status != 404:
                print(f"⚠️ Failed to delete existing Pool: {e}")
        except Exception as e:
            print(f"⚠️ Error during Pool deletion: {e}")
        else:
            # Deletion is asynchronous; creating the new Pool while the old
            # object still exists would fail with 409 Conflict.
            await self._wait_for_pool_deleted()

        body = self.create_pool_manifest(self.pool_size)
        await self._run_blocking(
            self.custom_api.create_namespaced_custom_object,
            GROUP, VERSION, self.namespace, POOL_PLURAL, body)

        # Wait for Available count to reach target
        while True:
            try:
                pool = await self._run_blocking(
                    self.custom_api.get_namespaced_custom_object,
                    GROUP, VERSION, self.namespace, POOL_PLURAL, self.pool_name)
                available = (pool.get("status") or {}).get("available") or 0
                if available >= self.pool_size:
                    print(f"✅ Pool is Ready. Available: {available}")
                    break
                print(f"Waiting for Pool Ready... Available: {available}")
            except Exception as e:
                print(f"Waiting for Pool to be created... {e}")
            await asyncio.sleep(2)

    async def create_bsb(self, index):
        """Create one BatchSandbox and record how long the create request took.

        ``index`` is not used by the body; it only distinguishes the calls
        fanned out by ``run``. On an ``ApiException`` the failure is printed
        and no ``results`` entry is made.
        """
        name = f"perf-test-{uuid.uuid4().hex[:8]}"
        # Recorded before the request so cleanup() also tries to delete objects
        # whose create call failed after the server may have accepted it.
        self.bsb_names.append(name)
        body = self.create_bsb_manifest(name)

        def timed_create():
            # Timed inside the worker thread so executor queueing is excluded.
            started = time.monotonic()
            self.custom_api.create_namespaced_custom_object(
                GROUP, VERSION, self.namespace, BSB_PLURAL, body)
            return time.monotonic() - started

        try:
            create_time = await self._run_blocking(timed_create)
        except ApiException as e:
            print(f"❌ Failed to create {name}: {e}")
            return
        self.results[name] = {"create_time": create_time, "allocated_time": None}

    async def wait_for_allocation(self, name):
        """Poll ``name`` until ``status.allocated >= replicas_per_bsb`` or timeout.

        On success ``results[name]["allocated_time"]`` is set to the seconds
        elapsed since this method started polling. Names without a
        ``results`` entry (failed creations) return immediately.
        """
        record = self.results.get(name)
        if record is None:
            return

        start_polling = time.monotonic()
        last_error = None
        while True:
            try:
                bsb = await self._run_blocking(
                    self.custom_api.get_namespaced_custom_object,
                    GROUP, VERSION, self.namespace, BSB_PLURAL, name)
            except Exception as e:
                # Retried until timeout; the latest error is shown if we give up.
                last_error = e
            else:
                status = bsb.get("status") or {}
                if (status.get("allocated") or 0) >= self.replicas_per_bsb:
                    annotations = (bsb.get("metadata") or {}).get("annotations") or {}
                    print("{0}, endpoint {1}".format(
                        name, annotations.get(ENDPOINTS_ANNOTATION, "")))
                    record["allocated_time"] = time.monotonic() - start_polling
                    return

            await asyncio.sleep(self.poll_interval)
            if time.monotonic() - start_polling > self.timeout:
                detail = f" (last error: {last_error})" if last_error is not None else ""
                print(f"⏰ Timeout waiting for {name}{detail}")
                return

    async def run(self):
        """Set up the Pool, create all BatchSandboxes concurrently, poll them, then report."""
        await self.setup_pool()

        print(f"🔥 Starting concurrent allocation test: {self.total_bsb_count} BatchSandboxes...")
        start_all = time.monotonic()

        # Concurrent creation
        await asyncio.gather(*(self.create_bsb(i) for i in range(self.total_bsb_count)))

        # Concurrent wait for allocation, only for objects the API server accepted
        await asyncio.gather(*(self.wait_for_allocation(name) for name in list(self.results)))

        total_duration = time.monotonic() - start_all
        self.print_report(total_duration)

    def print_report(self, total_duration):
        """Print throughput, average / max / P95 latency, and success rate.

        P95 uses the nearest-rank method: the ceil(0.95 * n)-th smallest sample.
        """
        print("\n" + "=" * 40)
        print("📊 PERFORMANCE REPORT")
        print("=" * 40)
        durations = sorted(
            r["allocated_time"] for r in self.results.values()
            if r.get("allocated_time") is not None
        )

        if durations:
            count = len(durations)
            avg_lat = sum(durations) / count
            max_lat = durations[-1]
            # Integer ceil(95 * n / 100) avoids float rounding in 0.95 * n.
            p95 = durations[(95 * count + 99) // 100 - 1]

            print(f"Total BSB: {self.total_bsb_count}")
            print(f"Total Duration: {total_duration:.2f}s")
            print(f"Throughput: {count / total_duration:.2f} sandbox/s")
            print(f"Avg Latency: {avg_lat:.2f}s")
            print(f"Max Latency: {max_lat:.2f}s")
            print(f"P95 Latency: {p95:.2f}s")
            print(f"Success Rate: {count / self.total_bsb_count * 100:.1f}%")
        else:
            print("No successful allocations recorded.")
        print("=" * 40)

    def cleanup(self):
        """Best-effort deletion of every attempted BatchSandbox and then the Pool.

        Already-missing objects (404) are ignored; other failures are printed
        so leaked resources are visible, but never raised.
        """
        print("🧹 Cleaning up...")
        targets = [(BSB_PLURAL, name) for name in self.bsb_names]
        targets.append((POOL_PLURAL, self.pool_name))
        for plural, name in targets:
            try:
                self.custom_api.delete_namespaced_custom_object(
                    GROUP, VERSION, self.namespace, plural, name)
            except ApiException as e:
                if e.status != 404:
                    print(f"⚠️ Failed to delete {plural}/{name}: {e}")
            except Exception as e:
                print(f"⚠️ Failed to delete {plural}/{name}: {e}")


def main():
    """Parse CLI arguments, run the benchmark, and always clean up afterwards."""
    parser = argparse.ArgumentParser(description="Pool Performance Tester")
    parser.add_argument("--pool-name", type=str, default="perf-pool", help="Pool name (default: perf-pool)")
    parser.add_argument("--pool-size", type=int, default=50, help="Pool size (default: 50)")
    parser.add_argument("--replicas", type=int, default=1, help="Replicas per BatchSandbox (default: 1)")
    parser.add_argument("--bsb-count", type=int, default=50, help="Number of BatchSandboxes to create concurrently (default: 50)")
    parser.add_argument("--namespace", type=str, default="default", help="Kubernetes namespace (default: default)")
    parser.add_argument("--timeout", type=int, default=120, help="Timeout in seconds for each BatchSandbox allocation (default: 120)")
    parser.add_argument("--poll-interval", type=float, default=0.00001, help="Poll interval in seconds for checking BatchSandbox status (default: 0.00001)")

    args = parser.parse_args()

    print("🔧 Test Configuration:")
    print(f" Pool Name: {args.pool_name}")
    print(f" Pool Size: {args.pool_size}")
    print(f" Replicas: {args.replicas}")
    print(f" BSB Count: {args.bsb_count}")
    print(f" Namespace: {args.namespace}")
    print(f" Timeout: {args.timeout}s")
    print(f" Poll Interval: {args.poll_interval}s")
    print()

    tester = PoolPerformanceTester(
        pool_name=args.pool_name,
        pool_size=args.pool_size,
        replicas_per_bsb=args.replicas,
        total_bsb_count=args.bsb_count,
        timeout=args.timeout,
        poll_interval=args.poll_interval,
        namespace=args.namespace,
    )
    try:
        asyncio.run(tester.run())
    except KeyboardInterrupt:
        print("\nInterrupted by user")
    finally:
        tester.cleanup()


if __name__ == "__main__":
    main()