# src/hproc.py
import json
import logging as log
import math
import os
import re
import shutil
import stat
import sys
import time
from pathlib import Path

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
import typer
from matplotlib.backends.backend_pdf import PdfPages
from sklearn.cluster import KMeans

from src import vars
from src import topology
from src import log_parser
from src import analysis
 22  
 23  # check if the path exists and is of appropriate type
 24  def path_ok(path : Path, isDir=False):
 25      if not path.exists():
 26          log.error(f'"{path}" does not exist')
 27          return False
 28      mode = path.stat().st_mode
 29      if not isDir and not stat.S_ISREG(mode):
 30          log.error(f'File expected: "{path}" is not a file')
 31          return False
 32      if isDir and not stat.S_ISDIR(mode):
 33          log.error(f'Directory expected: "{path}" is not a directory')
 34          return False
 35      # lay off the permission checks; resolve them lazily with open
 36      return True
 37  
 38  
 39  # define singleton 
 40  class Singleton(type):
 41      _instances = {}
 42      def __call__(cls, *args, **kwargs):
 43          if cls not in cls._instances:
 44              cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
 45          return cls._instances[cls]
 46  
 47  
 48  # convert human readable sizes to bytes
 49  class Human2BytesConverter(metaclass=Singleton):
 50      def __init__(self):     # add any human readable format/size and multiplier here
 51          self.letters    = {}
 52          self.letters[3] = {'GiB' : 1024*1024*1024, 'MiB' : 1024*1024, 'KiB' : 1024}
 53          self.letters[2] = {'GB' : 1024*1024*1024, 'MB' : 1024*1024, 'KB' : 1024,
 54                              'gB' : 1000*1000*1000, 'mB' : 1000*1000, 'kB' : 1000}
 55          self.letters[1] = {'B':1}
 56  
 57      def convert(self, value):
 58          for i in [3, 2, 1]:
 59              k = value[-i:]
 60              if k in self.letters[i]:
 61                  return float(value[:-i]) * self.letters[i][k]
 62          return np.nan
 63  
 64  
 65  # Base class for plots and common helpers
