/ analyze.py
analyze.py
  1  import pandas as pd
  2  from rich.console import Console
  3  from rich.table import Table
  4  import questionary
  5  import datetime
  6  import os
  7  import argparse
  8  from prompt_toolkit.application import Application
  9  from prompt_toolkit.key_binding import KeyBindings
 10  from prompt_toolkit.layout import Layout, HSplit
 11  from prompt_toolkit.layout.controls import FormattedTextControl
 12  from prompt_toolkit.layout.containers import Window
 13  from prompt_toolkit.styles import Style
 14  
 15  parser = argparse.ArgumentParser()
 16  parser.add_argument("filepath", nargs="?", help="Path to the file (optional)")
 17  args = parser.parse_args()
 18  filepath = args.filepath or input("Enter the path to the log file (csv): ")
 19  console = Console()
 20  
 21  
 22  class DateTimePicker:
 23      def __init__(self, initial=None, title=None):
 24          self.now = initial or datetime.datetime.now()
 25          self.title = title or ""
 26          self.fields = ['year', 'month', 'day', 'hour', 'minute']
 27          self.position = 0
 28          self.buffer = ''
 29          self.values = {
 30              'year': self.now.year,
 31              'month': self.now.month,
 32              'day': self.now.day,
 33              'hour': self.now.hour,
 34              'minute': self.now.minute
 35          }
 36          self.limits = {
 37              'month': (1, 12),
 38              'day': (1, 31),
 39              'hour': (0, 23),
 40              'minute': (0, 59)
 41          }
 42  
 43          self.title_control = FormattedTextControl(self._get_title_text)
 44          self.error_control = FormattedTextControl(self._get_error_text)
 45          self.display_control = FormattedTextControl(self._get_display_text)
 46  
 47          self.app = Application(
 48              layout=Layout(
 49                  HSplit([
 50                      Window(content=self.title_control, height=1, align='center'),
 51                      Window(content=self.error_control, height=1, align='center'),
 52                      Window(content=self.display_control, height=1, align='center'),
 53                  ])
 54              ),
 55              key_bindings=self._build_key_bindings(),
 56              style=Style.from_dict({
 57                  'selected': 'reverse',
 58                  'error': 'bold red',
 59                  'invalid': 'red bold',
 60                  'title': 'bold underline',
 61                  '': 'bold'
 62              }),
 63              full_screen=False,
 64              erase_when_done=True
 65          )
 66  
 67      def _get_title_text(self):
 68          return [('class:title', f"→ Set {self.title} date/time")] if self.title else [('', '')]
 69  
 70      def _get_error_text(self):
 71          field = self.fields[self.position]
 72          if field in self.limits:
 73              val = self.values[field]
 74              min_val, max_val = self.limits[field]
 75              if not (min_val <= val <= max_val):
 76                  return [('class:error', f"{field.capitalize()} must be between {min_val}–{max_val}")]
 77          return [('', '')]
 78  
 79      def _get_display_text(self):
 80          out = []
 81          for i, field in enumerate(self.fields):
 82              val = self.values[field]
 83              txt = f"{val:04}" if field == 'year' else f"{val:02}"
 84              selected = i == self.position
 85              invalid = False
 86              if field in self.limits:
 87                  min_val, max_val = self.limits[field]
 88                  if not (min_val <= val <= max_val):
 89                      invalid = True
 90              style = 'class:invalid' if invalid else 'class:selected' if selected else ''
 91              out.append((style, f"[{txt}]"))
 92              if field in ['year', 'month']:
 93                  out.append(('', '-'))
 94              elif field == 'day':
 95                  out.append(('', '   '))
 96              elif field == 'hour':
 97                  out.append(('', ':'))
 98          return out
 99  
