# victoria_reader.py
# Python Imports
import json
import logging
import re
from typing import Dict, Iterator, List, Optional, Tuple

import pandas as pd
import requests
from httpx import Response  # noqa: F401 -- no longer used for annotations here (HTTP calls use `requests`); kept in case other modules import it from this one
from result import Err, Ok, Result

# Project Imports
from src.analysis.mesh_analysis.readers.reader import Reader
from src.analysis.mesh_analysis.readers.tracers.message_tracer import MessageTracer

logger = logging.getLogger(__name__)


class VictoriaReader(Reader):
    """Reader that pulls log lines from a Victoria HTTP endpoint and matches them against tracer patterns."""

    def __init__(
        self,
        tracer: Optional[MessageTracer],
        victoria_config_query: Dict,
        extra_fields: Optional[List[str]] = None,
    ):
        """
        :param tracer: MessageTracer instance to retrieve raw message patterns from Victoria. Required by
            `make_queries` / `get_dataframes`; may be None when only the `*_query_info` helpers are used.
        :param victoria_config_query: Configuration for the Victoria query. This allows a first filtering by the
            monitoring stack, retrieving only the lines we are interested in and saving time in the parsing process.
            Read keys: "url", "headers", "params" (`single_query_info` forwards the whole dict to `requests.post`).
        :param extra_fields: Stored for reference. NOTE: the fields appended to each fetched log line are currently
            taken from `tracer.extra_fields`, not from this argument.
        """
        self._tracer: Optional[MessageTracer] = tracer
        self._config_query = victoria_config_query
        self._extra_fields = extra_fields

    def _fetch_data(
        self, url: str, headers: Dict, params: Dict, extra_fields: Optional[List[str]]
    ) -> List[Tuple]:
        """Stream one query and collect a tuple per returned JSON record.

        :param url: Endpoint the query is POSTed to.
        :param headers: HTTP headers for the request.
        :param params: Query parameters for this request.
        :param extra_fields: Record keys whose values are appended after ``_msg`` (None means none).
        :return: List of tuples ``(record["_msg"], record[f1], record[f2], ...)`` in response order.
        :raises requests.HTTPError: If the server answers with a 4xx/5xx status.
        :raises json.decoder.JSONDecodeError: If a non-empty response line is not valid JSON.
        :raises KeyError: If a record lacks ``_msg`` or one of ``extra_fields``.
        """
        fields = list(extra_fields or [])
        logs = []
        logger.debug(f"Fetching {params}")
        with requests.post(url=url, headers=headers, params=params, stream=True) as response:
            # An error body is plain text, not JSON lines; report it as an HTTP error
            # instead of letting it surface as a confusing decode failure.
            if not response.ok:
                logger.error(f"Request failed with status code: {response.status_code}")
                logger.error(f"Response content: {response.text}")
                response.raise_for_status()
            for line in response.iter_lines():
                if not line:
                    continue
                try:
                    parsed_object = json.loads(line)
                except json.decoder.JSONDecodeError:
                    # Previously `exit()` was called here, killing the whole interpreter;
                    # raise instead so callers can decide how to handle it.
                    logger.error(f"Failed to decode JSON line: {line!r}")
                    raise
                logs.append((parsed_object["_msg"],) + tuple(parsed_object[k] for k in fields))
        logger.debug(f"Fetched {len(logs)} log lines")

        return logs

    @staticmethod
    def _match_logs(logs: List[Tuple], trace_pairs) -> List[List[list]]:
        """Return, per trace pair, the captures (plus extra field values) of every log line its regex matches."""
        # Resolve each pattern once instead of once per log line.
        compiled = [re.compile(trace_pair.regex) for trace_pair in trace_pairs]
        matches: List[List[list]] = [[] for _ in compiled]
        for log_line in logs:
            message, extras = log_line[0], log_line[1:]
            for bucket, regex in zip(matches, compiled):
                match = regex.search(message)
                if match:
                    bucket.append(list(match.groups()) + list(extras))
        return matches

    def make_queries(self) -> List[List[List[list]]]:
        """
        This function returns a list of lists, structured hierarchically as follows:
        - The outer list corresponds to the pattern groups.
        - Each element in the outer list is a list representing a specific pattern group.
        - Within each pattern group list, there are sublists, one for each pattern in that group.
        - Each sublist contains the lines that match the corresponding pattern; each matched line is the list of
          the regex capture groups followed by the values of the tracer's extra fields.

        The result is organized as [pattern_groups -> patterns -> matched_lines], where:
        - Each pattern group can have multiple patterns.
        - Each pattern can match multiple lines.

        One query is issued per pattern group. ``config["params"]`` is either a single dict (only valid with a
        single pattern group) or a sequence with one params dict per pattern group.

        :raises ValueError: If no tracer was provided, or there are fewer params dicts than pattern groups.
        """
        if self._tracer is None:
            raise ValueError("make_queries requires a MessageTracer, but tracer is None")

        params = self._config_query["params"]
        if isinstance(params, dict):
            params = [params]

        pattern_groups = list(self._tracer.patterns)
        # Fail before issuing any request rather than with an IndexError mid-way.
        if len(params) < len(pattern_groups):
            raise ValueError(
                f"Expected one query params dict per pattern group ({len(pattern_groups)}), "
                f"got {len(params)}"
            )

        results = []
        for pattern_group, group_params in zip(pattern_groups, params):
            logs = self._fetch_data(
                self._config_query["url"],
                self._config_query["headers"],
                group_params,
                self._tracer.extra_fields,
            )
            results.append(self._match_logs(logs, pattern_group.trace_pairs))

        return results

    def get_dataframes(self) -> Dict[str, List[pd.DataFrame]]:
        """Run all pattern-group queries and hand the matched lines to the tracer to build dataframes."""
        return self._tracer.trace(self.make_queries())

    def single_query_info(self) -> Result[Dict, requests.Response]:
        """POST the configured query once and decode the whole response body as JSON.

        :return: ``Ok(decoded_json)`` on success; ``Err(response)`` on a non-200 status or an undecodable body.
        """
        response = requests.post(**self._config_query)
        if response.status_code != 200:
            logger.error(f"Request failed with status code: {response.status_code}")
            return Err(response)

        try:
            data = response.json()
        except ValueError as e:
            # `Response.json()` may raise simplejson's decode error (when simplejson is installed), which is not a
            # `json.JSONDecodeError`; every variant subclasses ValueError.
            logger.error(f"Failed to decode JSON: {e}")
            logger.error(f"Response content: {response.content}")
            return Err(response)

        return Ok(data)

    def multiline_query_info(self) -> Result[Iterator, str]:
        """POST the configured query and return an iterator over the raw (bytes) response lines.

        :return: ``Ok(line_iterator)`` on a 200 response, otherwise ``Err(response_text)``.
        """
        response = requests.post(
            self._config_query["url"],
            headers=self._config_query["headers"],
            params=self._config_query["params"],
        )
        if response.status_code != 200:
            logger.error(f"Request failed with status code: {response.status_code}")
            return Err(response.text)

        # `iter_lines()` only splits the body; it never decodes JSON, so no decode error can occur here.
        return Ok(response.iter_lines())