# Base class for plots and common helpers
class Plots(metaclass=Singleton):
    """Base class holding the measurements dataframe and shared plot helpers.

    Subclasses (DStats, HostProc) are expected to fill ``df``, set
    ``col2title``/``col2units`` and define ``kinspect_fname`` before the
    plotting methods here are used.
    """
    def __init__(self, log_dir, oprefix, jf, to_plot, cfile):
        # input log directory and prefix for the generated pdf files
        self.log_dir, self.oprefix = log_dir, oprefix
        # measurements dataframe, sample count, group keys, metric columns
        self.df, self.n, self.keys, self.cols = pd.DataFrame(), 0, [], []
        # per-column plot titles/units and container-key -> node-names map
        self.col2title, self.col2units, self.key2nodes = {}, {}, {}
        self.msg_settling_times, self.msg_injection_times = {}, {}
        # forward/backward maps between group values and integer indices
        self.grp2idx, self.idx2grp = {}, {}
        # current matplotlib figure/axes; populated by set_panel_size()
        self.fig, self.axes = "", ""
        # network json file name and the graph built from it by read_network()
        self.json_fname, self.G = jf, nx.empty_graph()
        self.to_plot, self.to_compare = to_plot, []
        # one-line run description (set_summary) and the JSON config path
        self.run_summary, self.cfile = "", cfile

    # waku log processing
    def compute_msg_settling_times(self):
        """Parse the simulation logs and fill msg_settling_times /
        msg_injection_times via the topology/log_parser/analysis helpers."""
        ldir = str(self.log_dir)
        topology_info = topology.load_json(f'{ldir}/{vars.G_TOPOLOGY_FILE_NAME}')
        topology.load_topics_into_topology(topology_info, f'{ldir}/config/topology_generated/')
        injected_msgs_dict = log_parser.load_messages(ldir)
        node_logs, msgs_dict, min_tss, max_tss = analysis.analyze_containers(topology_info, ldir)
        # span/1e6 -> ms implies nanosecond timestamps — TODO confirm
        simulation_time_ms = round((max_tss - min_tss) / 1000000)
        log.info((f'Simulation started at {min_tss}, ended at {max_tss}. '
                  f'Effective simulation time was {simulation_time_ms} ms.'))
        analysis.compute_message_delivery(msgs_dict, injected_msgs_dict)
        analysis.compute_message_latencies(msgs_dict)
        self.msg_settling_times = analysis.compute_propagation_times(msgs_dict)
        self.msg_injection_times = analysis.compute_injection_times(injected_msgs_dict)
        #print("message propagation_times: ", self.msg_settling_times)

    def get_key(self):
        """Return the 'Key' column of the dataframe."""
        return self.df.Key

    def set_keys(self):
        """Cache the sorted unique values of the 'Key' column in self.keys."""
        self.keys = self.df['Key'].unique()
        self.keys.sort()

    # set the summary for the run: to be used for plot title
    def set_summary(self):
        """Build the one-line run summary string from the JSON config file."""
        with open(self.cfile, 'r') as f:  # Load config file
            conf = json.load(f)
            # /1024: packet sizes presumably stored in bytes, shown as KiB — confirm
            minsize = int(conf["wls"]["min_packet_size"]/1024)
            maxsize = int(conf["wls"]["max_packet_size"]/1024)
            self.run_summary = (f'{conf["gennet"]["num_nodes"]}by'
                                f'{conf["gennet"]["container_size"]}fo'
                                f'{conf["gennet"]["fanout"]}-'
                                f'{conf["wls"]["message_rate"]}mps-'
                                f'({minsize}-{maxsize})KiB-'
                                f'{conf["wls"]["simulation_time"]}sec')
        print(f'summary: {self.run_summary} (from {self.cfile})')

    # set the fields that go into the comparison panel
    def set_compare(self, lst):
        """Record the list of columns shown in the comparison panel."""
        self.to_compare = lst

    # extract the maximal complete sample set
    def remove_incomplete_samples(self, grp, err=''):
        """Drop rows containing *err* in any column, then trim every *grp*
        group to the size of the smallest group so all groups are equal."""
        #if not err:
        self.df, minRows = self.df[~self.df.isin([err]).any(axis=1)], sys.maxsize
        for cid in self.df[grp].unique():
            rows = self.df[self.df[grp] == cid].shape[0]
            minRows = rows if minRows > rows else minRows
        self.df = self.df.groupby(grp).head(minRows)

    # plot the settling times, both network- and node-wise
    def plot_msg_settling_times(self):
        """Render the settling-time panel and save it as a pdf.

        Requires compute_msg_settling_times() and set_summary() to have run.
        """
        self.set_panel_size(2, 1, False)
        nmsgs = len(self.msg_settling_times)
        self.fig.suptitle(f'Settling Time: {self.run_summary}, {nmsgs} msgs')
        self.fig.supylabel("msecs")
        self.axes[0].set_xticks([x + 1 for x in range(len(self.keys))])
        #axes[0].set_xticks(ticks=[x + 1 for x in range(len(self.waku_cids))], labels=self.df["ContainerID"].unique())
        self.axes[0].set_xlabel('TODO: revisit after Jordi added per-container settling times')

        self.axes[1].violinplot(self.msg_settling_times, showmedians=True)
        self.axes[1].axes.xaxis.set_visible(False)
        plt.savefig(f'{self.oprefix}-settling-time.pdf', format="pdf", bbox_inches="tight")
        #plt.show()

    # set the panel params
    def set_panel_size(self, m, n, shareY=False):
        """Create an m x n subplot grid and store it in self.fig/self.axes."""
        self.fig, self.axes = plt.subplots(m, n, layout='constrained', sharey=shareY)
        self.fig.set_figwidth(12)
        self.fig.set_figheight(10)

    # plot col panels for selected columns
    def plot_col_panels(self, agg):
        """Render a column panel for each configured 'ColPanel' column.

        CPUPerc/MemUse are always plotted as raw values; other columns
        honour the *agg* flag (aggregate vs per-interval diff).
        """
        for col in self.to_plot["ColPanel"]:
            if col not in self.df.columns:
                log.error(f"ColPanel: {col} is not in {self.df.columns}, skipping...")
                continue
            if col in ["CPUPerc", "MemUse"]:
                self.col_panel_helper(col)
            else:
                self.col_panel_helper(col, agg)

    # plot degree/col panels for the given set of columns
    def plot_deg_col_panels(self):
        """Render a degree-conditioned histogram panel per configured column."""
        for col in self.to_plot["DegColPanel"]:
            if col not in self.df.columns:
                log.error(f"DegColPanel: {col} is not in {self.df.columns}, skipping...")
                continue
            self.deg_col_panel_helper(col) # only agg for now 

    # plot the column panel
    def col_panel_helper(self, col, agg=True):
        """Render the 2x2 panel for *col*: per-container violin, consolidated
        violin, per-container scatter and total/average time series.

        With agg=False the per-interval differences are plotted instead of
        the raw (cumulative) values.
        NOTE(review): assumes at least one key with at least one sample;
        an empty per_key_arr would raise on indexing below.
        """
        self.set_panel_size(2, 2)
        self.fig.suptitle(f'{self.col2title[col]}: {self.run_summary}')
        self.fig.supylabel(self.col2units[col])

        per_key_arr = []

        # per docker violin plot
        self.axes[0,0].ticklabel_format(style='plain')
        self.axes[0,0].yaxis.grid(True)
        for key in self.keys:
            if agg:
                tmp = self.df[self.get_key() == key][col].values
            else:
                tmp = self.df[self.get_key() == key][col].diff().dropna().values
            per_key_arr.append(tmp)
            #all_arr = np.concatenate((all_arr, tmp), axis=0)

        #self.axes[0,0].set_xticks([x + 1 for x in range(len(self.keys))])
        # NOTE(review): `labels` is currently unused (set_xticklabels commented out)
        labels = [ '{}{}'.format( ' ', k) for i, k in enumerate(self.keys)]
        #self.axes[0,0].set_xticklabels(labels)
        legends = self.axes[0,0].violinplot(dataset=per_key_arr, showmedians=True)
        #text = ""
        #for key, nodes in self.key2nodes.items():
        #    text += f'{key} {", ".join(nodes)}\n'
        #self.axes[0,0].text(0.675, 0.985, text, transform=self.axes[0,0].transAxes,
        #        fontsize=7, verticalalignment='top')

        # consolidated  violin plot
        self.axes[1,0].ticklabel_format(style='plain')
        self.axes[1,0].yaxis.grid(True)
        self.axes[1,0].set_xlabel('All Containers')
        self.axes[1,0].violinplot(self.df[col], showmedians=True)
        self.axes[1,0].set_xticks([])
        self.axes[1,0].axes.xaxis.set_visible(False)

        # per docker scatter plot
        self.axes[0,1].ticklabel_format(style='plain')
        self.axes[0,1].yaxis.grid(True)
        self.axes[0,1].set_xlabel('Time')
        legends = []
        for i, key in enumerate(self.keys):
            y = per_key_arr[i]
            legends.append(self.axes[0,1].scatter(x=range(0, len(y)), y=y, marker='.'))
        #self.axes[0,1].legend(legends, self.keys, scatterpoints=1,
         #               loc='upper left', ncol=3,
          #              fontsize=8)

        # consolidated/summed-up scatter plot
        self.axes[1,1].ticklabel_format(style='plain')
        self.axes[1,1].yaxis.grid(True)
        self.axes[1,1].set_xlabel('Time')
        out, out_avg, nkeys  = [], [], len(per_key_arr)
        # omit the very last measurement: could be a partial record
        jindices, iindices  =  range (len(per_key_arr[0])-1), range(len(per_key_arr))
        for j in jindices:
            out.append(0.0)
            for i in iindices:
                out[j] += per_key_arr[i][j]
            out_avg.append(out[j]/nkeys)
        self.axes[1,1].plot(out, color='b')
        self.axes[1,1].plot(out_avg, color='y')
        self.axes[1,1].legend([f'Total {self.col2title[col]}', f'Average {self.col2title[col]}'],
                loc='upper right', ncol=1, fontsize=8)
        plt.savefig(f'{self.oprefix}-col-panel-{col}.pdf')
        #plt.show()

    # build the key2nodes: useful when $container_size$ > 1
    def build_key2nodes(self):
        """Parse the kinspect output into key2nodes: container key -> node names.

        NOTE(review): self.kinspect_fname is defined by the subclasses
        (DStats/HostProc), not by this base class — this method assumes one
        of them set it. Skips everything up to the line after the
        "User Services" header, then reads until the first blank line.
        """
        with open(self.kinspect_fname) as f:
            for line in f:
                if "User Services" in line:
                    f.readline()
                    break
            for line in f:
                if line == "\n":
                    break
                larray = line.split()
                if "containers_" in larray[1]:
                    key = larray[1]
                    self.key2nodes[key] = [larray[2].split("libp2p-")[1].replace(':', '')]
                elif "libp2p-node" in larray[0]:
                    self.key2nodes[key].append(larray[0].split("libp2p-")[1].replace(':', ''))

    # export the df if needed
    def get_df(self):
        """Return the (possibly cleaned) measurements dataframe."""
        return self.df

    # build indices or cluster plots
    def build_cluster_index(self, grp):
        """Map each unique value of column *grp* to an integer index (and
        back) and add a '<grp>_idx' column to the dataframe."""
        lst = self.df[grp].unique()
        self.grp2idx =  { val : i for i, val in enumerate(lst)}
        self.idx2grp =  { i : val for i, val in enumerate(lst)}
        self.df[f'{grp}_idx'] = self.df[grp].map(lambda x: self.grp2idx[x])

    # plot the cluster panel
    def cluster_plot_helper(self, grp, cols):
        """KMeans-cluster the *cols* values per *grp* group and scatter the labels.

        NOTE(review): draws on self.axes[0,1]/[1,1], so a prior
        set_panel_size(2, 2) (or similar) appears to be required — confirm
        at the call site.
        """
        self.build_cluster_index(grp)
        kmeans = KMeans(n_clusters=10, n_init='auto')

        groups = self.df[grp].unique()
        groups.sort()
        xpdf = pd.DataFrame()
        for g in groups:
            X =self.df.loc[self.df[grp] == g][cols]
            Xflat = X.values.flatten()
            xpdf[g] = Xflat
            labels = kmeans.fit_predict(X)
            #TODO: plot better. it is not very interpretable now
            self.axes[0,1].scatter(x=range(0, len(labels)), y=labels, marker='.')
            #self.axes[0,1].scatter(X.iloc[:, 0], X.iloc[:, 1], c=labels,  cmap='plasma')
        self.axes[0,1].set_xlabel('Time')
        #axis.set_yticks([x  for x in range(len(groups))])
        self.axes[0,1].set_yticks(range(len(groups)))
        labels = ['{}{}'.format( ' ', k) for i, k in enumerate(self.keys)]
        self.axes[0,1].set_yticklabels(labels)

        labels = kmeans.fit_predict(xpdf)
        self.axes[1,1].scatter(xpdf.iloc[:, 0], xpdf.iloc[:, 2], c=labels,  cmap='plasma')

    # plot the comparison panel
    def plot_compare_panel(self):
        """Render a 2x3 grid of violin plots, one per column in to_compare,
        and save it as a pdf. Requires len(to_compare) >= 6."""
        self.set_panel_size(2, 3)
        self.fig.suptitle(f'{self.run_summary}')
        k = 0
        for i in [0,1]:
            for j in [0,1,2]:
                col = self.to_compare[k]
                #self.axes[i,j].ticklabel_format(style='plain')
                self.axes[i,j].yaxis.grid(True)
                pc = self.axes[i,j].violinplot(self.df[col], showmedians=True)
                self.axes[i,j].set_ylabel(self.col2units[col])
                self.axes[i,j].set_title(self.col2title[col])
                #for p in pc['bodies']:
                #    p.set_facecolor('green')
                #    p.set_edgecolor('k')
                #    p.set_alpha(0.5)
                k += 1
        plt.savefig(f'{self.oprefix}-compare.pdf', format="pdf", bbox_inches="tight")
        #plt.show()

    def phase_plots_helper(self, grp, col):
        """Placeholder for future phase plots; intentionally does nothing."""
        pass

    # build the network from json
    def read_network(self):
        """Populate self.G from the network json: one edge per
        (node, static peer) pair listed under 'nodes'."""
        with open(self.json_fname) as f:
            js_graph = json.load(f)
            for src in js_graph['nodes'].keys():
                for dst  in js_graph['nodes'][src]['static_nodes']:
                    self.G.add_edge(src, dst)

    # plot the network and degree histogram
    def plot_network(self):
        """Draw the network graph and its normalised degree histogram,
        saving the panel as a pdf. Requires read_network() to have run."""
        self.set_panel_size(1, 2)
        self.fig.suptitle(f'Network & Degree Distribution:  {self.run_summary}')
        nx.draw(self.G, ax=self.axes[0], pos=nx.kamada_kawai_layout(self.G), with_labels=True)

        degree_sequence = sorted((d for n, d in self.G.degree()), reverse=True)
        # equal weights summing to 1 -> histogram of fractions, not counts
        w = np.ones(len(degree_sequence))/len(degree_sequence)
        hist, bin_edges = np.histogram(degree_sequence, weights=w, density=False)
        width = (bin_edges[1] - bin_edges[0])
        self.axes[1].bar(x=bin_edges[:-1], height=hist, align='center',
                width=width, edgecolor='k', facecolor='green', alpha=0.5)
        self.axes[1].set_xticks(range(max(degree_sequence)+1))
        self.axes[1].set_title("Normalised degree histogram")
        self.axes[1].set_xlabel("Degree")
        self.axes[1].set_ylabel("% of Nodes")
        plt.savefig(f'{self.oprefix}-network.pdf', format="pdf", bbox_inches="tight")
        #plt.show()

    # plot the degree col panel
    def deg_col_panel_helper(self, col):
        """Render degree-conditioned and total normalised histograms for *col*."""
        self.set_panel_size(1, 2, shareY=True)
        self.fig.suptitle(f'Conditional/Total Normalised Histograms for {self.col2title[col]} :  {self.run_summary}')
        self.fig.supylabel(self.col2units[col])
        degree_sequence = sorted((d for n, d in self.G.degree()), reverse=True)
        x, y = np.unique(degree_sequence, return_counts=True)
        by_degree = [[] for i in range(x[-1]+1)]
        for node, degree in self.G.degree():
            # NOTE(review): `curr` is assigned but never used
            curr = self.df[self.df.NodeName == node][col].values
            if len(by_degree[degree]) == 0 :
                by_degree[degree]=self.df[self.df.NodeName == node][col].values
            else :
                # NOTE(review): np.append returns a new array and the result is
                # discarded here, so only the first node's values per degree
                # are kept — looks like a bug; confirm intent
                np.append(by_degree[degree], self.df[self.df.NodeName == node][col].values)
        legends = []
        for d in by_degree:
            if len(d) == 0:
                continue
            # normalise to fractions so histograms of different sizes compare
            w = np.ones(len(d))/len(d)
            hist, bin_edges = np.histogram(d, weights=w, density=False)
            width = (bin_edges[1] - bin_edges[0])
            legends.append(self.axes[0].bar(x=bin_edges[:-1], height=hist, align='center',
                width=width,  edgecolor='k', alpha=0.75))
        self.axes[0].legend(legends, x, scatterpoints=1,
                        loc='upper left', ncol=5,
                        fontsize=8)
        self.axes[0].set_title("Conditional Histogram (degree)")
        d = self.df[col]
        w = np.ones(len(d))/len(d)
        hist, bin_edges = np.histogram(d, weights=w, density=False)
        width = (bin_edges[1] - bin_edges[0])
        self.axes[1].bar(x=bin_edges[:-1], height=hist, align='center',
                width=width, edgecolor='k', facecolor='green', alpha=0.5)
        self.axes[1].set_title("Total Histogram")
        plt.savefig(f'{self.oprefix}-deg-col-panel-{col}.pdf')
        #plt.show()
