rest_api_helper.py
1 #!/usr/bin/env python3 2 3 import asyncio 4 import aiohttp 5 import random 6 import time 7 import sys 8 9 from sys import argv 10 11 BLOCK_HEIGHT_URL = "http://localhost:3030/v2/testnet/block/height/latest" 12 GET_BLOCK_BASE_URL = "http://localhost:3030/v2/testnet/block" 13 MIN_BLOCK = 1 14 MAX_BLOCK = 250 15 NUM_WORKERS = 8 16 17 # Statistics 18 stats = { 19 'successful_requests': 0, 20 'failed_requests': 0, 21 } 22 23 24 def write_results(mode, total_wait, endpoint): 25 num_ops = stats['successful_requests'] 26 throughput = num_ops / total_wait 27 28 print(f'🎉 REST benchmark "{mode}" done! It took {total_wait} seconds' 29 f' for {num_ops} ops. Throughput was {throughput} ops/s.') 30 31 with open('info.txt', 'r') as f: 32 snapshot_info = f.read().replace('\n', '') 33 34 with open("results.json", "a") as f: 35 f.write(f'{{ "name": "rest-{mode}", "unit": "ops/s", ' 36 f'"value": {throughput}, "extra": "num_ops={num_ops}, ' 37 f'total_wait={total_wait}, endpoint={endpoint}, ' 38 f'{snapshot_info}" }},\n') 39 40 41 async def make_request(session, worker_id, mode): 42 """Make a single async request to the block endpoint""" 43 44 if mode == "get-block": 45 # Checks that any block can be retrieved in a reasonable time. 46 block_id = random.randint(MIN_BLOCK, MAX_BLOCK) 47 url = f"{GET_BLOCK_BASE_URL}/{block_id}" 48 elif mode == "get-latest-block": 49 # Tests that the most recent block(s) are cached and can be retrieved even quicker. 50 url = f"{GET_BLOCK_BASE_URL}/{MAX_BLOCK}" 51 elif mode == "block-height": 52 # Fetches the current block height as a basline for the REST API speed. 53 url = BLOCK_HEIGHT_URL 54 else: 55 raise RuntimeError(f'Unknown REST mode "{mode}"') 56 57 try: 58 async with session.get(url, timeout=aiohttp.ClientTimeout(total=100)) as response: 59 content = await response.read() 60 61 if response.status == 200: 62 stats['successful_requests'] += 1 63 return True 64 else: 65 print(f"Request failed: {content}") 66 stats['failed_requests'] += 1 67 return False 68 69 except asyncio.TimeoutError: 70 print("ERROR: Request timed out!") 71 stats['failed_requests'] += 1 72 return False 73 74 except Exception as err: 75 print(f"ERROR: Got exception: {err}") 76 stats['failed_requests'] += 1 77 return False 78 79 80 async def worker(session, worker_id, mode, reqs_per_worker): 81 """Worker coroutine that makes multiple requests""" 82 print(f"Worker {worker_id} starting...") 83 worker_successful = 0 84 worker_failed = 0 85 86 for i in range(reqs_per_worker): 87 success = await make_request(session, worker_id, mode) 88 89 if success: 90 worker_successful += 1 91 if (i+1) % 10 == 0: # Log every 10th request 92 print(f'Worker {worker_id}: Finished {i+1} of ' 93 f'{reqs_per_worker} requests') 94 else: 95 worker_failed += 1 96 break 97 98 return worker_successful, worker_failed 99 100 101 async def main(mode, num_workers, reqs_per_worker): 102 """Main async function to coordinate the workers""" 103 104 if mode in ["get-block", "get-latest-block"]: 105 base_url = GET_BLOCK_BASE_URL 106 elif mode == "block-height": 107 base_url = BLOCK_HEIGHT_URL 108 else: 109 raise RuntimeError(f'Unknown REST mode "{mode}"') 110 111 print(f'Starting {num_workers} async workers for "{mode}", ' 112 f' each making {reqs_per_worker} requests...') 113 print(f"Target endpoint: {base_url}") 114 print("") 115 116 start_time = time.time() 117 118 # Create HTTP session with connection pooling 119 connector = aiohttp.TCPConnector( 120 limit=100, # Total connection pool size 121 limit_per_host=50, # Max connections per host 122 keepalive_timeout=30, 123 enable_cleanup_closed=True 124 ) 125 126 async with aiohttp.ClientSession(connector=connector) as session: 127 # Create and run all workers concurrently 128 tasks = [worker(session, i+1, mode, reqs_per_worker) for i in range(num_workers)] 129 results = await asyncio.gather(*tasks, return_exceptions=True) 130 131 # Process results 132 for i, result in enumerate(results): 133 if isinstance(result, Exception): 134 print(f"Worker {i+1} failed with exception: {result}") 135 else: 136 worker_successful, worker_failed = result 137 138 end_time = time.time() 139 duration = end_time - start_time 140 141 if stats['failed_requests'] > 0: 142 print(f'REST benchmark "{mode}" failed') 143 sys.exit(1) 144 else: 145 write_results(mode, duration, base_url) 146 sys.exit(0) 147 148 if __name__ == "__main__": 149 # what type of benchmark to run? 150 mode = argv[1] 151 # how many client tasks? 152 num_workers = int(argv[2]) 153 # how many requests per client task? 154 reqs_per_worker = int(argv[3]) 155 156 # Run the async main function 157 asyncio.run(main(mode, num_workers, reqs_per_worker))