# hproc.py — process and plot docker-stats / host-procfs metrics from simulation runs
import typer
import sys
import os
import stat
import math
from pathlib import Path
import time
import json
import networkx as nx
import re
import logging as log
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from sklearn.cluster import KMeans

from src import vars
from src import topology
from src import log_parser
from src import analysis


# check if the path exists and is of appropriate type
def path_ok(path: Path, isDir=False):
    """Return True iff *path* exists and is a regular file (or a directory
    when isDir=True). Logs an error and returns False otherwise.

    Permission checks are deliberately deferred: they are resolved lazily
    by the eventual open() call.
    """
    if not path.exists():
        log.error(f'"{path}" does not exist')
        return False
    mode = path.stat().st_mode
    if not isDir and not stat.S_ISREG(mode):
        log.error(f'File expected: "{path}" is not a file')
        return False
    if isDir and not stat.S_ISDIR(mode):
        log.error(f'Directory expected: "{path}" is not a directory')
        return False
    # lay off the permission checks; resolve them lazily with open
    return True


# define singleton
class Singleton(type):
    """Metaclass caching exactly one instance per class."""
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


# convert human readable sizes to bytes
class Human2BytesConverter(metaclass=Singleton):
    """Convert human-readable size strings (e.g. '1.5MiB') to byte counts."""

    def __init__(self):  # add any human readable format/size and multiplier here
        # keyed by suffix length: 3-letter IEC, 2-letter SI/JEDEC, bare 'B'
        self.letters = {}
        self.letters[3] = {'GiB': 1024*1024*1024, 'MiB': 1024*1024, 'KiB': 1024}
        self.letters[2] = {'GB': 1024*1024*1024, 'MB': 1024*1024, 'KB': 1024,
                           'gB': 1000*1000*1000, 'mB': 1000*1000, 'kB': 1000}
        self.letters[1] = {'B': 1}

    def convert(self, value):
        """Return *value* as a float byte count, or np.nan when the suffix
        is unknown OR the numeric part is malformed (fix: a bad numeric
        part previously raised ValueError instead of yielding nan).
        """
        for i in [3, 2, 1]:
            k = value[-i:]
            if k in self.letters[i]:
                try:
                    return float(value[:-i]) * self.letters[i][k]
                except ValueError:
                    return np.nan
        return np.nan