376  
377  
378  # handle docker stats
# handle docker stats
class DStats(Plots, metaclass=Singleton):
    """Parse, clean and plot `docker stats` captures.

    Reads ``dstats-data/docker-stats.out`` (a '/'-separated csv polluted
    with ANSI escape sequences), normalises the columns to numeric values
    and reuses the Plots helpers for the actual charts.
    """
    def __init__(self, log_dir, oprefix, jf, to_plot, cfile):
        Plots.__init__(self, log_dir, oprefix, jf, to_plot, cfile)
        self.dstats_fname = f'{log_dir}/dstats-data/docker-stats.out'
        self.kinspect_fname = f'{log_dir}/dstats-data/docker-kinspect.out'
        # column name -> human-readable plot title
        self.col2title = {  "ContainerID"   : "Docker ID",
                            "ContainerName" : "Docker Name",
                            "CPUPerc"       : "CPU Utilisation",
                            "MemUse"        : "Memory Usage",
                            "MemTotal"      : "Total Memory",
                            "MemPerc"       : "Memory Utilisation",
                            "NetRecv"       : "Network Received",
                            "NetSent"       : "Network Sent",
                            "BlockR"        : "Block Reads",
                            "BlockW"        : "Block Writes",
                            "CPIDS"          : "Docker PIDS"
                            }
        # column name -> unit label used on the y axis
        self.col2units = {  "ContainerID"   : "ID",
                            "ContainerName" : "Name",
                            "CPUPerc"       : "Percentage (%)",
                            "MemUse"        : "MiB",
                            "MemTotal"      : "MiB",
                            "MemPerc"       : "Percentage (%)",
                            "NetRecv"       : "MiB",
                            "NetSent"       : "MiB",
                            "BlockR"        : "MiB",
                            "BlockW"        : "MiB",
                            "CPIDS"          : "PIDS"
                            }
        self.cols = ["CPUPerc", "MemUse","NetRecv", "NetSent", "BlockR", "BlockW"]

    # remove the formatting artefacts
    def pre_process(self):
        """Strip ANSI escape sequences from the capture, rewriting it in place,
        and build the run summary. Exits if the capture file is missing."""
        if not path_ok(Path(self.dstats_fname)):
            sys.exit(0)
        self.set_summary()
        # matches CSI and other ESC-introduced terminal control sequences
        regex = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
        with open(self.dstats_fname) as f:
            cleaned_txt = regex.sub('', f.read())
        with open(self.dstats_fname, 'w') as f:
            f.write(cleaned_txt)

    # make sure the df is all numeric
    def post_process(self):
        """Normalise the raw string columns to numeric values (%, MiB) and
        derive the Key/NodeName columns."""
        for name in ["ContainerID", "ContainerName"]:
            self.df[name] = self.df[name].map(lambda x: x.strip())
        h2b = Human2BytesConverter()
        for percent in ["CPUPerc", "MemPerc"]:
            self.df[percent] = self.df[percent].str.replace('%','').astype(float)
        # human-readable sizes -> MiB (one loop; the previous three were identical)
        for size in ["MemUse", "MemTotal", "NetRecv", "NetSent", "BlockR", "BlockW"]:
            self.df[size] = self.df[size].map(lambda x: h2b.convert(x.strip())/(1024*1024)) # MiB
        # the key is the container name up to the '--' separator
        self.df['Key'] = self.df['ContainerName'].map(lambda x: x.split("--")[0])
        self.build_key2nodes()
        self.df['NodeName'] = self.df['Key'].map(lambda x: self.key2nodes[x][0])
        self.set_keys()

    # build df from csv
    def process_data(self):
        """Clean the capture, load it into the dataframe, drop incomplete
        samples and export the cleaned csv."""
        log.info(f'processing {self.dstats_fname}...')
        self.pre_process()
        self.df = pd.read_csv(self.dstats_fname, header=0,  comment='#',
                            skipinitialspace = True, delimiter='/',
                            usecols=["ContainerID", "ContainerName",
                                    "CPUPerc", "MemUse", "MemTotal", "MemPerc",
                                    "NetRecv", "NetSent", "BlockR","BlockW",  "CPIDS"])
        self.post_process()
        self.remove_incomplete_samples(grp='Key', err='--')
        self.df.to_csv(f'{self.oprefix}-cleaned.csv', sep='/')
