/ .ci / rest_api_helper.py
rest_api_helper.py
  1  #!/usr/bin/env python3
  2  
  3  import asyncio
  4  import aiohttp
  5  import random
  6  import time
  7  import sys
  8  
  9  from sys import argv
 10  
 11  BLOCK_HEIGHT_URL = "http://localhost:3030/v2/testnet/block/height/latest"
 12  GET_BLOCK_BASE_URL = "http://localhost:3030/v2/testnet/block"
 13  MIN_BLOCK = 1
 14  MAX_BLOCK = 250
 15  NUM_WORKERS = 8
 16  
 17  # Statistics
 18  stats = {
 19      'successful_requests': 0,
 20      'failed_requests': 0,
 21  }
 22  
 23  
 24  def write_results(mode, total_wait, endpoint):
 25      num_ops = stats['successful_requests']
 26      throughput = num_ops / total_wait
 27  
 28      print(f'🎉 REST benchmark "{mode}" done! It took {total_wait} seconds'
 29            f' for {num_ops} ops. Throughput was {throughput} ops/s.')
 30  
 31      with open('info.txt', 'r') as f:
 32          snapshot_info = f.read().replace('\n', '')
 33  
 34      with open("results.json", "a") as f:
 35          f.write(f'{{ "name": "rest-{mode}", "unit": "ops/s", '
 36                  f'"value": {throughput}, "extra": "num_ops={num_ops}, '
 37                  f'total_wait={total_wait}, endpoint={endpoint}, '
 38                  f'{snapshot_info}" }},\n')
 39  
 40  
 41  async def make_request(session, worker_id, mode):
 42      """Make a single async request to the block endpoint"""
 43  
 44      if mode == "get-block":
 45          # Checks that any block can be retrieved in a reasonable time.
 46          block_id = random.randint(MIN_BLOCK, MAX_BLOCK)
 47          url = f"{GET_BLOCK_BASE_URL}/{block_id}"
 48      elif mode == "get-latest-block":
 49          # Tests that the most recent block(s) are cached and can be retrieved even quicker.
 50          url = f"{GET_BLOCK_BASE_URL}/{MAX_BLOCK}"
 51      elif mode == "block-height":
 52          # Fetches the current block height as a basline for the REST API speed.
 53          url = BLOCK_HEIGHT_URL
 54      else:
 55          raise RuntimeError(f'Unknown REST mode "{mode}"')
 56  
 57      try:
 58          async with session.get(url, timeout=aiohttp.ClientTimeout(total=100)) as response:
 59              content = await response.read()
 60  
 61              if response.status == 200:
 62                  stats['successful_requests'] += 1
 63                  return True
 64              else:
 65                  print(f"Request failed: {content}")
 66                  stats['failed_requests'] += 1
 67                  return False
 68  
 69      except asyncio.TimeoutError:
 70          print("ERROR: Request timed out!")
 71          stats['failed_requests'] += 1
 72          return False
 73  
 74      except Exception as err:
 75          print(f"ERROR: Got exception: {err}")
 76          stats['failed_requests'] += 1
 77          return False
 78  
 79  
 80  async def worker(session, worker_id, mode, reqs_per_worker):
 81      """Worker coroutine that makes multiple requests"""
 82      print(f"Worker {worker_id} starting...")
 83      worker_successful = 0
 84      worker_failed = 0
 85  
 86      for i in range(reqs_per_worker):
 87          success = await make_request(session, worker_id, mode)
 88  
 89          if success:
 90              worker_successful += 1
 91              if (i+1) % 10 == 0:  # Log every 10th request
 92                  print(f'Worker {worker_id}: Finished {i+1} of '
 93                        f'{reqs_per_worker} requests')
 94          else:
 95              worker_failed += 1
 96              break
 97  
 98      return worker_successful, worker_failed
 99  
100  
101  async def main(mode, num_workers, reqs_per_worker):
102      """Main async function to coordinate the workers"""
103  
104      if mode in ["get-block", "get-latest-block"]:
105          base_url = GET_BLOCK_BASE_URL
106      elif mode == "block-height":
107          base_url = BLOCK_HEIGHT_URL
108      else:
109          raise RuntimeError(f'Unknown REST mode "{mode}"')
110  
111      print(f'Starting {num_workers} async workers for "{mode}", '
112            f' each making {reqs_per_worker} requests...')
113      print(f"Target endpoint: {base_url}")
114      print("")
115  
116      start_time = time.time()
117  
118      # Create HTTP session with connection pooling
119      connector = aiohttp.TCPConnector(
120          limit=100,  # Total connection pool size
121          limit_per_host=50,  # Max connections per host
122          keepalive_timeout=30,
123          enable_cleanup_closed=True
124      )
125  
126      async with aiohttp.ClientSession(connector=connector) as session:
127          # Create and run all workers concurrently
128          tasks = [worker(session, i+1, mode, reqs_per_worker) for i in range(num_workers)]
129          results = await asyncio.gather(*tasks, return_exceptions=True)
130  
131          # Process results
132          for i, result in enumerate(results):
133              if isinstance(result, Exception):
134                  print(f"Worker {i+1} failed with exception: {result}")
135              else:
136                  worker_successful, worker_failed = result
137  
138      end_time = time.time()
139      duration = end_time - start_time
140  
141      if stats['failed_requests'] > 0:
142          print(f'REST benchmark "{mode}" failed')
143          sys.exit(1)
144      else:
145          write_results(mode, duration, base_url)
146          sys.exit(0)
147  
148  if __name__ == "__main__":
149      # what type of benchmark to run?
150      mode = argv[1]
151      # how many client tasks?
152      num_workers = int(argv[2])
153      # how many requests per client task?
154      reqs_per_worker = int(argv[3])
155  
156      # Run the async main function
157      asyncio.run(main(mode, num_workers, reqs_per_worker))