/ agent / insights.py
insights.py
  1  """
  2  Session Insights Engine for Hermes Agent.
  3  
  4  Analyzes historical session data from the SQLite state database to produce
  5  comprehensive usage insights — token consumption, cost estimates, tool usage
  6  patterns, activity trends, model/platform breakdowns, and session metrics.
  7  
  8  Inspired by Claude Code's /insights command, adapted for Hermes Agent's
  9  multi-platform architecture with additional cost estimation and platform
 10  breakdown capabilities.
 11  
 12  Usage:
 13      from agent.insights import InsightsEngine
 14      engine = InsightsEngine(db)
 15      report = engine.generate(days=30)
 16      print(engine.format_terminal(report))
 17  """
 18  
 19  import json
 20  import time
 21  from collections import Counter, defaultdict
 22  from datetime import datetime
 23  from typing import Any, Dict, List
 24  
 25  from agent.usage_pricing import (
 26      CanonicalUsage,
 27      DEFAULT_PRICING,
 28      estimate_usage_cost,
 29      format_duration_compact,
 30      has_known_pricing,
 31  )
 32  
# Module-private alias for the imported DEFAULT_PRICING table.
# NOTE(review): nothing in the visible part of this module reads this name —
# presumably it is kept for callers that import it from here; confirm before removing.
_DEFAULT_PRICING = DEFAULT_PRICING
 34  
 35  
def _has_known_pricing(model_name: str, provider: str | None = None, base_url: str | None = None) -> bool:
    """Check if a model has known pricing (vs unknown/custom endpoint).

    Thin module-private wrapper: all three arguments are forwarded unchanged
    to ``has_known_pricing`` from ``agent.usage_pricing`` and its result is
    returned as-is.

    Args:
        model_name: Model identifier to look up.
        provider: Optional provider, forwarded as the ``provider`` keyword.
        base_url: Optional endpoint URL, forwarded as the ``base_url`` keyword.

    Returns:
        The value returned by ``has_known_pricing`` for these arguments.
    """
    return has_known_pricing(model_name, provider=provider, base_url=base_url)
 39  
 40  
 41  def _estimate_cost(
 42      session_or_model: Dict[str, Any] | str,
 43      input_tokens: int = 0,
 44      output_tokens: int = 0,
 45      *,
 46      cache_read_tokens: int = 0,
 47      cache_write_tokens: int = 0,
 48      provider: str = None,
 49      base_url: str = None,
 50  ) -> tuple[float, str]:
 51      """Estimate the USD cost for a session row or a model/token tuple."""
 52      if isinstance(session_or_model, dict):
 53          session = session_or_model
 54          model = session.get("model") or ""
 55          usage = CanonicalUsage(
 56              input_tokens=session.get("input_tokens") or 0,
 57              output_tokens=session.get("output_tokens") or 0,
 58              cache_read_tokens=session.get("cache_read_tokens") or 0,
 59              cache_write_tokens=session.get("cache_write_tokens") or 0,
 60          )
 61          provider = session.get("billing_provider")
 62          base_url = session.get("billing_base_url")
 63      else:
 64          model = session_or_model or ""
 65          usage = CanonicalUsage(
 66              input_tokens=input_tokens,
 67              output_tokens=output_tokens,
 68              cache_read_tokens=cache_read_tokens,
 69              cache_write_tokens=cache_write_tokens,
 70          )
 71      result = estimate_usage_cost(
 72          model,
 73          usage,
 74          provider=provider,
 75          base_url=base_url,
 76      )
 77      return float(result.amount_usd or 0.0), result.status
 78  
 79  
def _format_duration(seconds: float) -> str:
    """Format seconds into a human-readable duration string.

    Thin module-private wrapper around ``format_duration_compact`` from
    ``agent.usage_pricing``; the exact output format is defined there.

    Args:
        seconds: Duration in seconds.

    Returns:
        The string produced by ``format_duration_compact(seconds)``.
    """
    return format_duration_compact(seconds)
 83  
 84  
 85  def _bar_chart(values: List[int], max_width: int = 20) -> List[str]:
 86      """Create simple horizontal bar chart strings from values."""
 87      peak = max(values) if values else 1
 88      if peak == 0:
 89          return ["" for _ in values]
 90      return ["█" * max(1, int(v / peak * max_width)) if v > 0 else "" for v in values]
 91  
 92  
 93  class InsightsEngine:
 94      """
 95      Analyzes session history and produces usage insights.
 96  
 97      Works directly with a SessionDB instance (or raw sqlite3 connection)
 98      to query session and message data.
 99      """
100  
    def __init__(self, db) -> None:
        """
        Initialize with a SessionDB instance.

        Args:
            db: A SessionDB instance (from hermes_state.py). Its ``_conn``
                attribute is stored and used by every query method of this
                class via ``self._conn.execute(...)``.
        """
        self.db = db
        # NOTE(review): ``_conn`` is read unconditionally, so an object without
        # that attribute (e.g. a bare sqlite3 connection) raises AttributeError
        # here. The query methods also index result rows by column name, which
        # presumably relies on a name-addressable row factory — confirm in SessionDB.
        self._conn = db._conn