451  
452  
class HostProc(Plots, metaclass=Singleton):
    """Parse, clean and plot per-process host metrics (procfs capture)."""
    def __init__(self, log_dir, oprefix, jf, to_plot, cfile):
        Plots.__init__(self, log_dir, oprefix, jf, to_plot, cfile)
        self.fname = f'{log_dir}/host-proc-data/docker-proc.out'
        self.kinspect_fname = f'{log_dir}/host-proc-data/docker-kinspect.out'
        # column name -> human-readable plot title
        self.col2title = {  'CPUPerc'   : 'CPU Utilisation',
                            'VmPeak'    : 'Peak Virtual Memory Usage',
                            'MemUse'    : 'Peak Virtual Memory Usage',
                            'VmSize'    : 'Current Virtual Memory Usage',
                            'VmRSS'     : 'Peak Physical Memory Usage',
                            'VmData'    : 'Size of Data Segment',
                            'VmStk'     : 'Size of Stack Segment',
                            'NetRecv'   : 'Network1: Received Bytes',
                            'NetRecvPkts' : 'Network1: Received Packets',
                            'NetSent'   : 'Network1: Transmitted Bytes',
                            'NetSentPkts' : 'Network1: Transmitted Packets',
                            'NetRX'     : 'Network2: Received Bytes',
                            'NetWX'     : 'Network2: Transmitted Bytes',
                            'InOctets'  : 'Network3: InOctets',
                            'OutOctets' : 'Network3: OutOctets',
                            'BlockR'    : 'Block Reads',
                            'BlockW'    : 'Block Writes'
                        }
        # column name -> unit label used on the y axis
        self.col2units = {  'CPUPerc'   : '%',
                            'VmPeak'    : 'MiB',
                            'MemUse'    : 'MiB',
                            'VmSize'    : 'MiB',
                            'VmRSS'     : 'MiB',
                            'VmData'    : 'MiB',
                            'VmStk'     : 'MiB',
                            'NetRecv'   : 'MiB',
                            'NetRecvPkts' : 'Packets',
                            'NetSent'   : 'MiB',
                            'NetSentPkts' : 'Packets',
                            'NetRX'   : 'MiB',
                            'NetWX'   : 'MiB',
                            'InOctets'  : 'MiB',
                            'OutOctets' : 'MiB',
                            'BlockR'    : 'MiB',
                            'BlockW'    : 'MiB'
                        }
        # NOTE(review): Rx/Tx* entries below are not among the columns read by
        # process_data — presumably stale; confirm before relying on them
        self.cols = ['CPUPerc', 'VmPeak', 'MemUse', 'VmSize', 'VmRSS', 'VmData', 'VmStk',
                            'RxBytes', 'RxPackets', 'TxBytes', 'TxPackets', 'NetRecv', 'NetSent',
                            'InOctets', 'OutOctets', 'BlockR', 'BlockW']

    def process_data(self):
        """Load the whitespace-separated procfs capture into the dataframe,
        normalise it and export the cleaned csv. Exits if the file is missing."""
        if not path_ok(Path(self.fname)):
            sys.exit(0)
        self.set_summary()
        self.df = pd.read_csv(self.fname, header=0,  comment='#',
                skipinitialspace = True, delimiter=r"\s+",
                usecols= ['EpochId', 'PID', 'TimeStamp',
                    'ContainerName', 'ContainerID', 'NodeName',
                    'VmPeak', 'VmPeakUnit', 'VmSize', 'VmSizeUnit',
                    'VmRSS', 'VmRSSUnit', 'VmData','VmDataUnit', 'VmStk', 'VmStkUnit',
                    'HostVIF', 'NetSent', 'NetSentPkts', 'NetRecv', 'NetRecvPkts',
                    #'VETH', 'InOctets', 'OutOctets',
                    #'DockerVIF', 'NetRecv', 'NetSent',
                    #'VETH',  'InOctets', 'OutOctets',
                    'BlockR', 'BlockW',
                    'CPUPerc'])
        self.post_process()
        self.remove_incomplete_samples(grp='Key')
        self.df.to_csv(f'{self.oprefix}-cleaned.csv', sep='/')

    # normalise the units
    def post_process(self):
        """Normalise units to MiB, derive the Key/MemUse columns and
        zero-fill missing values."""
        #h2b = Human2BytesConverter()
        for size in ['VmPeak', 'VmSize','VmRSS', 'VmData','VmStk']:
            self.df[size] = self.df[size].map(lambda x: x/1024) # MiBs
        for size in ['NetRecv','NetSent']:
            self.df[size] = self.df[size].map(lambda x: x/(1024*1024)) # MiBs
        for size in ['BlockR', 'BlockW']:
            self.df[size] = self.df[size].map(lambda x: x/(1024*1024)) # MiBs
        #self.df['Key'] = self.df['ContainerName'].map(lambda x: x.split("--")[0])
        self.df['Key'] = self.df['NodeName']#.map(lambda x: x.split("--")[0])
        self.df['MemUse'] = self.df['VmPeak']
        self.build_key2nodes()
        #self.df.rename(columns={'NodeName': 'Key'}, inplace=True)
        self.set_keys()
        # fillna returns a copy: assign it back (previously the result was discarded,
        # so the zero-fill never took effect)
        self.df = self.df.fillna(0)

    # not very helpful plots atm
    def plot_clusters(self, grp, agg, axes):
        """Cluster the CPU/memory columns per *grp* group (experimental).

        The *agg* and *axes* parameters are kept for interface compatibility
        but unused: cluster_plot_helper draws on self.axes directly.
        """
        # cluster_plot_helper's signature is (grp, cols); the previous call
        # passed non-existent col=/axes= keywords and raised TypeError
        self.cluster_plot_helper(grp=grp, cols=['CPUPerc', 'VmPeak', 'VmRSS', 'MemUse', 'VmData'])
