/ src / analysis / mesh_analysis / stacks / vaclab_stack_analysis.py
vaclab_stack_analysis.py
  1  # Python Imports
  2  import logging
  3  import traceback
  4  from concurrent.futures import ProcessPoolExecutor, as_completed
  5  from pathlib import Path
  6  from typing import Dict, List, Optional, Tuple
  7  
  8  import pandas as pd
  9  from pydantic import NonNegativeInt
 10  from result import Err, Ok, Result
 11  
 12  from src.analysis.mesh_analysis.readers.builders.victoria_reader_builder import (
 13      VictoriaReaderBuilder,
 14  )
 15  from src.analysis.mesh_analysis.stacks.stack_analysis import StackAnalysis
 16  from src.analysis.utils import path_utils
 17  
 18  logger = logging.getLogger(__name__)
 19  
 20  
 21  class VaclabStackAnalysis(StackAnalysis):
 22      def __init__(self, reader_builder: VictoriaReaderBuilder, **kwargs):
 23          super().__init__(reader_builder, **kwargs)
 24  
 25      def get_all_node_dataframes(
 26          self, stateful_sets: List[str], nodes_per_stateful_set: List[NonNegativeInt], n_jobs: int
 27      ) -> List[Dict[str, List[pd.DataFrame]]]:
 28          dfs = []
 29  
 30          for stateful_set_name, num_nodes_in_stateful_set in zip(
 31              stateful_sets, nodes_per_stateful_set
 32          ):
 33              with ProcessPoolExecutor(n_jobs) as executor:
 34                  futures = {
 35                      executor.submit(
 36                          self._extract_dataframe_single_node, stateful_set_name, node_index
 37                      ): node_index
 38                      for node_index in range(num_nodes_in_stateful_set)
 39                  }
 40  
 41                  for i, future in enumerate(as_completed(futures)):
 42                      i = i + 1
 43                      try:
 44                          statefulset_name, node_index, df_dict = future.result()
 45                          dfs.append(df_dict)
 46                          if i % 50 == 0 or i == num_nodes_in_stateful_set:
 47                              logger.info(
 48                                  f"Processed {statefulset_name}-{node_index} {i}/{num_nodes_in_stateful_set} nodes in stateful set <{stateful_set_name}>"
 49                              )
 50  
 51                      except Exception as e:
 52                          logger.error(f"Error retrieving logs for node {futures[future]}: {e}")
 53                          logger.error(traceback.format_exc())
 54  
 55          return dfs
 56  
 57      def _extract_dataframe_single_node(
 58          self, statefulset_name: str, node_index: int
 59      ) -> Tuple[str, int, Dict[str, List[pd.DataFrame]]]:
 60          reader = self._reader_builder.build_with_queries(statefulset_name, node_index)
 61          data = reader.get_dataframes()
 62          return statefulset_name, node_index, data
 63  
 64      def _dump_logs_for_single_node(self, node: str, dump_path: Path) -> Result[Path, None]:
 65          reader = self._reader_builder.build_with_single_query(node)
 66          data = reader.make_queries()
 67  
 68          log_name_path = dump_path / f"{node}.log"
 69          result = path_utils.prepare_path_for_file(log_name_path)
 70          if result.is_ok():
 71              with open(log_name_path, "w") as file:
 72                  for element in data[0][0]:  # We will always have 1 pattern group with 1 pattern
 73                      file.write(f"{element}\n")
 74  
 75              return Ok(log_name_path)
 76          else:
 77              return Err(None)
 78  
 79      def get_number_nodes(self, stateful_sets: List[str]) -> List[int]:
 80          num_nodes_per_stateful_set = []
 81          for stateful_set_prefix in stateful_sets:
 82              reader = self._reader_builder.build_with_single_query(
 83                  stateful_set_prefix, uniq_by="|uniq by (kubernetes.pod_name)"
 84              )
 85              result = reader.multiline_query_info()
 86              if result.is_ok():
 87                  num_nodes_per_stateful_set.append(len(list(result.ok_value)))
 88              else:
 89                  logger.error(result.err_value)
 90                  exit(1)
 91  
 92          return num_nodes_per_stateful_set
 93  
 94      def dump_node_logs(self, n_jobs: int, identifiers: List[str], dump_path: Path) -> None:
 95          with ProcessPoolExecutor(n_jobs) as executor:
 96              futures_map = {
 97                  executor.submit(self._dump_logs_for_single_node, identifier, dump_path): identifier
 98                  for identifier in identifiers
 99              }
100  
101              for future in as_completed(futures_map):
102                  identifier = futures_map[future]
103                  try:
104                      result = future.result()
105                      match result:
106                          case Ok(log_path):
107                              logger.info(f"Log for {identifier} dumped successfully: {log_path}")
108                          case Err(_):
109                              logger.warning(
110                                  f"Failed to dump logs for {identifier}: {result.err_value}"
111                              )
112                  except Exception as e:
113                      logger.error(f"Error retrieving logs for node {identifier}: {e}")
114  
115      def get_pod_logs(self, identifier: str, *, order_by: Optional[str] = None) -> List[str]:
116          reader = self._reader_builder.build_with_pod_identifier(identifier, order_by=order_by)
117          data = reader.make_queries()
118  
119          return data