110  
111      def generate(self, days: int = 30, source: str = None) -> Dict[str, Any]:
112          """
113          Generate a complete insights report.
114  
115          Args:
116              days: Number of days to look back (default: 30)
117              source: Optional filter by source platform
118  
119          Returns:
120              Dict with all computed insights
121          """
122          cutoff = time.time() - (days * 86400)
123  
124          # Gather raw data
125          sessions = self._get_sessions(cutoff, source)
126          tool_usage = self._get_tool_usage(cutoff, source)
127          skill_usage = self._get_skill_usage(cutoff, source)
128          message_stats = self._get_message_stats(cutoff, source)
129  
130          if not sessions:
131              return {
132                  "days": days,
133                  "source_filter": source,
134                  "empty": True,
135                  "overview": {},
136                  "models": [],
137                  "platforms": [],
138                  "tools": [],
139                  "skills": {
140                      "summary": {
141                          "total_skill_loads": 0,
142                          "total_skill_edits": 0,
143                          "total_skill_actions": 0,
144                          "distinct_skills_used": 0,
145                      },
146                      "top_skills": [],
147                  },
148                  "activity": {},
149                  "top_sessions": [],
150              }
151  
152          # Compute insights
153          overview = self._compute_overview(sessions, message_stats)
154          models = self._compute_model_breakdown(sessions)
155          platforms = self._compute_platform_breakdown(sessions)
156          tools = self._compute_tool_breakdown(tool_usage)
157          skills = self._compute_skill_breakdown(skill_usage)
158          activity = self._compute_activity_patterns(sessions)
159          top_sessions = self._compute_top_sessions(sessions)
160  
161          return {
162              "days": days,
163              "source_filter": source,
164              "empty": False,
165              "generated_at": time.time(),
166              "overview": overview,
167              "models": models,
168              "platforms": platforms,
169              "tools": tools,
170              "skills": skills,
171              "activity": activity,
172              "top_sessions": top_sessions,
173          }
174  
175      # =========================================================================
176      # Data gathering (SQL queries)
177      # =========================================================================
178  
    # Columns we actually need (skip system_prompt, model_config blobs).
    # Listed explicitly instead of ``SELECT *`` so only the fields the
    # computations below read are fetched for each session row.
    _SESSION_COLS = ("id, source, model, started_at, ended_at, "
                     "message_count, tool_call_count, input_tokens, output_tokens, "
                     "cache_read_tokens, cache_write_tokens, billing_provider, "
                     "billing_base_url, billing_mode, estimated_cost_usd, "
                     "actual_cost_usd, cost_status, cost_source")

    # Pre-computed query strings — f-string evaluated once at class definition,
    # not at runtime, so no user-controlled value can alter the query structure.
    # The only interpolated value is the constant column list above; the cutoff
    # and source values are always bound through ``?`` placeholders.

    # Sessions started at or after the cutoff for one source, newest first.
    _GET_SESSIONS_WITH_SOURCE = (
        f"SELECT {_SESSION_COLS} FROM sessions"
        " WHERE started_at >= ? AND source = ?"
        " ORDER BY started_at DESC"
    )
    # Sessions started at or after the cutoff for every source, newest first.
    _GET_SESSIONS_ALL = (
        f"SELECT {_SESSION_COLS} FROM sessions"
        " WHERE started_at >= ?"
        " ORDER BY started_at DESC"
    )
