# prometheus.py
# Python Imports
import builtins
from datetime import datetime, timezone
from tqdm import tqdm
from prometheus_api_client import PrometheusConnect

# Project Imports
from src import analysis_logger
from src import plotting_configurations


def connect_to_prometheus(port):
    """Create a Prometheus API client for the server listening on ``port``.

    The server is addressed as ``http://host.docker.internal:<port>``.

    Args:
        port: Port of the Prometheus server.

    Returns:
        A ``PrometheusConnect`` instance, or ``None`` when building the client
        raised (the exception is logged, not propagated).
    """
    url = f"http://host.docker.internal:{port}"
    try:
        analysis_logger.G_LOGGER.debug('Connecting to Prometheus server in %s' % url)
        prometheus = PrometheusConnect(url, disable_ssl=True)
    except Exception as e:
        analysis_logger.G_LOGGER.error('%s: %s' % (e.__doc__, e))
        return None

    return prometheus


def get_hardware_metrics(metrics, topology, min_tss, max_tss, prom_port):
    """Collect the per-node hardware statistics of every container in ``topology``.

    For each container IP the metrics listed in ``metrics["by_node"]`` are
    fetched and their statistic is appended to the matching entry of
    ``plotting_configurations.plotting_config``. A failure for one container is
    logged and does not stop the remaining containers.

    Args:
        metrics: Mapping that holds the metric names under ``"by_node"``.
        topology: Mapping whose ``"containers"`` values each hold a
            ``"kurtosis_ip"`` entry.
        min_tss: Start of the query window (divided by 1e9 to obtain seconds).
        max_tss: End of the query window (divided by 1e9 to obtain seconds).
        prom_port: Port of the Prometheus server.
    """
    container_ips = [info["kurtosis_ip"] for info in topology["containers"].values()]

    prometheus = connect_to_prometheus(prom_port)
    if prometheus is None:
        # Without a client every query below would fail with the same
        # AttributeError; report once instead of once per container.
        analysis_logger.G_LOGGER.error(
            'Prometheus client unavailable. Skipping hardware metrics.')
        return

    pbar = tqdm(container_ips)
    for container_ip in pbar:
        pbar.set_description(f'Fetching hardware stats from container {container_ip}')
        try:
            fetch_cadvisor_stats_from_prometheus_by_node(metrics, prometheus, container_ip,
                                                         min_tss, max_tss)
        except Exception as e:
            # Best effort: keep going with the remaining containers.
            analysis_logger.G_LOGGER.error('%s: %s' % (e.__doc__, e))
            continue

    # NOTE: aggregation over the whole simulation is currently disabled.
    # fetch_cadvisor_stats_from_prometheus_by_simulation(metrics, prometheus, container_ips,
    #                                                    min_tss, max_tss)


def _ns_to_utc_datetime(timestamp_ns):
    """Convert a Unix timestamp in nanoseconds to a timezone-aware UTC datetime.

    An aware datetime is used on purpose: ``datetime.timestamp()`` interprets
    naive datetimes as local time, which would shift the query window on hosts
    whose timezone is not UTC.
    """
    return datetime.fromtimestamp(timestamp_ns / 1e9, tz=timezone.utc)


def fetch_cadvisor_stats_from_prometheus_by_simulation(metrics, prom, container_ips, start_ts,
                                                       end_ts):
    """Append, per metric, the values accumulated over all containers.

    For every metric in ``metrics["by_simulation"]`` the list returned by
    ``fetch_accumulated_metric_for_all_nodes`` is appended to the ``"values"``
    list of that metric's plotting configuration.

    Args:
        metrics: Mapping that holds the metric names under ``"by_simulation"``.
        prom: Prometheus client used for the queries.
        container_ips: IPs of the containers to accumulate over.
        start_ts: Start of the query window, in nanoseconds since the epoch.
        end_ts: End of the query window, in nanoseconds since the epoch.
    """
    start_timestamp = _ns_to_utc_datetime(start_ts)
    end_timestamp = _ns_to_utc_datetime(end_ts)

    for metric in metrics["by_simulation"]:
        plotting_config = plotting_configurations.plotting_config[metric]
        plotting_config.setdefault("values", []).append(
            fetch_accumulated_metric_for_all_nodes(prom, metric, container_ips,
                                                   start_timestamp,
                                                   end_timestamp, plotting_config["toMB"]))


