# test/benchmarks/analyze.py
  1  #!/usr/bin/env python3
  2  """Parse a shan session + audit log for one test run and emit a report."""
  3  import json, sys, re
  4  from pathlib import Path
  5  from collections import Counter
  6  
  7  def main(session_id, task_num, task_name):
  8      home = Path.home()
  9      sess_file = home / ".shannon/sessions" / f"{session_id}.json"
 10      audit = home / ".shannon/logs/audit.log"
 11      if not sess_file.exists():
 12          print(f"ERROR: session file not found: {sess_file}")
 13          return
 14      sess = json.loads(sess_file.read_text())
 15  
 16      # Audit rows for this session, partitioned into tool calls vs non-tool
 17      # events (e.g. {"event":"force_stop"}). Non-tool events are tracked
 18      # separately so they don't pollute tool_calls/tool_dist/streaks.
 19      tool_calls = []
 20      events = []
 21      with audit.open() as f:
 22          for line in f:
 23              try:
 24                  r = json.loads(line)
 25              except json.JSONDecodeError:
 26                  continue
 27              if r.get("session_id") != session_id:
 28                  continue
 29              if r.get("tool_name"):
 30                  tool_calls.append(r)
 31              elif r.get("event"):
 32                  events.append(r)
 33  
 34      detector_force_stop = any(e.get("event") == "force_stop" for e in events)
 35  
 36      tools = [t.get("tool_name", "?") for t in tool_calls]
 37      tool_counts = Counter(tools)
 38      # consecutive-same-tool streaks
 39      streaks = []
 40      cur, cur_n = None, 0
 41      for name in tools:
 42          if name == cur:
 43              cur_n += 1
 44          else:
 45              if cur_n >= 3:
 46                  streaks.append((cur, cur_n))
 47              cur, cur_n = name, 1
 48      if cur_n >= 3:
 49          streaks.append((cur, cur_n))
 50  
 51      # error heuristic: output_summary contains "error" or starts with "fail"
 52      failed = 0
 53      for t in tool_calls:
 54          out = (t.get("output_summary") or "").lower()
 55          if ("error" in out[:200]) or out.startswith("fail") or "no such file" in out:
 56              failed += 1
 57  
 58      # read-before-edit violation: file_edit without prior file_read of same path
 59      reads, violations = set(), []
 60      for t in tool_calls:
 61          name = t.get("tool_name")
 62          inp = t.get("input_summary") or ""
 63          m = re.search(r'"(?:file_path|path)"\s*:\s*"([^"]+)"', inp)
 64          path = m.group(1) if m else None
 65          if name == "file_read" and path:
 66              reads.add(path)
 67          if name == "file_edit" and path and path not in reads:
 68              violations.append(path)
 69  
 70      usage = sess.get("usage", {})
 71      llm_calls = usage.get("llm_calls", 0)
 72      msgs = sess.get("messages", [])
 73      last_assistant = ""
 74      for m in reversed(msgs):
 75          if m.get("role") == "assistant":
 76              c = m.get("content")
 77              if isinstance(c, str):
 78                  last_assistant = c
 79              elif isinstance(c, list):
 80                  for part in c:
 81                      if isinstance(part, dict) and part.get("type") == "text":
 82                          last_assistant = part.get("text", "")
 83                          break
 84              break
 85  
 86      # Final-message heuristic: both the maxIter path and the new
 87      # detector-driven force-stop path emit the same Task/Done/Pending/
 88      # Partial-answer shape (PR #86 unified them for UX). Disambiguate
 89      # with the audit-log event tag: a force_stop event means detector
 90      # path; its absence when the report shape is present means maxIter.
 91      looks_structured = "**Task**" in last_assistant and "**Done**" in last_assistant
 92      if looks_structured and detector_force_stop:
 93          synthesis_reason = "detector_force_stop"
 94      elif looks_structured:
 95          synthesis_reason = "maxiter"
 96      else:
 97          synthesis_reason = None
 98  
 99      report = {
100          "task": task_num,
101          "name": task_name,
102          "session_id": session_id,
103          "llm_calls": llm_calls,
104          "tool_calls": len(tool_calls),
105          "tool_dist": dict(tool_counts.most_common()),
106          "consecutive_streaks_3plus": streaks,
107          "failures_detected": failed,
108          "read_before_edit_violations": violations,
109          "total_tokens": usage.get("total_tokens", 0),
110          "cost_usd": usage.get("cost_usd", 0),
111          "cache_read_tokens": usage.get("cache_read_tokens", 0),
112          "detector_force_stop": detector_force_stop,
113          "synthesis_reason": synthesis_reason,
114          "last_assistant_preview": last_assistant[:400],
115          "tool_sequence": [f"{i+1}. {t.get('tool_name', '?')}: {(t.get('input_summary') or '')[:80]}" for i, t in enumerate(tool_calls)][:40],
116      }
117      print(json.dumps(report, ensure_ascii=False, indent=2))
118  
if __name__ == "__main__":
    # Fail fast with a usage line instead of an opaque IndexError when
    # the caller forgets an argument.
    if len(sys.argv) != 4:
        sys.exit(f"usage: {sys.argv[0]} SESSION_ID TASK_NUM TASK_NAME")
    main(sys.argv[1], sys.argv[2], sys.argv[3])