context.py
1 """ 2 Context shared across arti_rpc tests. 3 """ 4 5 from __future__ import annotations 6 7 import arti_rpc 8 import math 9 import signal 10 import subprocess 11 import sys 12 import time 13 import tomli_w 14 15 from arti_rpc_tests import FatalException 16 17 from pathlib import Path 18 from typing import Optional 19 20 21 class TestContext: 22 """ 23 Context shared by a number of tests clients. 24 25 Includes a running arti instance, and the ability to get an RPC connection. 26 """ 27 28 arti_binary: Path 29 conf_file: Path 30 socket_path: Path 31 arti_process: Optional[ArtiProcess] 32 33 def __init__(self, arti_binary: Path, path: Path): 34 """ 35 Create a new TestContext using the arti binary at `arti_binary`, 36 storing all of its files at `path`. 37 38 Does not launch arti. 39 """ 40 path = path.absolute() 41 conf_file = path.joinpath("arti.toml") 42 cache_dir = path.joinpath("cache") 43 state_dir = path.joinpath("state") 44 socket_path = path.joinpath("arti_rpc.socket") 45 connpt_path = path.joinpath("arti_connpt.toml") 46 socks_port = 15986 # "chosen by fair dice roll. guaranteed to be random." 47 48 with open(connpt_path, "w") as f: 49 f.write( 50 f""" 51 [connect] 52 socket = "unix:{socket_path}" 53 auth = "none" 54 """ 55 ) 56 57 configuration = { 58 "rpc": { 59 "enable": True, 60 "listen": { 61 "user-default": { 62 "enable": False, 63 }, 64 "system-default": { 65 "enable": False, 66 }, 67 "test-point": { 68 "file": str(connpt_path), 69 }, 70 }, 71 }, 72 "storage": { 73 "cache_dir": str(cache_dir), 74 "state_dir": str(state_dir), 75 }, 76 "proxy": { 77 "socks_listen": socks_port, 78 }, 79 } 80 with open(conf_file, "wb") as f: 81 tomli_w.dump(configuration, f) 82 83 self.arti_binary = arti_binary 84 self.conf_file = conf_file 85 self.socket_path = socket_path 86 self.connpt_path = connpt_path 87 self.arti_process = None 88 89 def launch_arti(self): 90 """ 91 Start a new Arti process, and store it in self.arti_process. 92 """ 93 args = [self.arti_binary, "proxy", "-c", self.conf_file] 94 95 # TODO: Capture the logs from arti somehow. (As it stands, 96 # they just go to stdout, which is iffy. 97 self.arti_process = ArtiProcess(subprocess.Popen(args)) 98 self._wait_for_rpc() 99 100 def open_rpc_connection(self) -> arti_rpc.ArtiRpcConn: 101 """ 102 Open an RPC connection to Arti. 103 """ 104 bld = arti_rpc.ArtiRpcConnBuilder() 105 bld.prepend_literal_path(str(self.connpt_path)) 106 return bld.connect() 107 108 def arti_process_is_running(self) -> bool: 109 """ 110 Return true if we have launched an arti process, 111 and it is still running. 112 """ 113 if self.arti_process is None: 114 return False 115 return self.arti_process.is_running() 116 117 def _wait_for_rpc(self, timeout: float = 3.0) -> None: 118 """ 119 Wait up to `timeout` seconds until an Arti RPC connection succeeds. 120 121 Raise an exception if it fails. 122 """ 123 interval = 0.1 124 waited = 0.0 125 for _ in range(math.ceil(timeout / interval)): 126 try: 127 _ = self.open_rpc_connection() 128 print(f"Waited {waited} seconds for Arti RPC to be reachable.") 129 return 130 except arti_rpc.ArtiRpcError: 131 time.sleep(interval) 132 waited += interval 133 134 raise FatalException("Arti not reachable after {timeout} seconds.") 135 136 137 class ArtiProcess: 138 """ 139 Wrapper for an Arti process. 140 141 Shuts down the process when dropped. 142 """ 143 144 process: Optional[subprocess.Popen] 145 146 def __init__(self, process: subprocess.Popen): 147 """Wrap a subprocess.Popen as an ArtiProcess.""" 148 self.process = process 149 150 def is_running(self) -> bool: 151 """ 152 Return true if the process is running. 153 """ 154 return self.process is not None and self.process.poll() is None 155 156 def close(self, gently: bool) -> None: 157 """Shut down this process. 158 159 If `gently` is true, start with a SIGINT, and wait a while 160 seconds for the process to exit. 161 162 if `gently` is false, or SIGINT fails, terminate the process. 163 """ 164 if self.process is not None: 165 if gently and sys.platform != "win32": 166 self.process.send_signal(signal.SIGINT) 167 try: 168 self.process.wait(10) 169 except subprocess.TimeoutExpired: 170 print("Process ignored SIGINT. Terminating") 171 self.process.terminate() 172 else: 173 self.process.terminate() 174 175 self.process.wait(10) 176 self.process = None 177 178 def __del__(self): 179 self.close(False)