198  
199      def _get_sessions(self, cutoff: float, source: str = None) -> List[Dict]:
200          """Fetch sessions within the time window."""
201          if source:
202              cursor = self._conn.execute(self._GET_SESSIONS_WITH_SOURCE, (cutoff, source))
203          else:
204              cursor = self._conn.execute(self._GET_SESSIONS_ALL, (cutoff,))
205          return [dict(row) for row in cursor.fetchall()]
206  
207      def _get_tool_usage(self, cutoff: float, source: str = None) -> List[Dict]:
208          """Get tool call counts from messages.
209  
210          Uses two sources:
211          1. tool_name column on 'tool' role messages (set by gateway)
212          2. tool_calls JSON on 'assistant' role messages (covers CLI where
213             tool_name is not populated on tool responses)
214          """
215          tool_counts = Counter()
216  
217          # Source 1: explicit tool_name on tool response messages
218          if source:
219              cursor = self._conn.execute(
220                  """SELECT m.tool_name, COUNT(*) as count
221                     FROM messages m
222                     JOIN sessions s ON s.id = m.session_id
223                     WHERE s.started_at >= ? AND s.source = ?
224                       AND m.role = 'tool' AND m.tool_name IS NOT NULL
225                     GROUP BY m.tool_name
226                     ORDER BY count DESC""",
227                  (cutoff, source),
228              )
229          else:
230              cursor = self._conn.execute(
231                  """SELECT m.tool_name, COUNT(*) as count
232                     FROM messages m
233                     JOIN sessions s ON s.id = m.session_id
234                     WHERE s.started_at >= ?
235                       AND m.role = 'tool' AND m.tool_name IS NOT NULL
236                     GROUP BY m.tool_name
237                     ORDER BY count DESC""",
238                  (cutoff,),
239              )
240          for row in cursor.fetchall():
241              tool_counts[row["tool_name"]] += row["count"]
242  
243          # Source 2: extract from tool_calls JSON on assistant messages
244          # (covers CLI sessions where tool_name is NULL on tool responses)
245          if source:
246              cursor2 = self._conn.execute(
247                  """SELECT m.tool_calls
248                     FROM messages m
249                     JOIN sessions s ON s.id = m.session_id
250                     WHERE s.started_at >= ? AND s.source = ?
251                       AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""",
252                  (cutoff, source),
253              )
254          else:
255              cursor2 = self._conn.execute(
256                  """SELECT m.tool_calls
257                     FROM messages m
258                     JOIN sessions s ON s.id = m.session_id
259                     WHERE s.started_at >= ?
260                       AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""",
261                  (cutoff,),
262              )
263  
264          tool_calls_counts = Counter()
265          for row in cursor2.fetchall():
266              try:
267                  calls = row["tool_calls"]
268                  if isinstance(calls, str):
269                      calls = json.loads(calls)
270                  if isinstance(calls, list):
271                      for call in calls:
272                          func = call.get("function", {}) if isinstance(call, dict) else {}
273                          name = func.get("name")
274                          if name:
275                              tool_calls_counts[name] += 1
276              except (json.JSONDecodeError, TypeError, AttributeError):
277                  continue
278  
279          # Merge: prefer tool_name source, supplement with tool_calls source
280          # for tools not already counted
281          if not tool_counts and tool_calls_counts:
282              # No tool_name data at all — use tool_calls exclusively
283              tool_counts = tool_calls_counts
284          elif tool_counts and tool_calls_counts:
285              # Both sources have data — use whichever has the higher count per tool
286              # (they may overlap, so take the max to avoid double-counting)
287              all_tools = set(tool_counts) | set(tool_calls_counts)
288              merged = Counter()
289              for tool in all_tools:
290                  merged[tool] = max(tool_counts.get(tool, 0), tool_calls_counts.get(tool, 0))
291              tool_counts = merged
292  
293          # Convert to the expected format
294          return [
295              {"tool_name": name, "count": count}
296              for name, count in tool_counts.most_common()
297          ]
298  
299      def _get_skill_usage(self, cutoff: float, source: str = None) -> List[Dict]:
300          """Extract per-skill usage from assistant tool calls."""
301          skill_counts: Dict[str, Dict[str, Any]] = {}
302  
303          if source:
304              cursor = self._conn.execute(
305                  """SELECT m.tool_calls, m.timestamp
306                     FROM messages m
307                     JOIN sessions s ON s.id = m.session_id
308                     WHERE s.started_at >= ? AND s.source = ?
309                       AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""",
310                  (cutoff, source),
311              )
312          else:
313              cursor = self._conn.execute(
314                  """SELECT m.tool_calls, m.timestamp
315                     FROM messages m
316                     JOIN sessions s ON s.id = m.session_id
317                     WHERE s.started_at >= ?
318                       AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""",
319                  (cutoff,),
320              )
321  
322          for row in cursor.fetchall():
323              try:
324                  calls = row["tool_calls"]
325                  if isinstance(calls, str):
326                      calls = json.loads(calls)
327                  if not isinstance(calls, list):
328                      continue
329              except (json.JSONDecodeError, TypeError):
330                  continue
331  
332              timestamp = row["timestamp"]
333              for call in calls:
334                  if not isinstance(call, dict):
335                      continue
336                  func = call.get("function", {})
337                  tool_name = func.get("name")
338                  if tool_name not in {"skill_view", "skill_manage"}:
339                      continue
340  
341                  args = func.get("arguments")
342                  if isinstance(args, str):
343                      try:
344                          args = json.loads(args)
345                      except (json.JSONDecodeError, TypeError):
346                          continue
347                  if not isinstance(args, dict):
348                      continue
349  
350                  skill_name = args.get("name")
351                  if not isinstance(skill_name, str) or not skill_name.strip():
352                      continue
353  
354                  entry = skill_counts.setdefault(
355                      skill_name,
356                      {
357                          "skill": skill_name,
358                          "view_count": 0,
359                          "manage_count": 0,
360                          "last_used_at": None,
361                      },
362                  )
363                  if tool_name == "skill_view":
364                      entry["view_count"] += 1
365                  else:
366                      entry["manage_count"] += 1
367  
368                  if timestamp is not None and (
369                      entry["last_used_at"] is None or timestamp > entry["last_used_at"]
370                  ):
371                      entry["last_used_at"] = timestamp
372  
373          return list(skill_counts.values())
374  
375      def _get_message_stats(self, cutoff: float, source: str = None) -> Dict:
376          """Get aggregate message statistics."""
377          if source:
378              cursor = self._conn.execute(
379                  """SELECT
380                       COUNT(*) as total_messages,
381                       SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages,
382                       SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
383                       SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages
384                     FROM messages m
385                     JOIN sessions s ON s.id = m.session_id
386                     WHERE s.started_at >= ? AND s.source = ?""",
387                  (cutoff, source),
388              )
389          else:
390              cursor = self._conn.execute(
391                  """SELECT
392                       COUNT(*) as total_messages,
393                       SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages,
394                       SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
395                       SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages
396                     FROM messages m
397                     JOIN sessions s ON s.id = m.session_id
398                     WHERE s.started_at >= ?""",
399                  (cutoff,),
400              )
401          row = cursor.fetchone()
402          return dict(row) if row else {
403              "total_messages": 0, "user_messages": 0,
404              "assistant_messages": 0, "tool_messages": 0,
405          }
406  
407      # =========================================================================
408      # Computation
409      # =========================================================================
410  
411      def _compute_overview(self, sessions: List[Dict], message_stats: Dict) -> Dict:
412          """Compute high-level overview statistics."""
413          total_input = sum(s.get("input_tokens") or 0 for s in sessions)
414          total_output = sum(s.get("output_tokens") or 0 for s in sessions)
415          total_cache_read = sum(s.get("cache_read_tokens") or 0 for s in sessions)
416          total_cache_write = sum(s.get("cache_write_tokens") or 0 for s in sessions)
417          total_tokens = total_input + total_output + total_cache_read + total_cache_write
418          total_tool_calls = sum(s.get("tool_call_count") or 0 for s in sessions)
419          total_messages = sum(s.get("message_count") or 0 for s in sessions)
420  
421          # Cost estimation (weighted by model)
422          total_cost = 0.0
423          actual_cost = 0.0
424          models_with_pricing = set()
425          models_without_pricing = set()
426          unknown_cost_sessions = 0
427          included_cost_sessions = 0
428          for s in sessions:
429              model = s.get("model") or ""
430              estimated, status = _estimate_cost(s)
431              total_cost += estimated
432              actual_cost += s.get("actual_cost_usd") or 0.0
433              display = model.split("/")[-1] if "/" in model else (model or "unknown")
434              if status == "included":
435                  included_cost_sessions += 1
436              elif status == "unknown":
437                  unknown_cost_sessions += 1
438              if _has_known_pricing(model, s.get("billing_provider"), s.get("billing_base_url")):
439                  models_with_pricing.add(display)
440              else:
441                  models_without_pricing.add(display)
442  
443          # Session duration stats (guard against negative durations from clock drift)
444          durations = []
445          for s in sessions:
446              start = s.get("started_at")
447              end = s.get("ended_at")
448              if start and end and end > start:
449                  durations.append(end - start)
450  
451          total_hours = sum(durations) / 3600 if durations else 0
452          avg_duration = sum(durations) / len(durations) if durations else 0
453  
454          # Earliest and latest session
455          started_timestamps = [s["started_at"] for s in sessions if s.get("started_at")]
456          date_range_start = min(started_timestamps) if started_timestamps else None
457          date_range_end = max(started_timestamps) if started_timestamps else None
458  
459          return {
460              "total_sessions": len(sessions),
461              "total_messages": total_messages,
462              "total_tool_calls": total_tool_calls,
463              "total_input_tokens": total_input,
464              "total_output_tokens": total_output,
465              "total_cache_read_tokens": total_cache_read,
466              "total_cache_write_tokens": total_cache_write,
467              "total_tokens": total_tokens,
468              "estimated_cost": total_cost,
469              "actual_cost": actual_cost,
470              "total_hours": total_hours,
471              "avg_session_duration": avg_duration,
472              "avg_messages_per_session": total_messages / len(sessions) if sessions else 0,
473              "avg_tokens_per_session": total_tokens / len(sessions) if sessions else 0,
474              "user_messages": message_stats.get("user_messages") or 0,
475              "assistant_messages": message_stats.get("assistant_messages") or 0,
476              "tool_messages": message_stats.get("tool_messages") or 0,
477              "date_range_start": date_range_start,
478              "date_range_end": date_range_end,
479              "models_with_pricing": sorted(models_with_pricing),
480              "models_without_pricing": sorted(models_without_pricing),
481              "unknown_cost_sessions": unknown_cost_sessions,
482              "included_cost_sessions": included_cost_sessions,
483          }
484  
485      def _compute_model_breakdown(self, sessions: List[Dict]) -> List[Dict]:
486          """Break down usage by model."""
487          model_data = defaultdict(lambda: {
488              "sessions": 0, "input_tokens": 0, "output_tokens": 0,
489              "cache_read_tokens": 0, "cache_write_tokens": 0,
490              "total_tokens": 0, "tool_calls": 0, "cost": 0.0,
491          })
492  
493          for s in sessions:
494              model = s.get("model") or "unknown"
495              # Normalize: strip provider prefix for display
496              display_model = model.split("/")[-1] if "/" in model else model
497              d = model_data[display_model]
498              d["sessions"] += 1
499              inp = s.get("input_tokens") or 0
500              out = s.get("output_tokens") or 0
501              cache_read = s.get("cache_read_tokens") or 0
502              cache_write = s.get("cache_write_tokens") or 0
503              d["input_tokens"] += inp
504              d["output_tokens"] += out
505              d["cache_read_tokens"] += cache_read
506              d["cache_write_tokens"] += cache_write
507              d["total_tokens"] += inp + out + cache_read + cache_write
508              d["tool_calls"] += s.get("tool_call_count") or 0
509              estimate, status = _estimate_cost(s)
510              d["cost"] += estimate
511              d["has_pricing"] = _has_known_pricing(model, s.get("billing_provider"), s.get("billing_base_url"))
512              d["cost_status"] = status
513  
514          result = [
515              {"model": model, **data}
516              for model, data in model_data.items()
517          ]
518          # Sort by tokens first, fall back to session count when tokens are 0
519          result.sort(key=lambda x: (x["total_tokens"], x["sessions"]), reverse=True)
520          return result
521  
522      def _compute_platform_breakdown(self, sessions: List[Dict]) -> List[Dict]:
523          """Break down usage by platform/source."""
524          platform_data = defaultdict(lambda: {
525              "sessions": 0, "messages": 0, "input_tokens": 0,
526              "output_tokens": 0, "cache_read_tokens": 0,
527              "cache_write_tokens": 0, "total_tokens": 0, "tool_calls": 0,
528          })
529  
530          for s in sessions:
531              source = s.get("source") or "unknown"
532              d = platform_data[source]
533              d["sessions"] += 1
534              d["messages"] += s.get("message_count") or 0
535              inp = s.get("input_tokens") or 0
536              out = s.get("output_tokens") or 0
537              cache_read = s.get("cache_read_tokens") or 0
538              cache_write = s.get("cache_write_tokens") or 0
539              d["input_tokens"] += inp
540              d["output_tokens"] += out
541              d["cache_read_tokens"] += cache_read
542              d["cache_write_tokens"] += cache_write
543              d["total_tokens"] += inp + out + cache_read + cache_write
544              d["tool_calls"] += s.get("tool_call_count") or 0
545  
546          result = [
547              {"platform": platform, **data}
548              for platform, data in platform_data.items()
549          ]
550          result.sort(key=lambda x: x["sessions"], reverse=True)
551          return result
552  
553      def _compute_tool_breakdown(self, tool_usage: List[Dict]) -> List[Dict]:
554          """Process tool usage data into a ranked list with percentages."""
555          total_calls = sum(t["count"] for t in tool_usage) if tool_usage else 0
556          result = []
557          for t in tool_usage:
558              pct = (t["count"] / total_calls * 100) if total_calls else 0
559              result.append({
560                  "tool": t["tool_name"],
561                  "count": t["count"],
562                  "percentage": pct,
563              })
564          return result
565  
566      def _compute_skill_breakdown(self, skill_usage: List[Dict]) -> Dict[str, Any]:
567          """Process per-skill usage into summary + ranked list."""
568          total_skill_loads = sum(s["view_count"] for s in skill_usage) if skill_usage else 0
569          total_skill_edits = sum(s["manage_count"] for s in skill_usage) if skill_usage else 0
570          total_skill_actions = total_skill_loads + total_skill_edits
571  
572          top_skills = []
573          for skill in skill_usage:
574              total_count = skill["view_count"] + skill["manage_count"]
575              percentage = (total_count / total_skill_actions * 100) if total_skill_actions else 0
576              top_skills.append({
577                  "skill": skill["skill"],
578                  "view_count": skill["view_count"],
579                  "manage_count": skill["manage_count"],
580                  "total_count": total_count,
581                  "percentage": percentage,
582                  "last_used_at": skill.get("last_used_at"),
583              })
584  
585          top_skills.sort(
586              key=lambda s: (
587                  s["total_count"],
588                  s["view_count"],
589                  s["manage_count"],
590                  s["last_used_at"] or 0,
591                  s["skill"],
592              ),
593              reverse=True,
594          )
595  
596          return {
597              "summary": {
598                  "total_skill_loads": total_skill_loads,
599                  "total_skill_edits": total_skill_edits,
600                  "total_skill_actions": total_skill_actions,
601                  "distinct_skills_used": len(skill_usage),
602              },
603              "top_skills": top_skills,
604          }
605  
606      def _compute_activity_patterns(self, sessions: List[Dict]) -> Dict:
607          """Analyze activity patterns by day of week and hour."""
608          day_counts = Counter()  # 0=Monday ... 6=Sunday
609          hour_counts = Counter()
610          daily_counts = Counter()  # date string -> count
611  
612          for s in sessions:
613              ts = s.get("started_at")
614              if not ts:
615                  continue
616              dt = datetime.fromtimestamp(ts)
617              day_counts[dt.weekday()] += 1
618              hour_counts[dt.hour] += 1
619              daily_counts[dt.strftime("%Y-%m-%d")] += 1
620  
621          day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
622          day_breakdown = [
623              {"day": day_names[i], "count": day_counts.get(i, 0)}
624              for i in range(7)
625          ]
626  
627          hour_breakdown = [
628              {"hour": i, "count": hour_counts.get(i, 0)}
629              for i in range(24)
630          ]
631  
632          # Busiest day and hour
633          busiest_day = max(day_breakdown, key=lambda x: x["count"]) if day_breakdown else None
634          busiest_hour = max(hour_breakdown, key=lambda x: x["count"]) if hour_breakdown else None
635  
636          # Active days (days with at least one session)
637          active_days = len(daily_counts)
638  
639          # Streak calculation
640          if daily_counts:
641              all_dates = sorted(daily_counts.keys())
642              current_streak = 1
643              max_streak = 1
644              for i in range(1, len(all_dates)):
645                  d1 = datetime.strptime(all_dates[i - 1], "%Y-%m-%d")
646                  d2 = datetime.strptime(all_dates[i], "%Y-%m-%d")
647                  if (d2 - d1).days == 1:
648                      current_streak += 1
649                      max_streak = max(max_streak, current_streak)
650                  else:
651                      current_streak = 1
652          else:
653              max_streak = 0
654  
655          return {
656              "by_day": day_breakdown,
657              "by_hour": hour_breakdown,
658              "busiest_day": busiest_day,
659              "busiest_hour": busiest_hour,
660              "active_days": active_days,
661              "max_streak": max_streak,
662          }
663  
664      def _compute_top_sessions(self, sessions: List[Dict]) -> List[Dict]:
665          """Find notable sessions (longest, most messages, most tokens)."""
666          top = []
667  
668          # Longest by duration
669          sessions_with_duration = [
670              s for s in sessions
671              if s.get("started_at") and s.get("ended_at")
672          ]
673          if sessions_with_duration:
674              longest = max(
675                  sessions_with_duration,
676                  key=lambda s: (s["ended_at"] - s["started_at"]),
677              )
678              dur = longest["ended_at"] - longest["started_at"]
679              top.append({
680                  "label": "Longest session",
681                  "session_id": longest["id"][:16],
682                  "value": _format_duration(dur),
683                  "date": datetime.fromtimestamp(longest["started_at"]).strftime("%b %d"),
684              })
685  
686          # Most messages
687          most_msgs = max(sessions, key=lambda s: s.get("message_count") or 0)
688          if (most_msgs.get("message_count") or 0) > 0:
689              top.append({
690                  "label": "Most messages",
691                  "session_id": most_msgs["id"][:16],
692                  "value": f"{most_msgs['message_count']} msgs",
693                  "date": datetime.fromtimestamp(most_msgs["started_at"]).strftime("%b %d") if most_msgs.get("started_at") else "?",
694              })
695  
696          # Most tokens
697          most_tokens = max(
698              sessions,
699              key=lambda s: (s.get("input_tokens") or 0) + (s.get("output_tokens") or 0),
700          )
701          token_total = (most_tokens.get("input_tokens") or 0) + (most_tokens.get("output_tokens") or 0)
702          if token_total > 0:
703              top.append({
704                  "label": "Most tokens",
705                  "session_id": most_tokens["id"][:16],
706                  "value": f"{token_total:,} tokens",
707                  "date": datetime.fromtimestamp(most_tokens["started_at"]).strftime("%b %d") if most_tokens.get("started_at") else "?",
708              })
709  
710          # Most tool calls
711          most_tools = max(sessions, key=lambda s: s.get("tool_call_count") or 0)
712          if (most_tools.get("tool_call_count") or 0) > 0:
713              top.append({
714                  "label": "Most tool calls",
715                  "session_id": most_tools["id"][:16],
716                  "value": f"{most_tools['tool_call_count']} calls",
717                  "date": datetime.fromtimestamp(most_tools["started_at"]).strftime("%b %d") if most_tools.get("started_at") else "?",
718              })
719  
720          return top
721  
722      # =========================================================================
723      # Formatting
724      # =========================================================================
725  
726      def format_terminal(self, report: Dict) -> str:
727          """Format the insights report for terminal display (CLI)."""
728          if report.get("empty"):
729              days = report.get("days", 30)
730              src = f" (source: {report['source_filter']})" if report.get("source_filter") else ""
731              return f"  No sessions found in the last {days} days{src}."
732  
733          lines = []
734          o = report["overview"]
735          days = report["days"]
736          src_filter = report.get("source_filter")
737  
738          # Header
739          lines.append("")
740          lines.append("  ╔══════════════════════════════════════════════════════════╗")
741          lines.append("  ║                    📊 Hermes Insights                    ║")
742          period_label = f"Last {days} days"
743          if src_filter:
744              period_label += f" ({src_filter})"
745          padding = 58 - len(period_label) - 2
746          left_pad = padding // 2
747          right_pad = padding - left_pad
748          lines.append(f"  ║{' ' * left_pad} {period_label} {' ' * right_pad}║")
749          lines.append("  ╚══════════════════════════════════════════════════════════╝")
750          lines.append("")
751  
752          # Date range
753          if o.get("date_range_start") and o.get("date_range_end"):
754              start_str = datetime.fromtimestamp(o["date_range_start"]).strftime("%b %d, %Y")
755              end_str = datetime.fromtimestamp(o["date_range_end"]).strftime("%b %d, %Y")
756              lines.append(f"  Period: {start_str} — {end_str}")
757              lines.append("")
758  
759          # Overview
760          lines.append("  📋 Overview")
761          lines.append("  " + "─" * 56)
762          lines.append(f"  Sessions:          {o['total_sessions']:<12}  Messages:        {o['total_messages']:,}")
763          lines.append(f"  Tool calls:        {o['total_tool_calls']:<12,}  User messages:   {o['user_messages']:,}")
764          lines.append(f"  Input tokens:      {o['total_input_tokens']:<12,}  Output tokens:   {o['total_output_tokens']:,}")
765          lines.append(f"  Total tokens:      {o['total_tokens']:,}")
766          if o["total_hours"] > 0:
767              lines.append(f"  Active time:       ~{_format_duration(o['total_hours'] * 3600):<11}  Avg session:     ~{_format_duration(o['avg_session_duration'])}")
768          lines.append(f"  Avg msgs/session:  {o['avg_messages_per_session']:.1f}")
769          lines.append("")
770  
771          # Model breakdown
772          if report["models"]:
773              lines.append("  🤖 Models Used")
774              lines.append("  " + "─" * 56)
775              lines.append(f"  {'Model':<30} {'Sessions':>8} {'Tokens':>12}")
776              for m in report["models"]:
777                  model_name = m["model"][:28]
778                  lines.append(f"  {model_name:<30} {m['sessions']:>8} {m['total_tokens']:>12,}")
779              lines.append("")
780  
781          # Platform breakdown
782          if len(report["platforms"]) > 1 or (report["platforms"] and report["platforms"][0]["platform"] != "cli"):
783              lines.append("  📱 Platforms")
784              lines.append("  " + "─" * 56)
785              lines.append(f"  {'Platform':<14} {'Sessions':>8} {'Messages':>10} {'Tokens':>14}")
786              for p in report["platforms"]:
787                  lines.append(f"  {p['platform']:<14} {p['sessions']:>8} {p['messages']:>10,} {p['total_tokens']:>14,}")
788              lines.append("")
789  
790          # Tool usage
791          if report["tools"]:
792              lines.append("  🔧 Top Tools")
793              lines.append("  " + "─" * 56)
794              lines.append(f"  {'Tool':<28} {'Calls':>8} {'%':>8}")
795              for t in report["tools"][:15]:  # Top 15
796                  lines.append(f"  {t['tool']:<28} {t['count']:>8,} {t['percentage']:>7.1f}%")
797              if len(report["tools"]) > 15:
798                  lines.append(f"  ... and {len(report['tools']) - 15} more tools")
799              lines.append("")
800  
801          # Skill usage
802          skills = report.get("skills", {})
803          top_skills = skills.get("top_skills", [])
804          if top_skills:
805              lines.append("  🧠 Top Skills")
806              lines.append("  " + "─" * 56)
807              lines.append(f"  {'Skill':<28} {'Loads':>7} {'Edits':>7} {'Last used':>11}")
808              for skill in top_skills[:10]:
809                  last_used = "—"
810                  if skill.get("last_used_at"):
811                      last_used = datetime.fromtimestamp(skill["last_used_at"]).strftime("%b %d")
812                  lines.append(
813                      f"  {skill['skill'][:28]:<28} {skill['view_count']:>7,} {skill['manage_count']:>7,} {last_used:>11}"
814                  )
815              summary = skills.get("summary", {})
816              lines.append(
817                  f"  Distinct skills: {summary.get('distinct_skills_used', 0)}  "
818                  f"Loads: {summary.get('total_skill_loads', 0):,}  "
819                  f"Edits: {summary.get('total_skill_edits', 0):,}"
820              )
821              lines.append("")
822  
823          # Activity patterns
824          act = report.get("activity", {})
825          if act.get("by_day"):
826              lines.append("  📅 Activity Patterns")
827              lines.append("  " + "─" * 56)
828  
829              # Day of week chart
830              day_values = [d["count"] for d in act["by_day"]]
831              bars = _bar_chart(day_values, max_width=15)
832              for i, d in enumerate(act["by_day"]):
833                  bar = bars[i]
834                  lines.append(f"  {d['day']}  {bar:<15} {d['count']}")
835  
836              lines.append("")
837  
838              # Peak hours (show top 5 busiest hours)
839              busy_hours = sorted(act["by_hour"], key=lambda x: x["count"], reverse=True)
840              busy_hours = [h for h in busy_hours if h["count"] > 0][:5]
841              if busy_hours:
842                  hour_strs = []
843                  for h in busy_hours:
844                      hr = h["hour"]
845                      ampm = "AM" if hr < 12 else "PM"
846                      display_hr = hr % 12 or 12
847                      hour_strs.append(f"{display_hr}{ampm} ({h['count']})")
848                  lines.append(f"  Peak hours: {', '.join(hour_strs)}")
849  
850              if act.get("active_days"):
851                  lines.append(f"  Active days: {act['active_days']}")
852              if act.get("max_streak") and act["max_streak"] > 1:
853                  lines.append(f"  Best streak: {act['max_streak']} consecutive days")
854              lines.append("")
855  
856          # Notable sessions
857          if report.get("top_sessions"):
858              lines.append("  🏆 Notable Sessions")
859              lines.append("  " + "─" * 56)
860              for ts in report["top_sessions"]:
861                  lines.append(f"  {ts['label']:<20} {ts['value']:<18} ({ts['date']}, {ts['session_id']})")
862              lines.append("")
863  
864          return "\n".join(lines)
865  
866      def format_gateway(self, report: Dict) -> str:
867          """Format the insights report for gateway/messaging (shorter)."""
868          if report.get("empty"):
869              days = report.get("days", 30)
870              return f"No sessions found in the last {days} days."
871  
872          lines = []
873          o = report["overview"]
874          days = report["days"]
875  
876          lines.append(f"📊 **Hermes Insights** — Last {days} days\n")
877  
878          # Overview
879          lines.append(f"**Sessions:** {o['total_sessions']} | **Messages:** {o['total_messages']:,} | **Tool calls:** {o['total_tool_calls']:,}")
880          lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,})")
881          if o["total_hours"] > 0:
882              lines.append(f"**Active time:** ~{_format_duration(o['total_hours'] * 3600)} | **Avg session:** ~{_format_duration(o['avg_session_duration'])}")
883          lines.append("")
884  
885          # Models (top 5)
886          if report["models"]:
887              lines.append("**🤖 Models:**")
888              for m in report["models"][:5]:
889                  lines.append(f"  {m['model'][:25]} — {m['sessions']} sessions, {m['total_tokens']:,} tokens")
890              lines.append("")
891  
892          # Platforms (if multi-platform)
893          if len(report["platforms"]) > 1:
894              lines.append("**📱 Platforms:**")
895              for p in report["platforms"]:
896                  lines.append(f"  {p['platform']} — {p['sessions']} sessions, {p['messages']:,} msgs")
897              lines.append("")
898  
899          # Tools (top 8)
900          if report["tools"]:
901              lines.append("**🔧 Top Tools:**")
902              for t in report["tools"][:8]:
903                  lines.append(f"  {t['tool']} — {t['count']:,} calls ({t['percentage']:.1f}%)")
904              lines.append("")
905  
906          skills = report.get("skills", {})
907          if skills.get("top_skills"):
908              lines.append("**🧠 Top Skills:**")
909              for skill in skills["top_skills"][:5]:
910                  suffix = ""
911                  if skill.get("last_used_at"):
912                      suffix = f", last used {datetime.fromtimestamp(skill['last_used_at']).strftime('%b %d')}"
913                  lines.append(
914                      f"  {skill['skill']} — {skill['view_count']:,} loads, {skill['manage_count']:,} edits{suffix}"
915                  )
916              lines.append("")
917  
918          # Activity summary
919          act = report.get("activity", {})
920          if act.get("busiest_day") and act.get("busiest_hour"):
921              hr = act["busiest_hour"]["hour"]
922              ampm = "AM" if hr < 12 else "PM"
923              display_hr = hr % 12 or 12
924              lines.append(f"**📅 Busiest:** {act['busiest_day']['day']}s ({act['busiest_day']['count']} sessions), {display_hr}{ampm} ({act['busiest_hour']['count']} sessions)")
925              if act.get("active_days"):
926                  lines.append(f"**Active days:** {act['active_days']}", )
927              if act.get("max_streak", 0) > 1:
928                  lines.append(f"**Best streak:** {act['max_streak']} consecutive days")
929  
930          return "\n".join(lines)