# Base class for plots and common helpers
class Plots(metaclass=Singleton):
    """Shared state and plotting helpers for metric post-processing.

    Subclasses (DStats, HostProc) fill self.df and the col2title/col2units
    maps; the helpers here render the panels and PDFs.
    """

    def __init__(self, log_dir, oprefix, jf, to_plot, cfile):
        self.log_dir, self.oprefix = log_dir, oprefix
        self.df, self.n, self.keys, self.cols = pd.DataFrame(), 0, [], []
        self.col2title, self.col2units, self.key2nodes = {}, {}, {}
        self.msg_settling_times, self.msg_injection_times = {}, {}
        self.grp2idx, self.idx2grp = {}, {}
        self.fig, self.axes = "", ""
        self.json_fname, self.G = jf, nx.empty_graph()
        self.to_plot, self.to_compare = to_plot, []
        self.run_summary, self.cfile = "", cfile

    # waku log processing
    def compute_msg_settling_times(self):
        """Parse topology + container logs and fill the settling/injection
        time dicts used by plot_msg_settling_times()."""
        ldir = str(self.log_dir)
        topology_info = topology.load_json(f'{ldir}/{vars.G_TOPOLOGY_FILE_NAME}')
        topology.load_topics_into_topology(topology_info, f'{ldir}/config/topology_generated/')
        injected_msgs_dict = log_parser.load_messages(ldir)
        node_logs, msgs_dict, min_tss, max_tss = analysis.analyze_containers(topology_info, ldir)
        # timestamps appear to be nanoseconds; /1e6 yields ms — TODO confirm upstream units
        simulation_time_ms = round((max_tss - min_tss) / 1000000)
        log.info((f'Simulation started at {min_tss}, ended at {max_tss}. '
                  f'Effective simulation time was {simulation_time_ms} ms.'))
        analysis.compute_message_delivery(msgs_dict, injected_msgs_dict)
        analysis.compute_message_latencies(msgs_dict)
        self.msg_settling_times = analysis.compute_propagation_times(msgs_dict)
        self.msg_injection_times = analysis.compute_injection_times(injected_msgs_dict)

    def get_key(self):
        # the per-container/per-node grouping column
        return self.df.Key

    def set_keys(self):
        self.keys = self.df['Key'].unique()
        self.keys.sort()

    # set the summary for the run: to be used for plot title
    def set_summary(self):
        """Build the one-line run summary from the JSON config file."""
        with open(self.cfile, 'r') as f:  # Load config file
            conf = json.load(f)
        minsize = int(conf["wls"]["min_packet_size"]/1024)
        maxsize = int(conf["wls"]["max_packet_size"]/1024)
        self.run_summary = (f'{conf["gennet"]["num_nodes"]}by'
                            f'{conf["gennet"]["container_size"]}fo'
                            f'{conf["gennet"]["fanout"]}-'
                            f'{conf["wls"]["message_rate"]}mps-'
                            f'({minsize}-{maxsize})KiB-'
                            f'{conf["wls"]["simulation_time"]}sec')
        print(f'summary: {self.run_summary} (from {self.cfile})')

    # set the fields that go into the comparison panel
    def set_compare(self, lst):
        self.to_compare = lst

    # extract the maximal complete sample set
    def remove_incomplete_samples(self, grp, err=''):
        """Drop rows containing *err*, then truncate every *grp* group to
        the size of the smallest group so all groups are sample-aligned."""
        self.df, minRows = self.df[~self.df.isin([err]).any(axis=1)], sys.maxsize
        for cid in self.df[grp].unique():
            rows = self.df[self.df[grp] == cid].shape[0]
            minRows = rows if minRows > rows else minRows
        self.df = self.df.groupby(grp).head(minRows)

    # plot the settling times, both network- and node-wise
    def plot_msg_settling_times(self):
        self.set_panel_size(2, 1, False)
        nmsgs = len(self.msg_settling_times)
        self.fig.suptitle(f'Settling Time: {self.run_summary}, {nmsgs} msgs')
        self.fig.supylabel("msecs")
        self.axes[0].set_xticks([x + 1 for x in range(len(self.keys))])
        #axes[0].set_xticks(ticks=[x + 1 for x in range(len(self.waku_cids))], labels=self.df["ContainerID"].unique())
        self.axes[0].set_xlabel('TODO: revisit after Jordi added per-container settling times')

        self.axes[1].violinplot(self.msg_settling_times, showmedians=True)
        self.axes[1].axes.xaxis.set_visible(False)
        plt.savefig(f'{self.oprefix}-settling-time.pdf', format="pdf", bbox_inches="tight")
        #plt.show()

    # set the panel params
    def set_panel_size(self, m, n, shareY=False):
        """Create an m x n subplot grid stored in self.fig / self.axes."""
        self.fig, self.axes = plt.subplots(m, n, layout='constrained', sharey=shareY)
        self.fig.set_figwidth(12)
        self.fig.set_figheight(10)

    # plot col panels for selected columns
    def plot_col_panels(self, agg):
        for col in self.to_plot["ColPanel"]:
            if col not in self.df.columns:
                log.error(f"ColPanel: {col} is not in {self.df.columns}, skipping...")
                continue
            # CPU/memory samples are already instantaneous -> never diff them
            if col in ["CPUPerc", "MemUse"]:
                self.col_panel_helper(col)
            else:
                self.col_panel_helper(col, agg)

    # plot degree/col panels for the given set of columns
    def plot_deg_col_panels(self):
        for col in self.to_plot["DegColPanel"]:
            if col not in self.df.columns:
                log.error(f"DegColPanel: {col} is not in {self.df.columns}, skipping...")
                continue
            self.deg_col_panel_helper(col)  # only agg for now

    # plot the column panel
    def col_panel_helper(self, col, agg=True):
        """Render the 2x2 panel for *col*: per-key and consolidated violin
        plots, per-key scatter, and total/average time series.

        agg=False plots per-sample deltas (diff) instead of raw values.
        """
        self.set_panel_size(2, 2)
        self.fig.suptitle(f'{self.col2title[col]}: {self.run_summary}')
        self.fig.supylabel(self.col2units[col])

        per_key_arr = []

        # per docker violin plot
        self.axes[0, 0].ticklabel_format(style='plain')
        self.axes[0, 0].yaxis.grid(True)
        for key in self.keys:
            if agg:
                tmp = self.df[self.get_key() == key][col].values
            else:
                tmp = self.df[self.get_key() == key][col].diff().dropna().values
            per_key_arr.append(tmp)

        self.axes[0, 0].violinplot(dataset=per_key_arr, showmedians=True)

        # consolidated violin plot
        self.axes[1, 0].ticklabel_format(style='plain')
        self.axes[1, 0].yaxis.grid(True)
        self.axes[1, 0].set_xlabel('All Containers')
        self.axes[1, 0].violinplot(self.df[col], showmedians=True)
        self.axes[1, 0].set_xticks([])
        self.axes[1, 0].axes.xaxis.set_visible(False)

        # per docker scatter plot
        self.axes[0, 1].ticklabel_format(style='plain')
        self.axes[0, 1].yaxis.grid(True)
        self.axes[0, 1].set_xlabel('Time')
        for i, key in enumerate(self.keys):
            y = per_key_arr[i]
            self.axes[0, 1].scatter(x=range(0, len(y)), y=y, marker='.')

        # consolidated/summed-up scatter plot
        self.axes[1, 1].ticklabel_format(style='plain')
        self.axes[1, 1].yaxis.grid(True)
        self.axes[1, 1].set_xlabel('Time')
        out, out_avg, nkeys = [], [], len(per_key_arr)
        # omit the very last measurement: could be a partial record
        # fix: guard against an empty key set (previously IndexError on per_key_arr[0])
        nsamples = len(per_key_arr[0]) - 1 if per_key_arr else 0
        for j in range(nsamples):
            out.append(0.0)
            for i in range(nkeys):
                out[j] += per_key_arr[i][j]
            out_avg.append(out[j] / nkeys)
        self.axes[1, 1].plot(out, color='b')
        self.axes[1, 1].plot(out_avg, color='y')
        self.axes[1, 1].legend([f'Total {self.col2title[col]}', f'Average {self.col2title[col]}'],
                               loc='upper right', ncol=1, fontsize=8)
        plt.savefig(f'{self.oprefix}-col-panel-{col}.pdf')
        #plt.show()
