# log_multi_analysis.py
"""Run mesh-analysis checks over a set of deployed experiments and summarize results.

Walks deployment folders for ``metadata.json`` files, builds a "stack"
configuration per experiment, runs a ``Nimlibp2pAnalyzer`` over each, and
logs a pass/fail summary at the end.
"""

import asyncio
import json
import logging
import os
import sys
import traceback
from collections import defaultdict
from contextlib import contextmanager, suppress
from pathlib import Path
from typing import Any, Iterator, Optional, Self

from pydantic import BaseModel

# Make sibling project packages importable when running this file as a script.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "deployments")))

from src.analysis.mesh_analysis.analyzers.analyzer import Analyzer
from src.analysis.mesh_analysis.analyzers.data_puller import DataPuller
from src.analysis.mesh_analysis.analyzers.nimlibp2p_analyzer import Nimlibp2pAnalyzer

logger = logging.getLogger(__name__)


@contextmanager
def extra_log_handler(logger, handler, level=logging.INFO):
    """Temporarily attach `handler` to `logger`, detaching it on exit.

    Lowers the root logger to `level` for the duration (so records actually
    reach the handler) and restores the previous root level afterwards.

    Bug fixed: the cleanup previously iterated over *all* of `logger`'s
    handlers (shadowing the `handler` argument), flushing, removing and
    closing unrelated handlers. Only the handler added here is cleaned up now.
    """
    root = logging.getLogger()
    previous_root_level = root.level
    root.setLevel(level)
    logger.addHandler(handler)
    try:
        yield
    finally:
        handler.flush()
        logger.removeHandler(handler)
        handler.close()
        root.setLevel(previous_root_level)


@contextmanager
def log_to_path(log_path):
    """Route root-logger records to a file at `log_path` for the context's duration.

    Warning: Removes previous log at `log_path`.
    """
    log_path = Path(log_path)
    log_path.parent.mkdir(parents=True, exist_ok=True)
    # Only a missing file is expected here; other errors should surface.
    with suppress(FileNotFoundError):
        os.remove(log_path)

    file_handler = logging.FileHandler(log_path)
    file_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    file_handler.setLevel(logging.INFO)

    # Mirror this module's effective level onto the temporary root handler.
    current_level = logger.getEffectiveLevel()
    with extra_log_handler(logging.getLogger(), file_handler, current_level):
        yield


def setup_logger():
    """Configure the root logger to emit DEBUG-level records to stdout."""
    level = logging.DEBUG
    root = logging.getLogger()
    root.setLevel(level)
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setLevel(level)
    root.addHandler(stream_handler)


def get_folders(base_dir: Path, file_name: str) -> Iterator[str]:
    """Yield folders under `base_dir` (relative to it) containing files with the given `file_name`."""
    for dirpath, _dirnames, filenames in os.walk(base_dir):
        if file_name in filenames:
            yield os.path.relpath(dirpath, base_dir)


def get_experiments(*, experiment_name: Optional[str] = None) -> Iterator[dict]:
    """Yield the parsed `metadata.json` dict for every experiment folder found.

    Note: `experiment_name` is currently unused; kept for interface
    compatibility / future filtering.

    Raises:
        Any exception from reading or parsing a metadata file (logged with
        full traceback before propagating).
    """
    start_dir = Path("../deployments/")

    # Relative paths to parent of experiments.
    start_subs = [
        # TODO: Put your paths here.
    ]

    subdirs = []
    for start_sub in start_subs:
        subdirs += [
            start_sub / path for path in get_folders(start_dir / start_sub, "metadata.json")
        ]

    logger.info(f"found {len(subdirs)} paths")
    for path in subdirs:
        logger.info(f"path: {path}")

    for subdir in subdirs:
        metadata_log_path = start_dir / subdir / "metadata.json"
        logger.info(f"Metadata path: {metadata_log_path}")
        try:
            # Read and parse eagerly so the file is closed before yielding.
            exp = json.loads(metadata_log_path.read_text(encoding="utf-8"))
        except Exception:
            logger.error(f"exception: {traceback.format_exc()}")
            raise
        yield exp