def fetch_cadvisor_stats_from_prometheus_by_node(metrics, prom, container_ip, start_ts, end_ts):
    """Append, per metric, one statistic computed over a single container's samples.

    The statistic ("max", "min" or "average") is selected by the metric's
    ``"statistic"`` plotting setting through ``function_dispatcher``.

    Args:
        metrics: Mapping that holds the metric names under ``"by_node"``.
        prom: Prometheus client used for the queries.
        container_ip: IP of the container to query.
        start_ts: Start of the query window, in nanoseconds since the epoch.
        end_ts: End of the query window, in nanoseconds since the epoch.
    """
    # Prometheus query example:
    # container_network_transmit_bytes_total{container_label_com_kurtosistech_private_ip = "212.209.64.2"}
    start_timestamp = _ns_to_utc_datetime(start_ts)
    end_timestamp = _ns_to_utc_datetime(end_ts)

    for metric in metrics["by_node"]:
        plotting_config = plotting_configurations.plotting_config[metric]
        stat_function = function_dispatcher[plotting_config["statistic"]]
        values = fetch_metric(prom, metric, container_ip, start_timestamp, end_timestamp,
                              plotting_config["toMB"])
        plotting_config.setdefault("values", []).append(stat_function(values))


def fetch_accumulated_metric_for_all_nodes(prom, metric, container_ips, start_timestamp,
                                           end_timestamp,
                                           to_mbytes=False):
    """Sum a metric over all containers, grouped by sample timestamp.

    Args:
        prom: Prometheus client used for the queries.
        metric: Name of the metric to query.
        container_ips: IPs of the containers to accumulate over.
        start_timestamp: Start of the query window (datetime).
        end_timestamp: End of the query window (datetime).
        to_mbytes: When true, each sample is divided by 1024 * 1024 before
            being added.

    Returns:
        The per-timestamp sums as a list, in the order in which the timestamps
        were first encountered (not sorted).
    """
    result = {}
    for ip in container_ips:
        samples = fetch_metric_with_timestamp(prom, metric, ip, start_timestamp, end_timestamp)
        for timestamp, value in samples:
            # Sample values may be fractional strings, which int() would
            # reject; parse them as floats, like fetch_metric does.
            value = float(value)
            if to_mbytes:
                value = value / (1024 * 1024)
            result[timestamp] = result.get(timestamp, 0) + value

    return list(result.values())


def _query_metric_range(prom, metric, ip, start_timestamp, end_timestamp):
    """Run the 1-second-step range query for ``metric`` restricted to container ``ip``."""
    return prom.custom_query_range(
        f"{metric}{{container_label_com_kurtosistech_private_ip = '{ip}'}}",
        start_time=start_timestamp, end_time=end_timestamp, step="1s")


def fetch_metric(prom, metric, ip, start_timestamp, end_timestamp, to_mbytes=False):
    """Return the sample values of ``metric`` for one container as floats.

    Only the first series of the query result is used.

    Args:
        prom: Prometheus client used for the query.
        metric: Name of the metric to query.
        ip: IP of the container to query.
        start_timestamp: Start of the query window (datetime).
        end_timestamp: End of the query window (datetime).
        to_mbytes: When true, each value is divided by 1024 * 1024.

    Returns:
        List of float values, or ``[0]`` when the query returned nothing.
    """
    metric_result = _query_metric_range(prom, metric, ip, start_timestamp, end_timestamp)
    if not metric_result:
        analysis_logger.G_LOGGER.error(f"{metric} returns no data. Adding zero.")
        return [0]

    # Each sample is a (timestamp, value) pair; keep only the value.
    metric_values = [float(sample[1]) for sample in metric_result[0]['values']]
    if to_mbytes:
        metric_values = [value / (1024 * 1024) for value in metric_values]

    return metric_values


def fetch_metric_with_timestamp(prom, metric, ip, start_timestamp, end_timestamp):
    """Return the raw ``[timestamp, value]`` samples of ``metric`` for one container.

    Only the first series of the query result is used.

    Args:
        prom: Prometheus client used for the query.
        metric: Name of the metric to query.
        ip: IP of the container to query.
        start_timestamp: Start of the query window (datetime).
        end_timestamp: End of the query window (datetime).

    Returns:
        The ``'values'`` list of the first series, or ``[[0, 0]]`` when the
        query returned nothing.
    """
    metric_result = _query_metric_range(prom, metric, ip, start_timestamp, end_timestamp)

    if not metric_result:
        analysis_logger.G_LOGGER.error(f"{metric} returns no data. Adding zero.")
        return [[0, 0]]

    return metric_result[0]['values']


# Maps the "statistic" plotting setting to the function that reduces a list of
# samples to a single number. All three raise on an empty list.
function_dispatcher = {
    "max": max,
    "min": min,
    "average": lambda x: sum(x) / len(x)
}