# build the key2nodes: useful when $container_size$ > 1 237 def build_key2nodes(self): 238 with open(self.kinspect_fname) as f: 239 for line in f: 240 if "User Services" in line: 241 f.readline() 242 break 243 for line in f: 244 if line == "\n": 245 break 246 larray = line.split() 247 if "containers_" in larray[1]: 248 key = larray[1] 249 self.key2nodes[key] = [larray[2].split("libp2p-")[1].replace(':', '')] 250 elif "libp2p-node" in larray[0]: 251 self.key2nodes[key].append(larray[0].split("libp2p-")[1].replace(':', '')) 252 253 # export the df if needed 254 def get_df(self): 255 return self.df 256 257 # build indices or cluster plots 258 def build_cluster_index(self, grp): 259 lst = self.df[grp].unique() 260 self.grp2idx = { val : i for i, val in enumerate(lst)} 261 self.idx2grp = { i : val for i, val in enumerate(lst)} 262 self.df[f'{grp}_idx'] = self.df[grp].map(lambda x: self.grp2idx[x]) 263 264 # plot the cluster panel 265 def cluster_plot_helper(self, grp, cols): 266 self.build_cluster_index(grp) 267 kmeans = KMeans(n_clusters=10, n_init='auto') 268 269 groups = self.df[grp].unique() 270 groups.sort() 271 xpdf = pd.DataFrame() 272 for g in groups: 273 X =self.df.loc[self.df[grp] == g][cols] 274 Xflat = X.values.flatten() 275 xpdf[g] = Xflat 276 labels = kmeans.fit_predict(X) 277 #TODO: plot better. 
it is not very interpretable now 278 self.axes[0,1].scatter(x=range(0, len(labels)), y=labels, marker='.') 279 #self.axes[0,1].scatter(X.iloc[:, 0], X.iloc[:, 1], c=labels, cmap='plasma') 280 self.axes[0,1].set_xlabel('Time') 281 #axis.set_yticks([x for x in range(len(groups))]) 282 self.axes[0,1].set_yticks(range(len(groups))) 283 labels = ['{}{}'.format( ' ', k) for i, k in enumerate(self.keys)] 284 self.axes[0,1].set_yticklabels(labels) 285 286 labels = kmeans.fit_predict(xpdf) 287 self.axes[1,1].scatter(xpdf.iloc[:, 0], xpdf.iloc[:, 2], c=labels, cmap='plasma') 288 289 # plot the comparison panel 290 def plot_compare_panel(self): 291 self.set_panel_size(2, 3) 292 self.fig.suptitle(f'{self.run_summary}') 293 k = 0 294 for i in [0,1]: 295 for j in [0,1,2]: 296 col = self.to_compare[k] 297 #self.axes[i,j].ticklabel_format(style='plain') 298 self.axes[i,j].yaxis.grid(True) 299 pc = self.axes[i,j].violinplot(self.df[col], showmedians=True) 300 self.axes[i,j].set_ylabel(self.col2units[col]) 301 self.axes[i,j].set_title(self.col2title[col]) 302 #for p in pc['bodies']: 303 # p.set_facecolor('green') 304 # p.set_edgecolor('k') 305 # p.set_alpha(0.5) 306 k += 1 307 plt.savefig(f'{self.oprefix}-compare.pdf', format="pdf", bbox_inches="tight") 308 #plt.show() 309 310 def phase_plots_helper(self, grp, col): 311 pass 312 313 # build the network from json 314 def read_network(self): 315 with open(self.json_fname) as f: 316 js_graph = json.load(f) 317 for src in js_graph['nodes'].keys(): 318 for dst in js_graph['nodes'][src]['static_nodes']: 319 self.G.add_edge(src, dst) 320 321 # plot the network and degree histogram 322 def plot_network(self): 323 self.set_panel_size(1, 2) 324 self.fig.suptitle(f'Network & Degree Distribution: {self.run_summary}') 325 nx.draw(self.G, ax=self.axes[0], pos=nx.kamada_kawai_layout(self.G), with_labels=True) 326 327 degree_sequence = sorted((d for n, d in self.G.degree()), reverse=True) 328 w = np.ones(len(degree_sequence))/len(degree_sequence) 
329 hist, bin_edges = np.histogram(degree_sequence, weights=w, density=False) 330 width = (bin_edges[1] - bin_edges[0]) 331 self.axes[1].bar(x=bin_edges[:-1], height=hist, align='center', 332 width=width, edgecolor='k', facecolor='green', alpha=0.5) 333 self.axes[1].set_xticks(range(max(degree_sequence)+1)) 334 self.axes[1].set_title("Normalised degree histogram") 335 self.axes[1].set_xlabel("Degree") 336 self.axes[1].set_ylabel("% of Nodes") 337 plt.savefig(f'{self.oprefix}-network.pdf', format="pdf", bbox_inches="tight") 338 #plt.show() 339 340 # plot the degree col panel 341 def deg_col_panel_helper(self, col): 342 self.set_panel_size(1, 2, shareY=True) 343 self.fig.suptitle(f'Conditional/Total Normalised Histograms for {self.col2title[col]} : {self.run_summary}') 344 self.fig.supylabel(self.col2units[col]) 345 degree_sequence = sorted((d for n, d in self.G.degree()), reverse=True) 346 x, y = np.unique(degree_sequence, return_counts=True) 347 by_degree = [[] for i in range(x[-1]+1)] 348 for node, degree in self.G.degree(): 349 curr = self.df[self.df.NodeName == node][col].values 350 if len(by_degree[degree]) == 0 : 351 by_degree[degree]=self.df[self.df.NodeName == node][col].values 352 else : 353 np.append(by_degree[degree], self.df[self.df.NodeName == node][col].values) 354 legends = [] 355 for d in by_degree: 356 if len(d) == 0: 357 continue 358 w = np.ones(len(d))/len(d) 359 hist, bin_edges = np.histogram(d, weights=w, density=False) 360 width = (bin_edges[1] - bin_edges[0]) 361 legends.append(self.axes[0].bar(x=bin_edges[:-1], height=hist, align='center', 362 width=width, edgecolor='k', alpha=0.75)) 363 self.axes[0].legend(legends, x, scatterpoints=1, 364 loc='upper left', ncol=5, 365 fontsize=8) 366 self.axes[0].set_title("Conditional Histogram (degree)") 367 d = self.df[col] 368 w = np.ones(len(d))/len(d) 369 hist, bin_edges = np.histogram(d, weights=w, density=False) 370 width = (bin_edges[1] - bin_edges[0]) 371 self.axes[1].bar(x=bin_edges[:-1], 
height=hist, align='center', 372 width=width, edgecolor='k', facecolor='green', alpha=0.5) 373 self.axes[1].set_title("Total Histogram") 374 plt.savefig(f'{self.oprefix}-deg-col-panel-{col}.pdf') 375 #plt.show() 376 377 378 # handle docker stats 379 class DStats(Plots, metaclass=Singleton): 380 def __init__(self, log_dir, oprefix, jf, to_plot, cfile): 381 Plots.__init__(self, log_dir, oprefix, jf, to_plot, cfile) 382 self.dstats_fname = f'{log_dir}/dstats-data/docker-stats.out' 383 self.kinspect_fname = f'{log_dir}/dstats-data/docker-kinspect.out' 384 self.col2title = { "ContainerID" : "Docker ID", 385 "ContainerName" : "Docker Name", 386 "CPUPerc" : "CPU Utilisation", 387 "MemUse" : "Memory Usage", 388 "MemTotal" : "Total Memory", 389 "MemPerc" : "Memory Utilisation", 390 "NetRecv" : "Network Received", 391 "NetSent" : "Network Sent", 392 "BlockR" : "Block Reads", 393 "BlockW" : "Block Writes", 394 "CPIDS" : "Docker PIDS" 395 } 396 self.col2units = { "ContainerID" : "ID", 397 "ContainerName" : "Name", 398 "CPUPerc" : "Percentage (%)", 399 "MemUse" : "MiB", 400 "MemTotal" : "MiB", 401 "MemPerc" : "Percentage (%)", 402 "NetRecv" : "MiB", 403 "NetSent" : "MiB", 404 "BlockR" : "MiB", 405 "BlockW" : "MiB", 406 "CPIDS" : "PIDS" 407 } 408 self.cols = ["CPUPerc", "MemUse","NetRecv", "NetSent", "BlockR", "BlockW"] 409 410 # remove the formatting artefacts 411 def pre_process(self): 412 if not path_ok(Path(self.dstats_fname)): 413 sys.exit(0) 414 self.set_summary() 415 regex = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') 416 with open(self.dstats_fname) as f: 417 cleaned_txt = regex.sub('', f.read()) 418 with open(self.dstats_fname, 'w') as f: 419 f.write(cleaned_txt) 420 421 # make sure the df is all numeric 422 def post_process(self): 423 for name in ["ContainerID", "ContainerName"]: 424 self.df[name] = self.df[name].map(lambda x: x.strip()) 425 h2b, n = Human2BytesConverter(), len(self.keys) 426 for percent in ["CPUPerc", "MemPerc"]: 427 self.df[percent] = 
self.df[percent].str.replace('%','').astype(float) 428 for size in ["MemUse", "MemTotal"]: 429 self.df[size] = self.df[size].map(lambda x:h2b.convert(x.strip())/(1024*1024)) # MiB 430 for size in ["NetRecv", "NetSent"]: 431 self.df[size] = self.df[size].map(lambda x:h2b.convert(x.strip())/(1024*1024)) # MiB 432 for size in ["BlockR", "BlockW"]: 433 self.df[size] = self.df[size].map(lambda x:h2b.convert(x.strip())/(1024*1024)) # MiB 434 self.df['Key'] = self.df['ContainerName'].map(lambda x: x.split("--")[0]) 435 self.build_key2nodes() 436 self.df['NodeName'] = self.df['Key'].map(lambda x: self.key2nodes[x][0]) 437 self.set_keys() 438 439 # build df from csv 440 def process_data(self): 441 log.info(f'processing {self.dstats_fname}...') 442 self.pre_process() 443 self.df = pd.read_csv(self.dstats_fname, header=0, comment='#', 444 skipinitialspace = True, delimiter='/', 445 usecols=["ContainerID", "ContainerName", 446 "CPUPerc", "MemUse", "MemTotal", "MemPerc", 447 "NetRecv", "NetSent", "BlockR","BlockW", "CPIDS"]) 448 self.post_process() 449 self.remove_incomplete_samples(grp='Key', err='--') 450 self.df.to_csv(f'{self.oprefix}-cleaned.csv', sep='/') 451 452 453 class HostProc(Plots, metaclass=Singleton): 454 def __init__(self, log_dir, oprefix, jf, to_plot, cfile): 455 Plots.__init__(self, log_dir, oprefix, jf, to_plot, cfile) 456 self.fname = f'{log_dir}/host-proc-data/docker-proc.out' 457 self.kinspect_fname = f'{log_dir}/host-proc-data/docker-kinspect.out' 458 self.col2title = { 'CPUPerc' : 'CPU Utilisation', 459 'VmPeak' : 'Peak Virtual Memory Usage', 460 'MemUse' : 'Peak Virtual Memory Usage', 461 'VmSize' : 'Current Virtual Memory Usage', 462 'VmRSS' : 'Peak Physical Memory Usage', 463 'VmData' : 'Size of Data Segment', 464 'VmStk' : 'Size of Stack Segment', 465 'NetRecv' : 'Network1: Received Bytes', 466 'NetRecvPkts' : 'Network1: Received Packets', 467 'NetSent' : 'Network1: Transmitted Bytes', 468 'NetSentPkts' : 'Network1: Transmitted Packets', 469 'NetRX' 
: 'Network2: Received Bytes', 470 'NetWX' : 'Network2: Transmitted Bytes', 471 'InOctets' : 'Network3: InOctets', 472 'OutOctets' : 'Network3: OutOctets', 473 'BlockR' : 'Block Reads', 474 'BlockW' : 'Block Writes' 475 } 476 self.col2units = { 'CPUPerc' : '%', 477 'VmPeak' : 'MiB', 478 'MemUse' : 'MiB', 479 'VmSize' : 'MiB', 480 'VmRSS' : 'MiB', 481 'VmData' : 'MiB', 482 'VmStk' : 'MiB', 483 'NetRecv' : 'MiB', 484 'NetRecvPkts' : 'Packets', 485 'NetSent' : 'MiB', 486 'NetSentPkts' : 'Packets', 487 'NetRX' : 'MiB', 488 'NetWX' : 'MiB', 489 'InOctets' : 'MiB', 490 'OutOctets' : 'MiB', 491 'BlockR' : 'MiB', 492 'BlockW' : 'MiB' 493 } 494 self.cols = ['CPUPerc', 'VmPeak', 'MemUse', 'VmSize', 'VmRSS', 'VmData', 'VmStk', 495 'RxBytes', 'RxPackets', 'TxBytes', 'TxPackets', 'NetRecv', 'NetSent', 496 'InOctets', 'OutOctets', 'BlockR', 'BlockW'] 497 498 def process_data(self): 499 if not path_ok(Path(self.fname)): 500 sys.exit(0) 501 self.set_summary() 502 self.df = pd.read_csv(self.fname, header=0, comment='#', 503 skipinitialspace = True, delimiter=r"\s+", 504 usecols= ['EpochId', 'PID', 'TimeStamp', 505 'ContainerName', 'ContainerID', 'NodeName', 506 'VmPeak', 'VmPeakUnit', 'VmSize', 'VmSizeUnit', 507 'VmRSS', 'VmRSSUnit', 'VmData','VmDataUnit', 'VmStk', 'VmStkUnit', 508 'HostVIF', 'NetSent', 'NetSentPkts', 'NetRecv', 'NetRecvPkts', 509 #'VETH', 'InOctets', 'OutOctets', 510 #'DockerVIF', 'NetRecv', 'NetSent', 511 #'VETH', 'InOctets', 'OutOctets', 512 'BlockR', 'BlockW', 513 'CPUPerc']) 514 self.post_process() 515 self.remove_incomplete_samples(grp='Key') 516 self.df.to_csv(f'{self.oprefix}-cleaned.csv', sep='/') 517 518 # normalise the units 519 def post_process(self): 520 #h2b = Human2BytesConverter() 521 for size in ['VmPeak', 'VmSize','VmRSS', 'VmData','VmStk']: 522 self.df[size] = self.df[size].map(lambda x: x/1024) # MiBs 523 for size in ['NetRecv','NetSent']: 524 self.df[size] = self.df[size].map(lambda x: x/(1024*1024)) # MiBs 525 for size in ['BlockR', 'BlockW']: 
526 self.df[size] = self.df[size].map(lambda x: x/(1024*1024)) # MiBs 527 #self.df['Key'] = self.df['ContainerName'].map(lambda x: x.split("--")[0]) 528 self.df['Key'] = self.df['NodeName']#.map(lambda x: x.split("--")[0]) 529 self.df['MemUse'] = self.df['VmPeak'] 530 self.build_key2nodes() 531 #self.df.rename(columns={'NodeName': 'Key'}, inplace=True) 532 self.set_keys() 533 self.df.fillna(0) 534 535 # not very helpful plots atm 536 def plot_clusters(self, grp, agg, axes): 537 self.cluster_plot_helper(col=['CPUPerc', 'VmPeak', 'VmRSS', 'MemUse', 'VmData'], grp=grp, axes=axes) 538 539 540 # sanity check config file 541 def _config_file_callback(ctx: typer.Context, param: typer.CallbackParam, cfile: str): 542 if cfile: 543 typer.echo(f"Loading config file: {os.path.basename(cfile)}") 544 ctx.default_map = ctx.default_map or {} # Init the default map 545 try: 546 with open(cfile, 'r') as f: # Load config file 547 conf = json.load(f) 548 if "plotting" not in conf: 549 log.error(f"No plotting is requested in {cfile}. Skipping plotting.") 550 sys.exit(0) 551 # Merge config and default_map 552 if ctx.command.name in conf["plotting"]: 553 ctx.default_map.update(conf["plotting"][ctx.command.name]) 554 else: 555 if "hproc" in conf["plotting"]: 556 ctx.default_map.update(conf["plotting"]["hproc"]) 557 else: 558 log.info(f"No dstats/host-proc params in config. 
Sticking to defaults") 559 #ctx.default_map.update(conf["plotting"]) # Merge config and default_map 560 except Exception as ex: 561 raise typer.BadParameter(str(ex)) 562 return cfile 563 564 565 # instantiate typer and set the commands 566 app = typer.Typer() 567 568 # common cmd processor/plotter 569 def cmd_helper(metric_infra, to_plot, agg, to_compare): 570 metric_infra.process_data() 571 # always plot the compare plots; rest on demand 572 metric_infra.set_compare(to_compare) 573 metric_infra.plot_compare_panel() 574 log.info(f'to_plot : {to_plot}') 575 if "Network" in to_plot and to_plot["Network"]: 576 metric_infra.read_network() 577 metric_infra.plot_network() 578 if "ColPanel" in to_plot and to_plot["ColPanel"]: 579 metric_infra.plot_col_panels(agg) 580 if "ValueCluster" in to_plot and to_plot["ValueCluster"]: 581 # TODO: find interpretable cluster plot 582 metric_infra.build_cluster_index('ContainerID') 583 if "DegColPanel" in to_plot: 584 metric_infra.plot_deg_col_panels() 585 if "SettlingTime" in to_plot and to_plot["SettlingTime"]: 586 metric_infra.compute_msg_settling_times() 587 metric_infra.plot_msg_settling_times() 588 #if "Compare" in to_plot and to_plot["Compare"]: 589 590 591 # process / plot docker-procfs.out 592 @app.command() 593 def host_proc(ctx: typer.Context, log_dir: Path, # <- mandatory path 594 out_prefix: str = typer.Option("output", help="Specify the prefix for the plot pdfs"), 595 aggregate: bool = typer.Option(True, help="Specify whether to aggregate"), 596 config_file: str = typer.Option("", callback=_config_file_callback, is_eager=True, 597 help="Set the input config file (JSON)")): 598 if not path_ok(log_dir, True): 599 sys.exit(0) 600 601 to_plot = ctx.default_map["to_plot"] if ctx.default_map and "to_plot" in ctx.default_map else [] 602 jf = f'{os.path.abspath(log_dir)}/config/topology_generated/network_data.json' 603 if os.path.exists("plots"): 604 os.system('rm -rf plots') 605 os.makedirs("plots") 606 host_proc = 
HostProc(log_dir, f'plots/{out_prefix}-host-proc', jf, to_plot, config_file) 607 cmd_helper(host_proc, to_plot, agg=aggregate, 608 to_compare=["CPUPerc", "MemUse", "NetRecv", "NetSent", "BlockR", "BlockW"]) 609 log.info(f'Done: {log_dir}') 610 611 612 # process / plot docker-dstats.out 613 @app.command() 614 def dstats(ctx: typer.Context, log_dir: Path, # <- mandatory path 615 out_prefix: str = typer.Option("output", help="Specify the prefix for the plot pdfs"), 616 aggregate: bool = typer.Option(True, help="Specify whether to aggregate"), 617 config_file: str = typer.Option("", callback=_config_file_callback, is_eager=True, 618 help="Set the input config file (JSON)")): 619 if not path_ok(log_dir, True): 620 sys.exit(0) 621 622 to_plot = ctx.default_map["to_plot"] if ctx.default_map and "to_plot" in ctx.default_map else [] 623 jf = f'{os.path.abspath(log_dir)}/config/topology_generated/network_data.json' 624 if os.path.exists("plots"): 625 os.system('rm -rf plots') 626 os.makedirs("plots") 627 dstats = DStats(log_dir, f'plots/{out_prefix}-dstats', jf, to_plot, config_file) 628 cmd_helper(dstats, to_plot, agg=aggregate, 629 to_compare=["CPUPerc", "MemUse", "NetRecv", "NetSent", "BlockR", "BlockW"]) 630 log.info(f'Done: {log_dir}') 631 632 633 if __name__ == "__main__": 634 app()