100      def _build_key_bindings(self):
101          kb = KeyBindings()
102  
103          @kb.add('left')
104          def _(event): self.position = (self.position - 1) % len(self.fields); self.buffer = ''
105  
106          @kb.add('right')
107          def _(event): self.position = (self.position + 1) % len(self.fields); self.buffer = ''
108  
109          @kb.add('up')
110          def _(event): self.values[self.fields[self.position]] += 1
111  
112          @kb.add('down')
113          def _(event): self.values[self.fields[self.position]] = max(0, self.values[self.fields[self.position]] - 1)
114  
115          for d in '0123456789':
116              @kb.add(d)
117              def _(event, digit=d):
118                  self.buffer += digit
119                  try:
120                      val = int(self.buffer)
121                      self.values[self.fields[self.position]] = val
122                  except:
123                      self.buffer = ''
124  
125          @kb.add('enter')
126          def _(event):
127              for field in self.limits:
128                  val = self.values[field]
129                  min_val, max_val = self.limits[field]
130                  if not (min_val <= val <= max_val):
131                      return
132              event.app.exit(result=self._finalize())
133  
134          @kb.add('escape')
135          @kb.add('c-c')
136          def _(event): event.app.exit(result=None)
137  
138          return kb
139  
140      def _finalize(self):
141          try:
142              y, m, d = self.values['year'], self.values['month'], self.values['day']
143              max_day = (datetime.datetime(y, m % 12 + 1, 1) - datetime.timedelta(days=1)).day
144              self.values['day'] = min(d, max_day)
145              return datetime.datetime(
146                  y, m, self.values['day'],
147                  self.values['hour'],
148                  self.values['minute']
149              )
150          except:
151              return None
152  
153      def run(self):
154          return self.app.run()
155  
156  
# CSV header -> internal column name.
_COLUMN_ALIASES = {
    "Date": "date",
    "Method": "type",
    "Status Code": "status",
    "Money/Error": "details",
}

# Lower-cased type values that mark rows to be discarded.
_BAD_TYPES = frozenset({"error", "general", "general error", "none", ""})


def _is_success_status(status):
    """Return True when *status* denotes code 200, whatever type pandas gave it.

    A status column containing blanks is parsed as float, so 200 arrives as
    200.0; comparing numerically accepts "200", 200 and 200.0 alike.
    """
    try:
        return float(status) == 200
    except (TypeError, ValueError):
        return False


def _extract_amount(detail, status):
    """Parse a "1.234,56"-style amount; return 0.0 for non-200 or unparsable rows."""
    if not _is_success_status(status) or pd.isna(detail):
        return 0.0
    # Drop thousands dots, then turn the decimal comma into a dot.
    cleaned = str(detail).replace(".", "").replace(",", ".")
    try:
        return float(cleaned)
    except ValueError:
        return 0.0


def load_data():
    """Load and normalise the CSV log named by the module-level ``filepath``.

    Columns are renamed to ``date``, ``type``, ``status`` and ``details``.
    Rows with an unparsable date, a missing type or a type listed in
    ``_BAD_TYPES`` are dropped, and a float ``amount`` column is added.

    Returns:
        The cleaned ``DataFrame`` on success, otherwise one of the strings
        ``"error_not_found"``, ``"error_empty"`` or ``"error_parsing:<detail>"``.
    """
    if not os.path.exists(filepath):
        return "error_not_found"
    if os.path.getsize(filepath) == 0:
        return "error_empty"
    try:
        df = pd.read_csv(filepath, quotechar='"', engine='python')
        df = df.rename(columns=_COLUMN_ALIASES)

        # Report missing columns by their CSV header instead of a bare KeyError.
        missing = [
            header for header, column in _COLUMN_ALIASES.items()
            if column not in df.columns
        ]
        if missing:
            return f"error_parsing:missing required column(s): {', '.join(missing)}"

        df['date'] = pd.to_datetime(df['date'], errors='coerce')
        # Rows without a type are dropped too: astype(str) would turn NaN into
        # the string "nan", which slips past the bad-type filter while the
        # column keeps the NaN, which later cannot be sorted with the strings.
        df = df.dropna(subset=['date', 'type'])
        type_keys = df['type'].astype(str).str.strip().str.lower()
        # .copy() so the new column is added to an independent frame.
        df = df[~type_keys.isin(_BAD_TYPES)].copy()

        # A comprehension (not DataFrame.apply) so an empty frame still yields
        # a one-dimensional, correctly typed column.
        amounts = [
            _extract_amount(detail, status)
            for detail, status in zip(df['details'], df['status'])
        ]
        df['amount'] = pd.Series(amounts, index=df.index, dtype=float)
        return df
    except Exception as e:
        # Top-level boundary: any read/parse failure becomes an error string.
        return f"error_parsing:{e}"
186  
187  
def get_filters(df):
    """Interactively ask which types and which date range to analyse.

    Args:
        df: Frame with a ``type`` column whose distinct values are offered
            as checkbox choices.

    Returns:
        A dict with keys ``"types"``, ``"start_date"`` and ``"end_date"``
        (each date is ``None`` when not set), or ``None`` when the user
        cancels either prompt.
    """
    # Drop missing values and sort by text so NaN or mixed-type entries
    # cannot make sorted() raise.
    all_types = sorted(df["type"].dropna().unique(), key=str)
    selected_types = questionary.checkbox("Select types to include:", choices=all_types).ask()
    if selected_types is None:
        return None

    use_date = questionary.confirm("Filter by date range?", default=False).ask()
    # A cancelled confirm yields None; treat it as cancelling the whole
    # dialog, like the type prompt, instead of as an implicit "No".
    if use_date is None:
        return None

    start_date = end_date = None
    if use_date:
        start_date = DateTimePicker(title="START").run()
        end_date = DateTimePicker(title="END").run()
    return {"types": selected_types, "start_date": start_date, "end_date": end_date}