538  
539  
540  # sanity check config file
def _config_file_callback(ctx: typer.Context, param: typer.CallbackParam, cfile: str):
    """Eager typer callback: load the JSON config and seed ctx.default_map.

    Prefers the section matching the current command name under "plotting",
    falls back to the shared "hproc" section, and exits early when the
    config requests no plotting at all. Returns *cfile* unchanged.
    """
    if cfile:
        typer.echo(f"Loading config file: {os.path.basename(cfile)}")
        ctx.default_map = ctx.default_map or {}  # Init the default map
        try:
            with open(cfile, 'r') as f:  # Load config file
                conf = json.load(f)
            if "plotting" not in conf:
                log.error(f"No plotting is requested in {cfile}. Skipping plotting.")
                sys.exit(0)  # SystemExit is not an Exception, so it escapes the handler below
            # Merge config and default_map
            if ctx.command.name in conf["plotting"]:
                ctx.default_map.update(conf["plotting"][ctx.command.name])
            elif "hproc" in conf["plotting"]:
                ctx.default_map.update(conf["plotting"]["hproc"])
            else:
                log.info("No dstats/host-proc params in config. Sticking to defaults")
        except Exception as ex:
            # surface IO/JSON/key errors as a CLI parameter error
            raise typer.BadParameter(str(ex))
    return cfile
