/ analyze.py
analyze.py
1 import pandas as pd 2 from rich.console import Console 3 from rich.table import Table 4 import questionary 5 import datetime 6 import os 7 import argparse 8 from prompt_toolkit.application import Application 9 from prompt_toolkit.key_binding import KeyBindings 10 from prompt_toolkit.layout import Layout, HSplit 11 from prompt_toolkit.layout.controls import FormattedTextControl 12 from prompt_toolkit.layout.containers import Window 13 from prompt_toolkit.styles import Style 14 15 parser = argparse.ArgumentParser() 16 parser.add_argument("filepath", nargs="?", help="Path to the file (optional)") 17 args = parser.parse_args() 18 filepath = args.filepath or input("Enter the path to the log file (csv): ") 19 console = Console() 20 21 22 class DateTimePicker: 23 def __init__(self, initial=None, title=None): 24 self.now = initial or datetime.datetime.now() 25 self.title = title or "" 26 self.fields = ['year', 'month', 'day', 'hour', 'minute'] 27 self.position = 0 28 self.buffer = '' 29 self.values = { 30 'year': self.now.year, 31 'month': self.now.month, 32 'day': self.now.day, 33 'hour': self.now.hour, 34 'minute': self.now.minute 35 } 36 self.limits = { 37 'month': (1, 12), 38 'day': (1, 31), 39 'hour': (0, 23), 40 'minute': (0, 59) 41 } 42 43 self.title_control = FormattedTextControl(self._get_title_text) 44 self.error_control = FormattedTextControl(self._get_error_text) 45 self.display_control = FormattedTextControl(self._get_display_text) 46 47 self.app = Application( 48 layout=Layout( 49 HSplit([ 50 Window(content=self.title_control, height=1, align='center'), 51 Window(content=self.error_control, height=1, align='center'), 52 Window(content=self.display_control, height=1, align='center'), 53 ]) 54 ), 55 key_bindings=self._build_key_bindings(), 56 style=Style.from_dict({ 57 'selected': 'reverse', 58 'error': 'bold red', 59 'invalid': 'red bold', 60 'title': 'bold underline', 61 '': 'bold' 62 }), 63 full_screen=False, 64 erase_when_done=True 65 ) 66 67 def _get_title_text(self): 68 return [('class:title', f"→ Set {self.title} date/time")] if self.title else [('', '')] 69 70 def _get_error_text(self): 71 field = self.fields[self.position] 72 if field in self.limits: 73 val = self.values[field] 74 min_val, max_val = self.limits[field] 75 if not (min_val <= val <= max_val): 76 return [('class:error', f"{field.capitalize()} must be between {min_val}–{max_val}")] 77 return [('', '')] 78 79 def _get_display_text(self): 80 out = [] 81 for i, field in enumerate(self.fields): 82 val = self.values[field] 83 txt = f"{val:04}" if field == 'year' else f"{val:02}" 84 selected = i == self.position 85 invalid = False 86 if field in self.limits: 87 min_val, max_val = self.limits[field] 88 if not (min_val <= val <= max_val): 89 invalid = True 90 style = 'class:invalid' if invalid else 'class:selected' if selected else '' 91 out.append((style, f"[{txt}]")) 92 if field in ['year', 'month']: 93 out.append(('', '-')) 94 elif field == 'day': 95 out.append(('', ' ')) 96 elif field == 'hour': 97 out.append(('', ':')) 98 return out 99 100 def _build_key_bindings(self): 101 kb = KeyBindings() 102 103 @kb.add('left') 104 def _(event): self.position = (self.position - 1) % len(self.fields); self.buffer = '' 105 106 @kb.add('right') 107 def _(event): self.position = (self.position + 1) % len(self.fields); self.buffer = '' 108 109 @kb.add('up') 110 def _(event): self.values[self.fields[self.position]] += 1 111 112 @kb.add('down') 113 def _(event): self.values[self.fields[self.position]] = max(0, self.values[self.fields[self.position]] - 1) 114 115 for d in '0123456789': 116 @kb.add(d) 117 def _(event, digit=d): 118 self.buffer += digit 119 try: 120 val = int(self.buffer) 121 self.values[self.fields[self.position]] = val 122 except: 123 self.buffer = '' 124 125 @kb.add('enter') 126 def _(event): 127 for field in self.limits: 128 val = self.values[field] 129 min_val, max_val = self.limits[field] 130 if not (min_val <= val <= max_val): 131 return 132 event.app.exit(result=self._finalize()) 133 134 @kb.add('escape') 135 @kb.add('c-c') 136 def _(event): event.app.exit(result=None) 137 138 return kb 139 140 def _finalize(self): 141 try: 142 y, m, d = self.values['year'], self.values['month'], self.values['day'] 143 max_day = (datetime.datetime(y, m % 12 + 1, 1) - datetime.timedelta(days=1)).day 144 self.values['day'] = min(d, max_day) 145 return datetime.datetime( 146 y, m, self.values['day'], 147 self.values['hour'], 148 self.values['minute'] 149 ) 150 except: 151 return None 152 153 def run(self): 154 return self.app.run() 155 156 157 def load_data(): 158 if not os.path.exists(filepath): 159 return "error_not_found" 160 if os.path.getsize(filepath) == 0: 161 return "error_empty" 162 try: 163 df = pd.read_csv(filepath, quotechar='"', engine='python') 164 df.rename(columns={ 165 "Date": "date", "Method": "type", 166 "Status Code": "status", "Money/Error": "details" 167 }, inplace=True) 168 df['date'] = pd.to_datetime(df['date'], errors='coerce') 169 df.dropna(subset=['date'], inplace=True) 170 bad_types = {"error", "general", "general error", "none", "", None} 171 df = df[~df["type"].astype(str).str.lower().isin(bad_types)] 172 173 def extract_amount(detail, status): 174 if str(status) != "200": 175 return 0.0 176 try: 177 clean = str(detail).replace(".", "").replace(",", ".") 178 return float(clean) 179 except: 180 return 0.0 181 182 df['amount'] = df.apply(lambda row: extract_amount(row['details'], row['status']), axis=1) 183 return df 184 except Exception as e: 185 return f"error_parsing:{e}" 186 187 188 def get_filters(df): 189 all_types = sorted(df["type"].unique()) 190 selected_types = questionary.checkbox("Select types to include:", choices=all_types).ask() 191 if selected_types is None: return None 192 193 use_date = questionary.confirm("Filter by date range?", default=False).ask() 194 start_date = end_date = None 195 if use_date: 196 start_date = DateTimePicker(title="START").run() 197 end_date = DateTimePicker(title="END").run() 198 return {"types": selected_types, "start_date": start_date, "end_date": end_date} 199 200 201 def summarize(df): 202 summary = {} 203 all_error_count, all_money = 0, 0.0 204 all_status_counts = {} 205 for typ in df['type'].unique(): 206 sub = df[df['type'] == typ] 207 error_count = sub[sub['status'] == 'ERROR'].shape[0] 208 status_counts = sub['status'].value_counts().to_dict() 209 total_money = sub['amount'].sum() 210 summary[typ] = { 211 "errors": error_count, 212 "statuses": status_counts, 213 "money": total_money 214 } 215 all_error_count += error_count 216 all_money += total_money 217 for stat, count in status_counts.items(): 218 all_status_counts[stat] = all_status_counts.get(stat, 0) + count 219 summary["ALL"] = { 220 "errors": all_error_count, 221 "statuses": all_status_counts, 222 "money": all_money 223 } 224 return summary 225 226 227 def print_summary(summary, filters): 228 table = Table(title="Detailed Log Summary") 229 table.add_column("Type", style="cyan") 230 table.add_column("Error Count", justify="right", style="red") 231 table.add_column("Status Codes", justify="left", style="yellow") 232 table.add_column("Total Money", justify="right", style="green") 233 for typ, data in summary.items(): 234 status_str = ", ".join([f"{k}:{v}" for k, v in data["statuses"].items()]) 235 table.add_row( 236 typ, 237 str(data["errors"]), 238 status_str or "-", 239 f"{data['money']:,.2f}" 240 ) 241 console.print() 242 if filters.get("start_date") and filters.get("end_date"): 243 console.print(f"[bold white]Start date:[/] {filters['start_date']} " 244 f"[bold white]End date:[/] {filters['end_date']}\n") 245 console.print(table) 246 247 248 def main(): 249 df = load_data() 250 if isinstance(df, str): 251 if df == "error_not_found": console.print("[red]ERROR:[/red] File not found.") 252 elif df == "error_empty": console.print("[red]ERROR:[/red] File is empty.") 253 elif df.startswith("error_parsing"): 254 console.print("[red]ERROR:[/red] Failed to parse CSV.") 255 console.print(f"[yellow]{df.split(':', 1)[1]}[/yellow]") 256 return 257 258 while True: 259 filters = get_filters(df) 260 if filters is None: 261 console.print("[bold]Goodbye.[/bold]") 262 break 263 264 fdf = df[df['type'].isin(filters["types"])] 265 if filters['start_date']: 266 fdf = fdf[fdf['date'] >= filters['start_date']] 267 if filters['end_date']: 268 fdf = fdf[fdf['date'] <= filters['end_date']] 269 270 if fdf.empty: 271 console.print("[yellow]No records found for selected filters.[/yellow]") 272 else: 273 summary = summarize(fdf) 274 print_summary(summary, filters) 275 276 again = questionary.confirm("Analyze again?", default=False).ask() 277 if not again: 278 break 279 280 281 if __name__ == "__main__": 282 main()