199  
200  
def summarize(df):
    """Aggregate error counts, status tallies and money per type.

    Args:
        df: Frame with ``type``, ``status`` and ``amount`` columns.

    Returns:
        A dict keyed by each distinct type (in order of first appearance)
        plus a final ``"ALL"`` entry holding the totals. Every entry is a
        dict with ``"errors"`` (rows whose status equals ``'ERROR'``),
        ``"statuses"`` (status -> row count) and ``"money"`` (sum of amount).
    """
    report = {}
    combined_statuses = {}
    combined_errors = 0
    combined_money = 0.0

    for kind in df['type'].unique():
        rows = df.loc[df['type'] == kind]
        per_status = rows['status'].value_counts().to_dict()
        failures = len(rows.loc[rows['status'] == 'ERROR'])
        money = rows['amount'].sum()

        report[kind] = {"errors": failures, "statuses": per_status, "money": money}

        # Fold this type into the running totals.
        combined_errors += failures
        combined_money += money
        for code, seen in per_status.items():
            combined_statuses[code] = combined_statuses.get(code, 0) + seen

    report["ALL"] = {
        "errors": combined_errors,
        "statuses": combined_statuses,
        "money": combined_money,
    }
    return report
225  
226  
def print_summary(summary, filters):
    """Render the summary as a rich table on the shared console.

    Args:
        summary: Mapping of type -> dict with ``"errors"``, ``"statuses"``
            and ``"money"`` entries.
        filters: Dict that may hold ``"start_date"`` / ``"end_date"``; each
            bound that is set is printed above the table.
    """
    table = Table(title="Detailed Log Summary")
    table.add_column("Type", style="cyan")
    table.add_column("Error Count", justify="right", style="red")
    table.add_column("Status Codes", justify="left", style="yellow")
    table.add_column("Total Money", justify="right", style="green")

    for typ, data in summary.items():
        status_str = ", ".join(f"{k}:{v}" for k, v in data["statuses"].items())
        table.add_row(
            str(typ),  # type keys are not guaranteed to be strings
            str(data["errors"]),
            status_str or "-",
            f"{data['money']:,.2f}",
        )

    console.print()
    # Show every bound that is set, not only a complete range, so a
    # one-sided date filter is still visible next to its results.
    bounds = []
    if filters.get("start_date"):
        bounds.append(f"[bold white]Start date:[/] {filters['start_date']}")
    if filters.get("end_date"):
        bounds.append(f"[bold white]End date:[/] {filters['end_date']}")
    if bounds:
        console.print("   ".join(bounds) + "\n")
    console.print(table)
246  
247  
def main():
    """Load the log, then repeatedly prompt for filters and print a summary."""
    df = load_data()
    if isinstance(df, str):
        # load_data reports failures as error-code strings.
        if df == "error_not_found":
            console.print("[red]ERROR:[/red] File not found.")
        elif df == "error_empty":
            console.print("[red]ERROR:[/red] File is empty.")
        elif df.startswith("error_parsing"):
            console.print("[red]ERROR:[/red] Failed to parse CSV.")
            # markup=False: the detail is arbitrary text and may contain
            # brackets (e.g. "[Errno 21]") that rich would read as tags.
            console.print(df.split(':', 1)[1], style="yellow", markup=False)
        return

    if df.empty:
        # Nothing to choose from: avoid opening a checkbox with no choices.
        console.print("[yellow]No usable records found in the file.[/yellow]")
        return

    while True:
        filters = get_filters(df)
        if filters is None:
            console.print("[bold]Goodbye.[/bold]")
            break

        fdf = df[df['type'].isin(filters["types"])]
        if filters['start_date']:
            fdf = fdf[fdf['date'] >= filters['start_date']]
        if filters['end_date']:
            fdf = fdf[fdf['date'] <= filters['end_date']]

        if fdf.empty:
            console.print("[yellow]No records found for selected filters.[/yellow]")
        else:
            print_summary(summarize(fdf), filters)

        # A cancelled prompt returns None, which also ends the loop.
        if not questionary.confirm("Analyze again?", default=False).ask():
            break


# Run the interactive analysis only when executed as a script.
if __name__ == "__main__":
    main()