563  
564  
# instantiate typer and set the commands
app = typer.Typer()  # CLI entry point; commands are registered via @app.command()
567  
568  # common cmd processor/plotter
def cmd_helper(metric_infra, to_plot, agg, to_compare):
    """Run the shared process-and-plot pipeline for a DStats/HostProc instance.

    metric_infra : the metrics handler (provides process_data and plot_*)
    to_plot      : plot-name -> flag mapping; may be an empty list when no
                   config was supplied, so only `in` tests are safe here
    agg          : plot aggregate values rather than per-interval diffs
    to_compare   : columns for the always-rendered comparison panel
    """
    metric_infra.process_data()
    # always plot the compare plots; rest on demand
    metric_infra.set_compare(to_compare)
    metric_infra.plot_compare_panel()
    log.info(f'to_plot : {to_plot}')
    if "Network" in to_plot and to_plot["Network"]:
        metric_infra.read_network()
        metric_infra.plot_network()
    if "ColPanel" in to_plot and to_plot["ColPanel"]:
        metric_infra.plot_col_panels(agg)
    if "ValueCluster" in to_plot and to_plot["ValueCluster"]:
        # TODO: find interpretable cluster plot
        metric_infra.build_cluster_index('ContainerID')
    # honour the flag value like every other panel (was a membership test only,
    # so "DegColPanel": false still triggered the plot)
    if "DegColPanel" in to_plot and to_plot["DegColPanel"]:
        metric_infra.plot_deg_col_panels()
    if "SettlingTime" in to_plot and to_plot["SettlingTime"]:
        metric_infra.compute_msg_settling_times()
        metric_infra.plot_msg_settling_times()
    #if "Compare" in to_plot and to_plot["Compare"]:
