utils.py
1 import subprocess 2 3 4 def get_url_to_service_by_port(*, port: int) -> str: 5 return f"http://127.0.0.1:{port}" 6 7 8 def call_service_healt_check(*, port: int): 9 import requests 10 11 response = requests.get(f"{get_url_to_service_by_port(port=port)}/api/version") 12 if response.status_code == 200: 13 if response.headers.get("Content-Type") == "application/json": 14 data = response.json() 15 return data 16 return {} 17 18 19 def is_service_running(*, port: int): 20 try: 21 data = call_service_healt_check(port=port) 22 23 version = data.get("version") 24 application = data.get("application") 25 26 if application == "Evidently UI" and version: 27 return True 28 except Exception: 29 pass 30 31 return False 32 33 34 def terminate_process(process: subprocess.Popen): 35 # Send termination signal 36 process.terminate() 37 print("Sent termination signal to the running process.") 38 39 # Wait for process to terminate (with timeout) 40 try: 41 print("Waiting for the process to terminate gracefully...") 42 process.wait(timeout=5) 43 except subprocess.TimeoutExpired: 44 # Force kill if it doesn't terminate gracefully 45 print("Process did not terminate gracefully. Sending kill signal...") 46 process.kill() 47 process.wait() 48 49 # Verify process has terminated 50 if process.poll() is not None: 51 print("Successfully terminated the running process.") 52 else: 53 print("Warning: Process termination status unclear.")