/ src / prometheus.py
prometheus.py
  1  # Python Imports
  2  import builtins
  3  from datetime import datetime
  4  from tqdm import tqdm
  5  from prometheus_api_client import PrometheusConnect
  6  
  7  # Project Imports
  8  from src import analysis_logger
  9  from src import plotting_configurations
 10  
 11  
 12  def connect_to_prometheus(port):
 13      url = f"http://host.docker.internal:{port}"
 14      try:
 15          analysis_logger.G_LOGGER.debug('Connecting to Prometheus server in %s' % url)
 16          prometheus = PrometheusConnect(url, disable_ssl=True)
 17      except Exception as e:
 18          analysis_logger.G_LOGGER.error('%s: %s' % (e.__doc__, e))
 19          return None
 20  
 21      return prometheus
 22  
 23  
 24  def get_hardware_metrics(metrics, topology, min_tss, max_tss, prom_port):
 25      container_ips = [info["kurtosis_ip"] for info in topology["containers"].values()]
 26      pbar = tqdm(container_ips)
 27      prometheus = connect_to_prometheus(prom_port)
 28  
 29      for container_ip in pbar:
 30          pbar.set_description(f'Fetching hardware stats from container {container_ip}')
 31          try:
 32              fetch_cadvisor_stats_from_prometheus_by_node(metrics, prometheus, container_ip, min_tss,
 33                                                           max_tss)
 34          except Exception as e:
 35              analysis_logger.G_LOGGER.error('%s: %s' % (e.__doc__, e))
 36              continue
 37  
 38      #fetch_cadvisor_stats_from_prometheus_by_simulation(metrics, prometheus, container_ips, min_tss,
 39      #                                                   max_tss)
 40  
 41  
 42  def fetch_cadvisor_stats_from_prometheus_by_simulation(metrics, prom, container_ips, start_ts,
 43                                                         end_ts):
 44      start_timestamp = datetime.utcfromtimestamp(start_ts / 1e9)
 45      end_timestamp = datetime.fromtimestamp(end_ts / 1e9)
 46  
 47      for metric in metrics["by_simulation"]:
 48          plotting_config = plotting_configurations.plotting_config[metric]
 49          plotting_config.setdefault("values", []).append(
 50              fetch_accumulated_metric_for_all_nodes(prom, metric, container_ips,
 51                                                     start_timestamp,
 52                                                     end_timestamp, plotting_config["toMB"]))
 53  
 54  
 55  def fetch_cadvisor_stats_from_prometheus_by_node(metrics, prom, container_ip, start_ts, end_ts):
 56      # Prometheus query example:
 57      # container_network_transmit_bytes_total{container_label_com_kurtosistech_private_ip = "212.209.64.2"}
 58      start_timestamp = datetime.utcfromtimestamp(start_ts / 1e9)
 59      end_timestamp = datetime.fromtimestamp(end_ts / 1e9)
 60  
 61      for metric in metrics["by_node"]:
 62          plotting_config = plotting_configurations.plotting_config[metric]
 63          stat_function = function_dispatcher[plotting_config["statistic"]]
 64          values = fetch_metric(prom, metric, container_ip, start_timestamp, end_timestamp,
 65                                plotting_config["toMB"])
 66          plotting_config.setdefault("values", []).append(stat_function(values))
 67  
 68  
 69  def fetch_accumulated_metric_for_all_nodes(prom, metric, container_ips, start_timestamp,
 70                                             end_timestamp,
 71                                             to_mbytes=False):
 72      result = {}
 73      for ip in container_ips:
 74          values = fetch_metric_with_timestamp(prom, metric, ip, start_timestamp, end_timestamp)
 75          for item in values:
 76              timestamp, value = item
 77              value = int(value)
 78              if to_mbytes:
 79                  value = value / (1024 * 1024)
 80              if timestamp in result:
 81                  result[timestamp] += value
 82              else:
 83                  result[timestamp] = value
 84  
 85      result_list = [value for value in result.values()]
 86  
 87      return result_list
 88  
 89  
 90  def fetch_metric(prom, metric, ip, start_timestamp, end_timestamp, to_mbytes=False):
 91      metric_result = prom.custom_query_range(
 92          f"{metric}{{container_label_com_kurtosistech_private_ip = '{ip}'}}",
 93          start_time=start_timestamp, end_time=end_timestamp, step="1s")
 94      if not metric_result:
 95          analysis_logger.G_LOGGER.error(f"{metric} returns no data. Adding zero.")
 96          return [0]
 97      metric_values = [float(metric_result[0]['values'][i][1]) for i in
 98                       range(len(metric_result[0]['values']))]
 99      if to_mbytes:
100          metric_values = [value / (1024 * 1024) for value in metric_values]
101  
102      return metric_values
103  
104  
def fetch_metric_with_timestamp(prom, metric, ip, start_timestamp, end_timestamp):
    """Return the raw samples of ``metric`` for the container with ``ip``.

    The metric is queried with a one second step and only the first series of
    the response is read.

    :param prom: client exposing ``custom_query_range``.
    :param metric: name of the metric to query.
    :param ip: private IP used as label selector.
    :param start_timestamp: start of the query window.
    :param end_timestamp: end of the query window.
    :return: the ``'values'`` entry of the first series, untouched, or
        ``[[0, 0]]`` when the query returned nothing.
    """
    query = f"{metric}{{container_label_com_kurtosistech_private_ip = '{ip}'}}"
    series = prom.custom_query_range(query, start_time=start_timestamp,
                                     end_time=end_timestamp, step="1s")
    if series:
        return series[0]['values']

    analysis_logger.G_LOGGER.error(f"{metric} returns no data. Adding zero.")
    return [[0, 0]]
115  
116  
# Maps the "statistic" option of a plotting configuration to the function that
# reduces a list of samples to a single number.
function_dispatcher = dict(
    max=max,
    min=min,
    average=lambda samples: sum(samples) / len(samples),
)