589  
590  
591  # process / plot docker-procfs.out
@app.command()
def host_proc(ctx: typer.Context, log_dir: Path, # <- mandatory path
            out_prefix: str = typer.Option("output", help="Specify the prefix for the plot pdfs"),
            aggregate: bool = typer.Option(True, help="Specify whether to aggregate"),
            config_file: str = typer.Option("", callback=_config_file_callback, is_eager=True,
                help="Set the input config file (JSON)")):
    """Process and plot the host-side procfs capture found under *log_dir*."""
    if not path_ok(log_dir, True):
        sys.exit(0)

    to_plot = ctx.default_map["to_plot"] if ctx.default_map and "to_plot" in ctx.default_map else []
    jf = f'{os.path.abspath(log_dir)}/config/topology_generated/network_data.json'
    # recreate the output dir; shutil.rmtree replaces the shelled-out `rm -rf`
    if os.path.exists("plots"):
        shutil.rmtree("plots")
    os.makedirs("plots")
    host_proc = HostProc(log_dir, f'plots/{out_prefix}-host-proc', jf, to_plot, config_file)
    cmd_helper(host_proc, to_plot, agg=aggregate,
            to_compare=["CPUPerc", "MemUse", "NetRecv", "NetSent", "BlockR", "BlockW"])
    log.info(f'Done: {log_dir}')
