analysis.py
# Python Imports
import re
import sys
import glob
from tqdm_loggable.auto import tqdm

# Project Imports
from src import analysis_logger
from src import log_parser


def update_min_max_tss(tss, min_tss, max_tss):
    # Both checks must run independently: with an elif, the first timestamp
    # (which is both below the min sentinel and above the max sentinel)
    # would only update min_tss and leave max_tss at its sentinel value.
    if tss < min_tss:
        min_tss = tss
    if tss > max_tss:
        max_tss = tss

    return min_tss, max_tss


def get_relay_line_info(log_line):
    # Extract the message fields from a waku.relay log line. Assumes all four
    # fields are present (re.search returning None raises AttributeError).
    msg_topics = re.search(r'topics="([^"]+)"', log_line).group(1)
    msg_topic = re.search(r'pubsubTopic=([^ ]+)', log_line).group(1)
    msg_hash = re.search(r'hash=([^ ]+)', log_line).group(1)
    msg_peer_id = re.search(r'peerId=([^ ]+)', log_line).group(1)

    return msg_topics, msg_topic, msg_hash, msg_peer_id


def compute_injection_times(injected_msgs_dict):
    return [msg['injection_time'] for msg in injected_msgs_dict.values() if msg['status'] == 200]


def analyze_published(log_line, node_logs, msgs_dict, msg_publishTime):
    msg_topics, msg_topic, msg_hash, msg_peer_id = get_relay_line_info(log_line)
    node_logs[msg_peer_id]['published'].append([msg_publishTime, msg_topics, msg_topic, msg_hash])

    if msg_hash not in msgs_dict:
        msgs_dict[msg_hash] = {'published': [{'ts': msg_publishTime, 'peer_id': msg_peer_id}],
                               'received': []}
    else:
        msgs_dict[msg_hash]['published'].append(
            {'ts': msg_publishTime, 'peer_id': msg_peer_id})


def analyze_received(log_line, node_logs, msgs_dict, msg_receivedTime):
    msg_topics, msg_topic, msg_hash, msg_peer_id = get_relay_line_info(log_line)
    node_logs[msg_peer_id]['received'].append([msg_receivedTime, msg_topics, msg_topic, msg_hash])

    if msg_hash not in msgs_dict:
        msgs_dict[msg_hash] = {'published': [], 'received': [
            {'ts': msg_receivedTime, 'peer_id': msg_peer_id}]}
    else:
        msgs_dict[msg_hash]['received'].append(
            {'ts': msg_receivedTime, 'peer_id': msg_peer_id})


def parse_lines_in_file(file, node_logs, msgs_dict, min_tss, max_tss):
    for log_line in file:
        if 'waku.relay' in log_line:
            if 'published' in log_line:
                msg_publishTime = int(re.search(r'publishTime=([\d]+)', log_line).group(1))

                analyze_published(log_line, node_logs, msgs_dict, msg_publishTime)

                min_tss, max_tss = update_min_max_tss(msg_publishTime, min_tss, max_tss)

            elif 'received' in log_line:
                msg_receivedTime = int(re.search(r'receivedTime=([\d]+)', log_line).group(1))

                analyze_received(log_line, node_logs, msgs_dict, msg_receivedTime)

                min_tss, max_tss = update_min_max_tss(msg_receivedTime, min_tss, max_tss)

    return min_tss, max_tss


def compute_message_latencies(msgs_dict):
    # Compute message latencies throughout the network
    pbar = tqdm(msgs_dict.items())
    for msg_hash, msg_data in pbar:
        # NOTE: Careful here, as this assumes that every message is published exactly once
        if len(msg_data['published']) > 1:
            analysis_logger.G_LOGGER.warning('Several publishers of message %s' % msg_hash)

        # Guard against messages that were received but never published, which
        # would otherwise raise an IndexError below
        if not msg_data['published']:
            analysis_logger.G_LOGGER.warning('Message %s was received but never published' % msg_hash)
            msgs_dict[msg_hash]['latencies'] = []
            continue

        published_ts = int(msg_data['published'][0]['ts'])
        peer_id = msg_data['published'][0]['peer_id']

        pbar.set_description('Computing latencies of message %s' % msg_hash)

        # Compute latencies
        latencies = []
        for received_data in msg_data['received']:
            # Skip the publisher receiving its own message
            if received_data['peer_id'] == peer_id:
                analysis_logger.G_LOGGER.warning('Message %s received by the same node that published it' % msg_hash)
                continue
            # NOTE: Some latencies come out negative, meaning the message appears to be
            # received before it was sent. I assume this is because those are the nodes
            # that got the message injected in the first place.
            # TLDR: It should be safe to ignore all the negative latencies.
            latency = int(received_data['ts']) - published_ts
            latencies.append(latency)

        msgs_dict[msg_hash]['latencies'] = latencies


def compute_propagation_times(msgs_dict):
    msg_propagation_times = []
    pbar = tqdm(msgs_dict.items())

    for msg_hash, msg_data in pbar:
        pbar.set_description('Computing propagation time of message %s' % msg_hash)

        # Skip messages without any latency samples to avoid max() on an empty list
        if not msg_data['latencies']:
            continue

        # TODO: check why the result was rounded here
        # msg_propagation_times.append(round(max(msg_data['latencies']) / 1000000))
        msg_propagation_times.append(max(msg_data['latencies']) / 1000000)

    return msg_propagation_times


def compute_message_delivery(msgs_dict, injected_msgs_dict):
    # Compute the message delivery rate as the percentage of injected messages
    # that showed up in the relay logs
    total_messages = len(injected_msgs_dict)
    delivered_messages = len(msgs_dict)
    lost_messages = total_messages - delivered_messages
    delivery_rate = delivered_messages * 100 / total_messages

    analysis_logger.G_LOGGER.info(f'{delivered_messages} of {total_messages} messages delivered. '
                                  f'Lost: {lost_messages}. Delivery rate: {delivery_rate}%')

    return delivery_rate


def analyze_containers(topology, simulation_path):
    node_logs = {}
    msgs_dict = {}
    max_tss = -sys.maxsize - 1
    min_tss = sys.maxsize

    for container_name, container_info in topology["containers"].items():
        node_pbar = tqdm(container_info["nodes"])

        node_pbar.set_description(f"Parsing log of container {container_name}")

        log_parser.prepare_node_in_logs(node_pbar, topology, node_logs, container_name)

        folder = glob.glob(f'{simulation_path}/{container_name}--*')
        if len(folder) > 1:
            raise RuntimeError(f"Multiple containers with the same name: {folder}")

        file = log_parser.open_file(folder)
        min_tss, max_tss = parse_lines_in_file(file, node_logs, msgs_dict, min_tss, max_tss)
        file.close()

    return node_logs, msgs_dict, min_tss, max_tss


def inject_metric_in_dict(metrics, key_name, title, y_label, metric_name, values):
    metrics[key_name] = {"title": title,
                         "y_label": y_label,
                         "metric_name": metric_name,
                         "values": values}
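

# Example usage: a minimal sketch of how the helpers above could be wired
# together into a full analysis pass. The topology file name and simulation
# path below are hypothetical placeholders; in the real pipeline these inputs
# come from the simulation configuration.
if __name__ == '__main__':
    import json

    # Hypothetical inputs: a topology JSON describing the containers/nodes,
    # and the folder holding the per-container simulation logs
    with open('topology.json') as f:
        topology = json.load(f)
    simulation_path = './simulation_logs'

    # Parse every container log, collecting per-node events, per-message
    # publish/receive timestamps, and the overall timestamp range
    node_logs, msgs_dict, min_tss, max_tss = analyze_containers(topology, simulation_path)

    # Derive per-message latencies, then network-wide propagation times (ms)
    compute_message_latencies(msgs_dict)
    msg_propagation_times = compute_propagation_times(msgs_dict)
    analysis_logger.G_LOGGER.info('Propagation times (ms): %s' % msg_propagation_times)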