# src/analysis/mesh_analysis/readers/victoria_reader.py
  1  # Python Imports
  2  import json
  3  import logging
  4  import re
  5  from typing import Dict, Iterator, List, Optional, Tuple
  6  
  7  import pandas as pd
  8  import requests
  9  from httpx import Response
 10  from result import Err, Ok, Result
 11  
 12  # Project Imports
 13  from src.analysis.mesh_analysis.readers.reader import Reader
 14  from src.analysis.mesh_analysis.readers.tracers.message_tracer import MessageTracer
 15  
 16  logger = logging.getLogger(__name__)
 17  
 18  
 19  class VictoriaReader(Reader):
 20      def __init__(
 21          self,
 22          tracer: Optional[MessageTracer],
 23          victoria_config_query: Dict,
 24          extra_fields: Optional[List[str]] = None,
 25      ):
 26          """
 27          :param tracer: MessageTracer instance to retrieve raw message patterns from Victoria.
 28          :param victoria_config_query: Configuration for the Victoria query. This allows to do a first filtering by the
 29          monitoring stack retrieving only the lines we are interested in, saving time in the parsing process.
 30          """
 31          self._tracer: MessageTracer = tracer
 32          self._config_query = victoria_config_query
 33  
 34      def _fetch_data(self, url: str, headers: Dict, params: Dict, extra_fields: List[str]):
 35          logs = []
 36          logger.debug(f"Fetching {params}")
 37          with requests.post(url=url, headers=headers, params=params, stream=True) as response:
 38              for line in response.iter_lines():
 39                  if line:
 40                      try:
 41                          parsed_object = json.loads(line)
 42                      except json.decoder.JSONDecodeError as e:
 43                          logger.info(line)
 44                          exit()
 45                      logs.append(
 46                          (parsed_object["_msg"],) + tuple(parsed_object[k] for k in extra_fields)
 47                      )
 48          logger.debug(f"Fetched {len(logs)} log lines")
 49  
 50          return logs
 51  
 52      def make_queries(self) -> List[List[Tuple]]:
 53          """
 54          This function returns a list of lists, structured hierarchically as follows:
 55          - The outer list corresponds to the pattern groups.
 56          - Each element in the outer list is a list representing a specific pattern group.
 57          - Within each pattern group list, there are sublists, one for each pattern in that group.
 58          - Each sublist contains the lines that match the corresponding pattern.
 59  
 60          The result is organized as [pattern_groups -> patterns -> matched_lines], where:
 61          - Each pattern group can have multiple patterns.
 62          - Each pattern can match multiple lines.
 63          """
 64          params = self._config_query["params"]
 65          if isinstance(params, Dict):
 66              params = [params]
 67  
 68          results = [[] for _ in self._tracer.patterns]
 69          for i, pattern_group in enumerate(self._tracer.patterns):
 70              logs = self._fetch_data(
 71                  self._config_query["url"],
 72                  self._config_query["headers"],
 73                  params[i],
 74                  self._tracer.extra_fields,
 75              )
 76              query_results = [[] for _ in pattern_group.trace_pairs]
 77              for log_line in logs:
 78                  for j, trace_pair in enumerate(pattern_group.trace_pairs):
 79                      pattern = trace_pair.regex
 80                      match = re.search(pattern, log_line[0])
 81                      if match:
 82                          match_as_list = list(match.groups())
 83                          match_as_list.extend(log_line[1:])
 84                          query_results[j].append(match_as_list)
 85  
 86              results[i].extend(query_results)
 87  
 88          return results
 89  
 90      def get_dataframes(self) -> Dict[str, List[pd.DataFrame]]:
 91          results = self.make_queries()
 92          dfs = self._tracer.trace(results)
 93  
 94          return dfs
 95  
 96      def single_query_info(self) -> Result[Dict, Response]:
 97          response = requests.post(**self._config_query)
 98          if response.status_code != 200:
 99              logger.error(f"Request failed with status code: {response.status_code}")
100              return Err(response)
101  
102          try:
103              data = response.json()
104              return Ok(data)
105          except json.decoder.JSONDecodeError as e:
106              logger.error(f"Failed to decode JSON: {e}")
107              logger.error(f"Response content: {response.content}")
108  
109              return Err(response)
110  
111      def multiline_query_info(self) -> Result[Iterator, str]:
112          response = requests.post(
113              self._config_query["url"],
114              headers=self._config_query["headers"],
115              params=self._config_query["params"],
116          )
117          if response.status_code != 200:
118              logger.error(f"Request failed with status code: {response.status_code}")
119              return Err(response.text)
120  
121          try:
122              data = response.iter_lines()
123              return Ok(data)
124          except json.decoder.JSONDecodeError as e:
125              logger.error(f"Failed to decode JSON: {e}")
126              logger.error(f"Response content: {response.content}")
127  
128              return Err(response.text)