610  
611  
612  # process / plot docker-dstats.out
@app.command()
def dstats(ctx: typer.Context, log_dir: Path, # <- mandatory path
            out_prefix: str = typer.Option("output", help="Specify the prefix for the plot pdfs"),
            aggregate: bool = typer.Option(True, help="Specify whether to aggregate"),
            config_file: str = typer.Option("", callback=_config_file_callback, is_eager=True,
             help="Set the input config file (JSON)")):
    """Process and plot the `docker stats` capture found under *log_dir*."""
    if not path_ok(log_dir, True):
        sys.exit(0)

    to_plot = ctx.default_map["to_plot"] if ctx.default_map and "to_plot" in ctx.default_map else []
    jf = f'{os.path.abspath(log_dir)}/config/topology_generated/network_data.json'
    # recreate the output dir; shutil.rmtree replaces the shelled-out `rm -rf`
    if os.path.exists("plots"):
        shutil.rmtree("plots")
    os.makedirs("plots")
    dstats = DStats(log_dir, f'plots/{out_prefix}-dstats', jf, to_plot, config_file)
    cmd_helper(dstats, to_plot, agg=aggregate,
            to_compare=["CPUPerc", "MemUse", "NetRecv", "NetSent", "BlockR", "BlockW"])
    log.info(f'Done: {log_dir}')
631  
632  
if __name__ == "__main__":
    # dispatch to the typer sub-commands (host-proc / dstats)
    app()