# log_multi_analysis.py
  1  import asyncio
  2  import json
  3  import logging
  4  import os
  5  import sys
  6  import traceback
  7  from collections import defaultdict
  8  from contextlib import contextmanager
  9  from pathlib import Path
 10  from typing import Any, Iterator, Optional, Self
 11  
 12  from pydantic import BaseModel
 13  
# Make the repo root, its parent, and the deployments/ folder importable
# when this file is run directly as a script (rather than installed).
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "deployments")))
 17  
 18  from src.analysis.mesh_analysis.analyzers.analyzer import Analyzer
 19  from src.analysis.mesh_analysis.analyzers.data_puller import DataPuller
 20  from src.analysis.mesh_analysis.analyzers.nimlibp2p_analyzer import Nimlibp2pAnalyzer
 21  
# Module-level logger; handlers/levels are configured in setup_logger() and
# temporarily extended by log_to_path().
logger = logging.getLogger(__name__)
 23  
 24  
 25  @contextmanager
 26  def extra_log_handler(logger, handler, level=logging.INFO):
 27      logging.getLogger().setLevel(level)
 28      logger.addHandler(handler)
 29      try:
 30          yield
 31      finally:
 32          for handler in logger.handlers:
 33              handler.flush()
 34          logger.removeHandler(handler)
 35          handler.close()
 36  
 37  
 38  @contextmanager
 39  def log_to_path(log_path):
 40      """
 41      Warning: Removes previous log.
 42      """
 43      try:
 44          os.makedirs(log_path.parent, exist_ok=True)
 45          os.remove(log_path)
 46      except Exception:
 47          pass
 48  
 49      file_handler = logging.FileHandler(log_path)
 50      formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
 51      file_handler.setFormatter(formatter)
 52      file_handler.setLevel(logging.INFO)
 53  
 54      current_level = logger.getEffectiveLevel()
 55      with extra_log_handler(logging.getLogger(), file_handler, current_level):
 56          yield
 57  
 58  
 59  def setup_logger():
 60      level = logging.DEBUG
 61      logging.getLogger().setLevel(level)
 62      stream_handler = logging.StreamHandler(sys.stdout)
 63      stream_handler.setLevel(level)
 64      logging.getLogger().addHandler(stream_handler)
 65  
 66  
 67  def get_folders(base_dir: Path, file_name: str) -> Iterator[str]:
 68      """Yield folders under `base_dir` containing files with the given `file_name`"""
 69      for dirpath, _dirnames, filenames in os.walk(base_dir):
 70          if file_name in filenames:
 71              yield os.path.relpath(dirpath, base_dir)
 72  
 73  
 74  def get_experiments(*, experiment_name: Optional[str] = None) -> Iterator[dict]:
 75      start_dir = Path("../deployments/")
 76      subdirs = []
 77  
 78      # Relative paths to parent of experiments.
 79      start_subs = [
 80          # TODO: Put your paths here.
 81      ]
 82  
 83      for start_sub in start_subs:
 84          subdirs += [
 85              start_sub / path for path in get_folders(start_dir / start_sub, "metadata.json")
 86          ]
 87      for d in subdirs:
 88          logger.info(d)
 89  
 90      logger.info(f"found {len(subdirs)} paths")
 91      for path in subdirs:
 92          logger.info(f"path: {path}")
 93  
 94      for subdir in subdirs:
 95          try:
 96              metadata_log_path = start_dir / subdir / "metadata.json"
 97              logger.info(f"Events log path: {metadata_log_path}")
 98              with open(metadata_log_path, "r", encoding="utf-8") as f:
 99                  exp = json.load(f)
