# vaclab_stack_analysis.py
1 # Python Imports 2 import logging 3 import traceback 4 from concurrent.futures import ProcessPoolExecutor, as_completed 5 from pathlib import Path 6 from typing import Dict, List, Optional, Tuple 7 8 import pandas as pd 9 from pydantic import NonNegativeInt 10 from result import Err, Ok, Result 11 12 from src.analysis.mesh_analysis.readers.builders.victoria_reader_builder import ( 13 VictoriaReaderBuilder, 14 ) 15 from src.analysis.mesh_analysis.stacks.stack_analysis import StackAnalysis 16 from src.analysis.utils import path_utils 17 18 logger = logging.getLogger(__name__) 19 20 21 class VaclabStackAnalysis(StackAnalysis): 22 def __init__(self, reader_builder: VictoriaReaderBuilder, **kwargs): 23 super().__init__(reader_builder, **kwargs) 24 25 def get_all_node_dataframes( 26 self, stateful_sets: List[str], nodes_per_stateful_set: List[NonNegativeInt], n_jobs: int 27 ) -> List[Dict[str, List[pd.DataFrame]]]: 28 dfs = [] 29 30 for stateful_set_name, num_nodes_in_stateful_set in zip( 31 stateful_sets, nodes_per_stateful_set 32 ): 33 with ProcessPoolExecutor(n_jobs) as executor: 34 futures = { 35 executor.submit( 36 self._extract_dataframe_single_node, stateful_set_name, node_index 37 ): node_index 38 for node_index in range(num_nodes_in_stateful_set) 39 } 40 41 for i, future in enumerate(as_completed(futures)): 42 i = i + 1 43 try: 44 statefulset_name, node_index, df_dict = future.result() 45 dfs.append(df_dict) 46 if i % 50 == 0 or i == num_nodes_in_stateful_set: 47 logger.info( 48 f"Processed {statefulset_name}-{node_index} {i}/{num_nodes_in_stateful_set} nodes in stateful set <{stateful_set_name}>" 49 ) 50 51 except Exception as e: 52 logger.error(f"Error retrieving logs for node {futures[future]}: {e}") 53 logger.error(traceback.format_exc()) 54 55 return dfs 56 57 def _extract_dataframe_single_node( 58 self, statefulset_name: str, node_index: int 59 ) -> Tuple[str, int, Dict[str, List[pd.DataFrame]]]: 60 reader = 
self._reader_builder.build_with_queries(statefulset_name, node_index) 61 data = reader.get_dataframes() 62 return statefulset_name, node_index, data 63 64 def _dump_logs_for_single_node(self, node: str, dump_path: Path) -> Result[Path, None]: 65 reader = self._reader_builder.build_with_single_query(node) 66 data = reader.make_queries() 67 68 log_name_path = dump_path / f"{node}.log" 69 result = path_utils.prepare_path_for_file(log_name_path) 70 if result.is_ok(): 71 with open(log_name_path, "w") as file: 72 for element in data[0][0]: # We will always have 1 pattern group with 1 pattern 73 file.write(f"{element}\n") 74 75 return Ok(log_name_path) 76 else: 77 return Err(None) 78 79 def get_number_nodes(self, stateful_sets: List[str]) -> List[int]: 80 num_nodes_per_stateful_set = [] 81 for stateful_set_prefix in stateful_sets: 82 reader = self._reader_builder.build_with_single_query( 83 stateful_set_prefix, uniq_by="|uniq by (kubernetes.pod_name)" 84 ) 85 result = reader.multiline_query_info() 86 if result.is_ok(): 87 num_nodes_per_stateful_set.append(len(list(result.ok_value))) 88 else: 89 logger.error(result.err_value) 90 exit(1) 91 92 return num_nodes_per_stateful_set 93 94 def dump_node_logs(self, n_jobs: int, identifiers: List[str], dump_path: Path) -> None: 95 with ProcessPoolExecutor(n_jobs) as executor: 96 futures_map = { 97 executor.submit(self._dump_logs_for_single_node, identifier, dump_path): identifier 98 for identifier in identifiers 99 } 100 101 for future in as_completed(futures_map): 102 identifier = futures_map[future] 103 try: 104 result = future.result() 105 match result: 106 case Ok(log_path): 107 logger.info(f"Log for {identifier} dumped successfully: {log_path}") 108 case Err(_): 109 logger.warning( 110 f"Failed to dump logs for {identifier}: {result.err_value}" 111 ) 112 except Exception as e: 113 logger.error(f"Error retrieving logs for node {identifier}: {e}") 114 115 def get_pod_logs(self, identifier: str, *, order_by: Optional[str] = None) 
-> List[str]: 116 reader = self._reader_builder.build_with_pod_identifier(identifier, order_by=order_by) 117 data = reader.make_queries() 118 119 return data