# src/analysis.py
# Python Imports
import re
import sys
import glob
from tqdm_loggable.auto import tqdm

# Project Imports
from src import analysis_logger
from src import log_parser


def update_min_max_tss(tss, min_tss, max_tss):
    # Two independent checks: with an elif, a timestamp that is both a new
    # minimum and a new maximum (e.g. the first one seen) would only update
    # the minimum.
    if tss < min_tss:
        min_tss = tss
    if tss > max_tss:
        max_tss = tss

    return min_tss, max_tss
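
# Example: starting from the sentinels used in analyze_containers(), the first
# timestamp seen must set both bounds at once:
#   update_min_max_tss(100, sys.maxsize, -sys.maxsize - 1) -> (100, 100)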


def get_relay_line_info(log_line):
    # Each re.search() below assumes its field is present in the line; a
    # missing field would make .group(1) raise AttributeError on None.
    msg_topics = re.search(r'topics="([^"]+)"', log_line).group(1)
    msg_topic = re.search(r'pubsubTopic=([^ ]+)', log_line).group(1)
    msg_hash = re.search(r'hash=([^ ]+)', log_line).group(1)
    msg_peer_id = re.search(r'peerId=([^ ]+)', log_line).group(1)

    return msg_topics, msg_topic, msg_hash, msg_peer_id
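
# Illustrative only: a relay log line carrying the fields the regexes above
# expect might look like this (the exact layout of real node logs may differ):
#   ... waku.relay ... published ... peerId=16Uiu2HAm... pubsubTopic=/waku/2/rs/0/1
#       hash=0x1a2b... topics="/toy-chat/2/example/proto" publishTime=1690000000000000000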


def compute_injection_times(injected_msgs_dict):
    # Keep only the messages whose injection request succeeded (HTTP 200)
    return [msg['injection_time'] for msg in injected_msgs_dict.values() if msg['status'] == 200]


def analyze_published(log_line, node_logs, msgs_dict, msg_publishTime):
    msg_topics, msg_topic, msg_hash, msg_peer_id = get_relay_line_info(log_line)
    node_logs[msg_peer_id]['published'].append([msg_publishTime, msg_topics, msg_topic, msg_hash])

    # Create the entry on first sight of this hash, then record the publish event
    msgs_dict.setdefault(msg_hash, {'published': [], 'received': []})
    msgs_dict[msg_hash]['published'].append({'ts': msg_publishTime, 'peer_id': msg_peer_id})


def analyze_received(log_line, node_logs, msgs_dict, msg_receivedTime):
    msg_topics, msg_topic, msg_hash, msg_peer_id = get_relay_line_info(log_line)
    node_logs[msg_peer_id]['received'].append([msg_receivedTime, msg_topics, msg_topic, msg_hash])

    # Same pattern as analyze_published(): create the entry on first sight,
    # then record the receive event
    msgs_dict.setdefault(msg_hash, {'published': [], 'received': []})
    msgs_dict[msg_hash]['received'].append({'ts': msg_receivedTime, 'peer_id': msg_peer_id})
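
# After a run of analyze_published()/analyze_received(), msgs_dict maps each
# message hash to its publish and receive events, e.g. (values are made up):
#   msgs_dict['0x1a2b...'] == {
#       'published': [{'ts': 1690000000000000000, 'peer_id': '16Uiu2HAm...'}],
#       'received':  [{'ts': 1690000000150000000, 'peer_id': '16Uiu2HAk...'}, ...],
#   }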


def parse_lines_in_file(file, node_logs, msgs_dict, min_tss, max_tss):
    for log_line in file:
        if 'waku.relay' in log_line:
            if 'published' in log_line:
                msg_publishTime = int(re.search(r'publishTime=([\d]+)', log_line).group(1))
                analyze_published(log_line, node_logs, msgs_dict, msg_publishTime)
                min_tss, max_tss = update_min_max_tss(msg_publishTime, min_tss, max_tss)

            elif 'received' in log_line:
                msg_receivedTime = int(re.search(r'receivedTime=([\d]+)', log_line).group(1))
                analyze_received(log_line, node_logs, msgs_dict, msg_receivedTime)
                min_tss, max_tss = update_min_max_tss(msg_receivedTime, min_tss, max_tss)

    return min_tss, max_tss


def compute_message_latencies(msgs_dict):
    # Compute message latencies and propagation times throughout the network
    pbar = tqdm(msgs_dict.items())
    for msg_hash, msg_data in pbar:
        pbar.set_description('Computing latencies of message %s' % msg_hash)

        # Messages only ever seen as received have no publish event to measure against
        if not msg_data['published']:
            analysis_logger.G_LOGGER.warning('Message %s has no publish event' % msg_hash)
            msgs_dict[msg_hash]['latencies'] = []
            continue

        # NOTE: Careful here as I am assuming that every message is published once ...
        if len(msg_data['published']) > 1:
            analysis_logger.G_LOGGER.warning('Several publishers of message %s' % msg_hash)

        published_ts = int(msg_data['published'][0]['ts'])
        peer_id = msg_data['published'][0]['peer_id']

        # Compute latencies
        latencies = []
        for received_data in msg_data['received']:
            # Skip self
            if received_data['peer_id'] == peer_id:
                analysis_logger.G_LOGGER.warning('Message %s received by the same node that published it' % msg_hash)
                continue
            # NOTE: We are getting some negative latencies, meaning that the message appears
            # to be received before it was sent. I assume this must be because those are the
            # nodes that got the message injected in the first place.
            # TLDR: Should be safe to ignore all the negative latencies.
            latencies.append(int(received_data['ts']) - published_ts)

        msgs_dict[msg_hash]['latencies'] = latencies
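
# Hedged sketch, not called anywhere in this module: following the NOTE above,
# negative latencies likely belong to the nodes that had the message injected
# directly, so a pass like this could drop them before computing statistics.
# The helper name is hypothetical, not part of the original pipeline.
def filter_negative_latencies(msgs_dict):
    for msg_data in msgs_dict.values():
        msg_data['latencies'] = [lat for lat in msg_data.get('latencies', []) if lat >= 0]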


def compute_propagation_times(msgs_dict):
    msg_propagation_times = []
    pbar = tqdm(msgs_dict.items())

    for msg_hash, msg_data in pbar:
        pbar.set_description('Computing propagation time of message %s' % msg_hash)
        # Skip messages without latencies (e.g. never received); max() would raise on an empty list
        if not msg_data['latencies']:
            continue
        # Propagation time is the worst-case latency; timestamps appear to be in
        # nanoseconds, so dividing by 1e6 yields milliseconds
        # TODO: check whether the result should be rounded, as in the commented-out line
        # msg_propagation_times.append(round(max(msg_data['latencies']) / 1000000))
        msg_propagation_times.append(max(msg_data['latencies']) / 1000000)

    return msg_propagation_times


def compute_message_delivery(msgs_dict, injected_msgs_dict):
    # Compare the messages observed in the node logs against the messages injected
    total_messages = len(injected_msgs_dict)
    delivered_messages = len(msgs_dict)
    lost_messages = total_messages - delivered_messages
    delivery_rate = delivered_messages * 100 / total_messages

    analysis_logger.G_LOGGER.info(f'{delivered_messages} of {total_messages} messages delivered. '
                                  f'Lost: {lost_messages}. Delivery rate: {delivery_rate}%')

    return delivery_rate
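
# Example: with 100 injected messages and 97 distinct hashes seen in the logs,
# this logs "97 of 100 messages delivered. Lost: 3. Delivery rate: 97.0%" and
# returns 97.0.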


def analyze_containers(topology, simulation_path):
    node_logs = {}
    msgs_dict = {}
    max_tss = -sys.maxsize - 1
    min_tss = sys.maxsize

    for container_name, container_info in topology["containers"].items():
        node_pbar = tqdm(container_info["nodes"])
        node_pbar.set_description(f"Parsing log of container {container_name}")

        log_parser.prepare_node_in_logs(node_pbar, topology, node_logs, container_name)

        # Each container is expected to have exactly one simulation folder matching this pattern
        folder = glob.glob(f'{simulation_path}/{container_name}--*')
        if len(folder) > 1:
            raise RuntimeError(f"Error: Multiple containers with same name: {folder}")

        file = log_parser.open_file(folder)
        min_tss, max_tss = parse_lines_in_file(file, node_logs, msgs_dict, min_tss, max_tss)
        file.close()

    return node_logs, msgs_dict, min_tss, max_tss


def inject_metric_in_dict(metrics, key_name, title, y_label, metric_name, values):
    metrics[key_name] = {
        "title": title,
        "y_label": y_label,
        "metric_name": metric_name,
        "values": values,
    }
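
# Usage sketch (argument values are illustrative, presumably consumed by the
# project's plotting code):
#   metrics = {}
#   inject_metric_in_dict(metrics, 'propagation', 'Propagation times',
#                         'Propagation time (ms)', 'propagation_ms',
#                         compute_propagation_times(msgs_dict))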