100              yield exp
101          except Exception as e:
102              full_trace = traceback.format_exc()
103              logger.error(f"exception: {full_trace}")
104              raise
105  
106  
107  class StackGen:
108      _args: dict = {}
109  
110      def vaclab(self) -> Self:
111          self._args.update(
112              {
113                  "url": "https://vlselect.lab.vac.dev/select/logsql/query",
114                  "type": "vaclab",
115                  "reader": "victoria",
116              }
117          )
118          return self
119  
120      def local(self, folder) -> Self:
121          self._args.update({"local_folder": folder})
122          return self
123  
124      def with_experiment(self, exp: dict) -> Self:
125          self._args.update(exp["stack"])
126          return self
127  
128      def build(self) -> dict:
129          required_fields = [
130              "type",
131              "url",
132              "start_time",
133              "end_time",
134              "stateful_sets",
135              "nodes_per_statefulset",
136              "container_name",
137              "extra_fields",
138          ]
139          for key in required_fields:
140              assert key in self._args, f"Missing key in stack. key: `{key}` stack: `{self._args}`"
141  
142          return self._args
143  
144  
async def process_experiment(exp: dict) -> dict:
    """Run the full analysis pipeline for one experiment.

    All log output produced during the run is mirrored into
    `local_data/simulations_data/<name>/out.log`. Returns a dict with the
    experiment metadata under "exp" and the analyzer output under "results".
    """
    name = exp["stack"]["name"]
    out_log = Path("local_data/simulations_data/") / name / "out.log"

    output = {"exp": exp}
    with log_to_path(out_log):
        logger.info(f"log_path: {out_log}")
        logger.info(f"Processing experiment: {name}\n")
        # Normalize the stack via the builder (vaclab defaults + experiment
        # overrides) before constructing the analyzer.
        exp["stack"] = StackGen().vaclab().with_experiment(exp).build()
        analyzer = get_analyzer_for_dev_testing(exp)
        output["results"] = analyzer.run()

    return output
159  
160  
def get_analyzer_for_dev_testing(exp) -> Analyzer:
    """Build a Nimlibp2pAnalyzer configured from the experiment's stack/params.

    The StatefulSet check covers every set in the stack, while the
    reliability check skips the bootstrap sets.
    """
    stack = exp["stack"]
    params = exp["params"]
    puller = DataPuller().with_kwargs(stack)

    # TODO: We search for all StatefulSets in our exp["stack"],
    # but the bootstrap nodes are relay=False, so they are filtered here.
    relay_sets = []
    relay_counts = []
    for ss_name, ss_count in zip(stack["stateful_sets"], stack["nodes_per_statefulset"]):
        if "bootstrap" in ss_name:
            continue
        relay_sets.append(ss_name)
        relay_counts.append(ss_count)

    analyzer = (
        Nimlibp2pAnalyzer()
        .with_data_puller(puller)
        .with_ss_check(stack["stateful_sets"], stack["nodes_per_statefulset"])
        .with_reliability_check(
            stateful_sets=relay_sets,
            nodes_per_ss=relay_counts,
            expected_num_peers=params["num_nodes"],
            expected_num_messages=params["num_messages"],
        )
        .with_dump_analysis_dir(f"local_data/simulations_data/{exp['stack']['name']}/")
    )
    return analyzer
188  
189  
def unravel(obj: Any) -> Any:
    """Recursively convert pydantic models into plain JSON-friendly containers.

    Models become dicts (via `model_dump`); dicts, lists, and tuples are
    rebuilt with their elements unraveled; anything else passes through.
    """
    if isinstance(obj, BaseModel):
        return unravel(obj.model_dump())
    if isinstance(obj, dict):
        return {k: unravel(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        # Preserve the concrete sequence type (list stays list, tuple tuple).
        return type(obj)(unravel(item) for item in obj)
    return obj
199  
200  
async def main():
    """Run every discovered experiment, then log results and a pass summary."""
    setup_logger()

    all_statuses = defaultdict(int)
    summary = defaultdict(int)

    all_results = []
    for exp in get_experiments():
        try:
            results = await process_experiment(exp)
            all_results.append(results)
            statuses = [item.status for item in results["results"]]
            for status in statuses:
                all_statuses[status] += 1
            if all(status == "passed" for status in statuses):
                summary["passed"] += 1
        except Exception as e:
            # Catch all exceptions so we can still print results table
            # even if an experiment had a problem.
            logger.error(f"exception: {e}")
            full_trace = traceback.format_exc()
            logger.error(f"exception: {full_trace}")

    logger.info(f"=== All Results ===\n{json.dumps(unravel(all_results), indent=2, default=str)}")
    not_passed = len(all_results) - summary["passed"]
    logger.info(f"Passed: {summary['passed']}\nNot Passed: {not_passed}\nTotal: {len(all_results)}")
    status_str = "\n".join(f"{key}: {value}" for key, value in all_statuses.items())
    logger.info(f"=== Statuses === \n{status_str}")

    if not_passed:
        logger.error("At least one check failed!")
235  
236  
# Script entry point: drive the async pipeline on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())