class StackGen:
    """Fluent builder for the "stack" configuration dict consumed by DataPuller."""

    def __init__(self) -> None:
        # Bug fixed: `_args` was previously a mutable *class* attribute, so
        # every StackGen instance shared (and accumulated into) one dict.
        self._args: dict = {}

    def vaclab(self) -> Self:
        """Preset for querying the vaclab VictoriaLogs endpoint."""
        self._args.update(
            {
                "url": "https://vlselect.lab.vac.dev/select/logsql/query",
                "type": "vaclab",
                "reader": "victoria",
            }
        )
        return self

    def local(self, folder) -> Self:
        """Point the stack at a local data folder instead of a remote source."""
        self._args.update({"local_folder": folder})
        return self

    def with_experiment(self, exp: dict) -> Self:
        """Merge the experiment's own `stack` section into the config."""
        self._args.update(exp["stack"])
        return self

    def build(self) -> dict:
        """Validate that all required fields are present and return the config dict.

        Raises:
            AssertionError: if a required field is missing. Raised explicitly
                (not via `assert`) so validation survives `python -O`.
        """
        required_fields = [
            "type",
            "url",
            "start_time",
            "end_time",
            "stateful_sets",
            "nodes_per_statefulset",
            "container_name",
            "extra_fields",
        ]
        for key in required_fields:
            if key not in self._args:
                raise AssertionError(
                    f"Missing key in stack. key: `{key}` stack: `{self._args}`"
                )

        return self._args


async def process_experiment(exp: dict) -> dict:
    """Run the analyzer for one experiment, logging to a per-experiment file.

    Returns:
        A dict with the experiment metadata under "exp" and the analyzer's
        check results under "results".
    """
    exp_name = exp["stack"]["name"]
    base_data_path = Path("local_data/simulations_data/")
    log_path = base_data_path / exp_name / "out.log"

    results_dict = {"exp": exp}
    with log_to_path(log_path):
        logger.info(f"log_path: {log_path}")
        logger.info(f"Processing experiment: {exp_name}\n")
        # Fill in vaclab defaults, then validate the merged stack config.
        exp["stack"] = StackGen().vaclab().with_experiment(exp).build()
        new_analyzer = get_analyzer_for_dev_testing(exp)
        results_dict["results"] = new_analyzer.run()

    return results_dict


def get_analyzer_for_dev_testing(exp) -> Analyzer:
    """Build a Nimlibp2pAnalyzer wired to the experiment's stack and params."""
    stack = exp["stack"]
    params = exp["params"]
    data_puller = DataPuller().with_kwargs(stack)
    stateful_sets = stack["stateful_sets"]
    nodes_per_statefulset = stack["nodes_per_statefulset"]

    # TODO: We search for all StatefulSets in our exp["stack"],
    # but the bootstrap nodes are relay=False, so they are filtered here.
    reliability = [
        (ss_name, num_nodes)
        for ss_name, num_nodes in zip(stateful_sets, nodes_per_statefulset)
        if "bootstrap" not in ss_name
    ]

    return (
        Nimlibp2pAnalyzer()
        .with_data_puller(data_puller)
        .with_ss_check(stateful_sets, nodes_per_statefulset)
        .with_reliability_check(
            stateful_sets=[name for name, _ in reliability],
            nodes_per_ss=[count for _, count in reliability],
            expected_num_peers=params["num_nodes"],
            expected_num_messages=params["num_messages"],
        )
        .with_dump_analysis_dir(f"local_data/simulations_data/{exp['stack']['name']}/")
    )


def unravel(obj: Any) -> Any:
    """Recursively convert pydantic models and nested containers to plain data.

    Handles dicts, lists, tuples and namedtuples; anything else passes through.
    """
    if isinstance(obj, BaseModel):
        return unravel(obj.model_dump())
    if isinstance(obj, dict):
        return {key: unravel(value) for key, value in obj.items()}
    if isinstance(obj, tuple) and hasattr(obj, "_fields"):
        # Bug fixed: namedtuple constructors take positional fields, not a
        # single iterable, so `type(obj)(generator)` would raise for them.
        return type(obj)(*(unravel(item) for item in obj))
    if isinstance(obj, (list, tuple)):
        return type(obj)(unravel(item) for item in obj)
    return obj


async def main():
    """Process every discovered experiment and log an aggregate summary."""
    setup_logger()

    all_statuses = defaultdict(int)
    summary = defaultdict(int)

    all_results = []
    for exp in get_experiments():
        try:
            results = await process_experiment(exp)
            all_results.append(results)
            passed = True
            for item in results["results"]:
                all_statuses[item.status] += 1
                if item.status != "passed":
                    passed = False

            if passed:
                summary["passed"] += 1
        except Exception:
            # Catch all exceptions so we can still print results table
            # even if an experiment had a problem.
            logger.error(f"exception: {traceback.format_exc()}")

    logger.info(f"=== All Results ===\n{json.dumps(unravel(all_results), indent=2, default=str)}")
    not_passed = len(all_results) - summary["passed"]
    logger.info(f"Passed: {summary['passed']}\nNot Passed: {not_passed}\nTotal: {len(all_results)}")
    status_str = "\n".join([f"{key}: {value}" for key, value in all_statuses.items()])
    logger.info(f"=== Statuses === \n{status_str}")

    if not_passed:
        logger.error("At least one check failed!")


if __name__ == "__main__":
    asyncio.run(main())