/ src / evidently / ui / runner / utils.py
utils.py
 1  import subprocess
 2  
 3  
 4  def get_url_to_service_by_port(*, port: int) -> str:
 5      return f"http://127.0.0.1:{port}"
 6  
 7  
 8  def call_service_healt_check(*, port: int):
 9      import requests
10  
11      response = requests.get(f"{get_url_to_service_by_port(port=port)}/api/version")
12      if response.status_code == 200:
13          if response.headers.get("Content-Type") == "application/json":
14              data = response.json()
15              return data
16      return {}
17  
18  
def is_service_running(*, port: int):
    """Report whether an Evidently UI service answers on the given local port.

    The health-check payload must name the application ``"Evidently UI"`` and
    carry a truthy ``version``. Any exception raised while probing or reading
    the payload is treated as "not running".

    Args:
        port: Local port to probe.

    Returns:
        ``True`` when the payload identifies a running Evidently UI,
        ``False`` otherwise.
    """
    running = False
    try:
        payload = call_service_healt_check(port=port)

        reported_version = payload.get("version")
        reported_app = payload.get("application")

        running = bool(reported_app == "Evidently UI" and reported_version)
    except Exception:
        # Best-effort probe: a failed request or unexpected payload simply
        # means the service is not considered running.
        running = False

    return running
32  
33  
def terminate_process(process: subprocess.Popen):
    """Stop a child process, escalating to a kill if it does not exit in time.

    Calls ``terminate()`` and waits up to five seconds for the process to
    exit. If that wait times out, calls ``kill()`` and waits without a
    timeout. Progress messages are printed to standard output throughout.

    Args:
        process: The child process handle to stop.
    """
    process.terminate()
    print("Sent termination signal to the running process.")

    print("Waiting for the process to terminate gracefully...")
    try:
        process.wait(timeout=5)
    except subprocess.TimeoutExpired:
        # Graceful shutdown did not finish in time; escalate to a kill.
        print("Process did not terminate gracefully. Sending kill signal...")
        process.kill()
        process.wait()

    # poll() returns None while the process is still alive.
    has_exited = process.poll() is not None
    print(
        "Successfully terminated the running process."
        if has_exited
        else "Warning: Process termination status unclear."
    )