context.py
  1  """
  2  Context shared across arti_rpc tests.
  3  """
  4  
  5  from __future__ import annotations
  6  
  7  import arti_rpc
  8  import math
  9  import signal
 10  import subprocess
 11  import sys
 12  import time
 13  import tomli_w
 14  
 15  from arti_rpc_tests import FatalException
 16  
 17  from pathlib import Path
 18  from typing import Optional
 19  
 20  
 21  class TestContext:
 22      """
 23      Context shared by a number of tests clients.
 24  
 25      Includes a running arti instance, and the ability to get an RPC connection.
 26      """
 27  
 28      arti_binary: Path
 29      conf_file: Path
 30      socket_path: Path
 31      arti_process: Optional[ArtiProcess]
 32  
 33      def __init__(self, arti_binary: Path, path: Path):
 34          """
 35          Create a new TestContext using the arti binary at `arti_binary`,
 36          storing all of its files at `path`.
 37  
 38          Does not launch arti.
 39          """
 40          path = path.absolute()
 41          conf_file = path.joinpath("arti.toml")
 42          cache_dir = path.joinpath("cache")
 43          state_dir = path.joinpath("state")
 44          socket_path = path.joinpath("arti_rpc.socket")
 45          connpt_path = path.joinpath("arti_connpt.toml")
 46          socks_port = 15986  # "chosen by fair dice roll. guaranteed to be random."
 47  
 48          with open(connpt_path, "w") as f:
 49              f.write(
 50                  f"""
 51  [connect]
 52  socket = "unix:{socket_path}"
 53  auth = "none"
 54  """
 55              )
 56  
 57          configuration = {
 58              "rpc": {
 59                  "enable": True,
 60                  "listen": {
 61                      "user-default": {
 62                          "enable": False,
 63                      },
 64                      "system-default": {
 65                          "enable": False,
 66                      },
 67                      "test-point": {
 68                          "file": str(connpt_path),
 69                      },
 70                  },
 71              },
 72              "storage": {
 73                  "cache_dir": str(cache_dir),
 74                  "state_dir": str(state_dir),
 75              },
 76              "proxy": {
 77                  "socks_listen": socks_port,
 78              },
 79          }
 80          with open(conf_file, "wb") as f:
 81              tomli_w.dump(configuration, f)
 82  
 83          self.arti_binary = arti_binary
 84          self.conf_file = conf_file
 85          self.socket_path = socket_path
 86          self.connpt_path = connpt_path
 87          self.arti_process = None
 88  
 89      def launch_arti(self):
 90          """
 91          Start a new Arti process, and store it in self.arti_process.
 92          """
 93          args = [self.arti_binary, "proxy", "-c", self.conf_file]
 94  
 95          # TODO: Capture the logs from arti somehow.  (As it stands,
 96          # they just go to stdout, which is iffy.
 97          self.arti_process = ArtiProcess(subprocess.Popen(args))
 98          self._wait_for_rpc()
 99  
100      def open_rpc_connection(self) -> arti_rpc.ArtiRpcConn:
101          """
102          Open an RPC connection to Arti.
103          """
104          bld = arti_rpc.ArtiRpcConnBuilder()
105          bld.prepend_literal_path(str(self.connpt_path))
106          return bld.connect()
107  
108      def arti_process_is_running(self) -> bool:
109          """
110          Return true if we have launched an arti process,
111          and it is still running.
112          """
113          if self.arti_process is None:
114              return False
115          return self.arti_process.is_running()
116  
117      def _wait_for_rpc(self, timeout: float = 3.0) -> None:
118          """
119          Wait up to `timeout` seconds until an Arti RPC connection succeeds.
120  
121          Raise an exception if it fails.
122          """
123          interval = 0.1
124          waited = 0.0
125          for _ in range(math.ceil(timeout / interval)):
126              try:
127                  _ = self.open_rpc_connection()
128                  print(f"Waited {waited} seconds for Arti RPC to be reachable.")
129                  return
130              except arti_rpc.ArtiRpcError:
131                  time.sleep(interval)
132                  waited += interval
133  
134          raise FatalException("Arti not reachable after {timeout} seconds.")
135  
136  
class ArtiProcess:
    """
    Wrapper for an Arti process.

    Shuts down the process when dropped.
    """

    # The wrapped process, or None once it has been closed.  The class-level
    # default keeps `close` and `__del__` safe on an instance whose
    # `__init__` never ran.
    process: Optional[subprocess.Popen] = None

    def __init__(self, process: subprocess.Popen):
        """Wrap a subprocess.Popen as an ArtiProcess."""
        self.process = process

    def is_running(self) -> bool:
        """
        Return true if the process is running.
        """
        return self.process is not None and self.process.poll() is None

    def close(self, gently: bool, timeout: float = 10.0) -> None:
        """Shut down this process.

        If `gently` is true, start with a SIGINT, and wait up to
        `timeout` seconds for the process to exit.  (On Windows the
        SIGINT step is skipped.)

        If `gently` is false, or SIGINT fails, terminate the process.
        If the process is still alive `timeout` seconds after that,
        kill it.

        Does nothing if the process has already been closed.
        """
        process = self.process
        if process is None:
            return

        try:
            if gently and sys.platform != "win32":
                process.send_signal(signal.SIGINT)
                try:
                    process.wait(timeout)
                    return
                except subprocess.TimeoutExpired:
                    print("Process ignored SIGINT. Terminating")

            process.terminate()
            try:
                process.wait(timeout)
            except subprocess.TimeoutExpired:
                # A termination request can be ignored too; escalate so
                # that we never leave the child running behind us.
                print("Process ignored termination request. Killing")
                process.kill()
                process.wait(timeout)
        finally:
            # Forget the handle even on failure, so that a later call
            # (e.g. from __del__) does not retry on a stale process.
            self.process = None

    def __del__(self):
        """Terminate the process, without the SIGINT step, when dropped."""
        self.close(False)