"""
Session Insights Engine for Hermes Agent.

Analyzes historical session data from the SQLite state database to produce
comprehensive usage insights — token consumption, cost estimates, tool usage
patterns, activity trends, model/platform breakdowns, and session metrics.

Inspired by Claude Code's /insights command, adapted for Hermes Agent's
multi-platform architecture with additional cost estimation and platform
breakdown capabilities.

Usage:
    from agent.insights import InsightsEngine
    engine = InsightsEngine(db)
    report = engine.generate(days=30)
    print(engine.format_terminal(report))
"""

import json
import time
from collections import Counter, defaultdict
from datetime import datetime
from typing import Any, Dict, List

from agent.usage_pricing import (
    CanonicalUsage,
    DEFAULT_PRICING,
    estimate_usage_cost,
    format_duration_compact,
    has_known_pricing,
)

# Re-exported for backward compatibility with older imports of this module.
_DEFAULT_PRICING = DEFAULT_PRICING


def _has_known_pricing(
    model_name: str, provider: str | None = None, base_url: str | None = None
) -> bool:
    """Check if a model has known pricing (vs unknown/custom endpoint)."""
    return has_known_pricing(model_name, provider=provider, base_url=base_url)


def _estimate_cost(
    session_or_model: Dict[str, Any] | str,
    input_tokens: int = 0,
    output_tokens: int = 0,
    *,
    cache_read_tokens: int = 0,
    cache_write_tokens: int = 0,
    provider: str | None = None,
    base_url: str | None = None,
) -> tuple[float, str]:
    """Estimate the USD cost for a session row or a model/token tuple.

    Args:
        session_or_model: Either a session row dict (token counts and billing
            provider/base_url are read from it, overriding the keyword args)
            or a bare model name string.
        input_tokens / output_tokens / cache_*_tokens: Token counts, used only
            when a model name string is passed.
        provider / base_url: Billing endpoint hints, used only when a model
            name string is passed.

    Returns:
        (estimated_usd, status) — status is whatever estimate_usage_cost
        reports (e.g. "estimated", "included", "unknown").
    """
    if isinstance(session_or_model, dict):
        session = session_or_model
        model = session.get("model") or ""
        usage = CanonicalUsage(
            input_tokens=session.get("input_tokens") or 0,
            output_tokens=session.get("output_tokens") or 0,
            cache_read_tokens=session.get("cache_read_tokens") or 0,
            cache_write_tokens=session.get("cache_write_tokens") or 0,
        )
        provider = session.get("billing_provider")
        base_url = session.get("billing_base_url")
    else:
        model = session_or_model or ""
        usage = CanonicalUsage(
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cache_read_tokens=cache_read_tokens,
            cache_write_tokens=cache_write_tokens,
        )
    result = estimate_usage_cost(
        model,
        usage,
        provider=provider,
        base_url=base_url,
    )
    return float(result.amount_usd or 0.0), result.status


def _format_duration(seconds: float) -> str:
    """Format seconds into a human-readable duration string."""
    return format_duration_compact(seconds)


def _bar_chart(values: List[int], max_width: int = 20) -> List[str]:
    """Create simple horizontal bar chart strings from values.

    Non-zero values always get at least one bar character; an all-zero
    input yields empty strings (avoids division by zero on the peak).
    """
    peak = max(values) if values else 1
    if peak == 0:
        return ["" for _ in values]
    return ["█" * max(1, int(v / peak * max_width)) if v > 0 else "" for v in values]


class InsightsEngine:
    """
    Analyzes session history and produces usage insights.

    Works directly with a SessionDB instance (or raw sqlite3 connection)
    to query session and message data.
    """

    def __init__(self, db):
        """
        Initialize with a SessionDB instance.

        Args:
            db: A SessionDB instance (from hermes_state.py)
        """
        self.db = db
        self._conn = db._conn

    def generate(self, days: int = 30, source: str | None = None) -> Dict[str, Any]:
        """
        Generate a complete insights report.

        Args:
            days: Number of days to look back (default: 30)
            source: Optional filter by source platform

        Returns:
            Dict with all computed insights
        """
        cutoff = time.time() - (days * 86400)

        # Gather raw data
        sessions = self._get_sessions(cutoff, source)
        tool_usage = self._get_tool_usage(cutoff, source)
        skill_usage = self._get_skill_usage(cutoff, source)
        message_stats = self._get_message_stats(cutoff, source)

        if not sessions:
            return {
                "days": days,
                "source_filter": source,
                "empty": True,
                # Keep the schema consistent with the non-empty branch so
                # consumers can always read generated_at.
                "generated_at": time.time(),
                "overview": {},
                "models": [],
                "platforms": [],
                "tools": [],
                "skills": {
                    "summary": {
                        "total_skill_loads": 0,
                        "total_skill_edits": 0,
                        "total_skill_actions": 0,
                        "distinct_skills_used": 0,
                    },
                    "top_skills": [],
                },
                "activity": {},
                "top_sessions": [],
            }

        # Compute insights
        overview = self._compute_overview(sessions, message_stats)
        models = self._compute_model_breakdown(sessions)
        platforms = self._compute_platform_breakdown(sessions)
        tools = self._compute_tool_breakdown(tool_usage)
        skills = self._compute_skill_breakdown(skill_usage)
        activity = self._compute_activity_patterns(sessions)
        top_sessions = self._compute_top_sessions(sessions)

        return {
            "days": days,
            "source_filter": source,
            "empty": False,
            "generated_at": time.time(),
            "overview": overview,
            "models": models,
            "platforms": platforms,
            "tools": tools,
            "skills": skills,
            "activity": activity,
            "top_sessions": top_sessions,
        }

    # =========================================================================
    # Data gathering (SQL queries)
    # =========================================================================

    # Columns we actually need (skip system_prompt, model_config blobs)
    _SESSION_COLS = ("id, source, model, started_at, ended_at, "
                     "message_count, tool_call_count, input_tokens, output_tokens, "
                     "cache_read_tokens, cache_write_tokens, billing_provider, "
                     "billing_base_url, billing_mode, estimated_cost_usd, "
                     "actual_cost_usd, cost_status, cost_source")

    # Pre-computed query strings — f-string evaluated once at class definition,
    # not at runtime, so no user-controlled value can alter the query structure.
    _GET_SESSIONS_WITH_SOURCE = (
        f"SELECT {_SESSION_COLS} FROM sessions"
        " WHERE started_at >= ? AND source = ?"
        " ORDER BY started_at DESC"
    )
    _GET_SESSIONS_ALL = (
        f"SELECT {_SESSION_COLS} FROM sessions"
        " WHERE started_at >= ?"
        " ORDER BY started_at DESC"
    )

    def _get_sessions(self, cutoff: float, source: str | None = None) -> List[Dict]:
        """Fetch sessions within the time window."""
        if source:
            cursor = self._conn.execute(self._GET_SESSIONS_WITH_SOURCE, (cutoff, source))
        else:
            cursor = self._conn.execute(self._GET_SESSIONS_ALL, (cutoff,))
        return [dict(row) for row in cursor.fetchall()]

    def _get_tool_usage(self, cutoff: float, source: str | None = None) -> List[Dict]:
        """Get tool call counts from messages.

        Uses two sources:
        1. tool_name column on 'tool' role messages (set by gateway)
        2. tool_calls JSON on 'assistant' role messages (covers CLI where
           tool_name is not populated on tool responses)
        """
        tool_counts = Counter()

        # Source 1: explicit tool_name on tool response messages
        if source:
            cursor = self._conn.execute(
                """SELECT m.tool_name, COUNT(*) as count
                   FROM messages m
                   JOIN sessions s ON s.id = m.session_id
                   WHERE s.started_at >= ? AND s.source = ?
                     AND m.role = 'tool' AND m.tool_name IS NOT NULL
                   GROUP BY m.tool_name
                   ORDER BY count DESC""",
                (cutoff, source),
            )
        else:
            cursor = self._conn.execute(
                """SELECT m.tool_name, COUNT(*) as count
                   FROM messages m
                   JOIN sessions s ON s.id = m.session_id
                   WHERE s.started_at >= ?
                     AND m.role = 'tool' AND m.tool_name IS NOT NULL
                   GROUP BY m.tool_name
                   ORDER BY count DESC""",
                (cutoff,),
            )
        for row in cursor.fetchall():
            tool_counts[row["tool_name"]] += row["count"]

        # Source 2: extract from tool_calls JSON on assistant messages
        # (covers CLI sessions where tool_name is NULL on tool responses)
        if source:
            cursor2 = self._conn.execute(
                """SELECT m.tool_calls
                   FROM messages m
                   JOIN sessions s ON s.id = m.session_id
                   WHERE s.started_at >= ? AND s.source = ?
                     AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""",
                (cutoff, source),
            )
        else:
            cursor2 = self._conn.execute(
                """SELECT m.tool_calls
                   FROM messages m
                   JOIN sessions s ON s.id = m.session_id
                   WHERE s.started_at >= ?
                     AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""",
                (cutoff,),
            )

        tool_calls_counts = Counter()
        for row in cursor2.fetchall():
            try:
                calls = row["tool_calls"]
                if isinstance(calls, str):
                    calls = json.loads(calls)
                if isinstance(calls, list):
                    for call in calls:
                        func = call.get("function", {}) if isinstance(call, dict) else {}
                        name = func.get("name")
                        if name:
                            tool_calls_counts[name] += 1
            except (json.JSONDecodeError, TypeError, AttributeError):
                # Malformed tool_calls blob — skip the row, keep counting.
                continue

        # Merge: prefer tool_name source, supplement with tool_calls source
        # for tools not already counted
        if not tool_counts and tool_calls_counts:
            # No tool_name data at all — use tool_calls exclusively
            tool_counts = tool_calls_counts
        elif tool_counts and tool_calls_counts:
            # Both sources have data — use whichever has the higher count per tool
            # (they may overlap, so take the max to avoid double-counting)
            all_tools = set(tool_counts) | set(tool_calls_counts)
            merged = Counter()
            for tool in all_tools:
                merged[tool] = max(tool_counts.get(tool, 0), tool_calls_counts.get(tool, 0))
            tool_counts = merged

        # Convert to the expected format
        return [
            {"tool_name": name, "count": count}
            for name, count in tool_counts.most_common()
        ]

    def _get_skill_usage(self, cutoff: float, source: str | None = None) -> List[Dict]:
        """Extract per-skill usage from assistant tool calls.

        Scans skill_view / skill_manage calls and aggregates per skill name:
        view_count, manage_count, and the most recent usage timestamp.
        """
        skill_counts: Dict[str, Dict[str, Any]] = {}

        if source:
            cursor = self._conn.execute(
                """SELECT m.tool_calls, m.timestamp
                   FROM messages m
                   JOIN sessions s ON s.id = m.session_id
                   WHERE s.started_at >= ? AND s.source = ?
                     AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""",
                (cutoff, source),
            )
        else:
            cursor = self._conn.execute(
                """SELECT m.tool_calls, m.timestamp
                   FROM messages m
                   JOIN sessions s ON s.id = m.session_id
                   WHERE s.started_at >= ?
                     AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""",
                (cutoff,),
            )

        for row in cursor.fetchall():
            try:
                calls = row["tool_calls"]
                if isinstance(calls, str):
                    calls = json.loads(calls)
                if not isinstance(calls, list):
                    continue
            except (json.JSONDecodeError, TypeError):
                continue

            timestamp = row["timestamp"]
            for call in calls:
                if not isinstance(call, dict):
                    continue
                func = call.get("function", {})
                tool_name = func.get("name")
                if tool_name not in {"skill_view", "skill_manage"}:
                    continue

                args = func.get("arguments")
                if isinstance(args, str):
                    try:
                        args = json.loads(args)
                    except (json.JSONDecodeError, TypeError):
                        continue
                if not isinstance(args, dict):
                    continue

                skill_name = args.get("name")
                if not isinstance(skill_name, str) or not skill_name.strip():
                    continue

                entry = skill_counts.setdefault(
                    skill_name,
                    {
                        "skill": skill_name,
                        "view_count": 0,
                        "manage_count": 0,
                        "last_used_at": None,
                    },
                )
                if tool_name == "skill_view":
                    entry["view_count"] += 1
                else:
                    entry["manage_count"] += 1

                if timestamp is not None and (
                    entry["last_used_at"] is None or timestamp > entry["last_used_at"]
                ):
                    entry["last_used_at"] = timestamp

        return list(skill_counts.values())

    def _get_message_stats(self, cutoff: float, source: str | None = None) -> Dict:
        """Get aggregate message statistics."""
        if source:
            cursor = self._conn.execute(
                """SELECT
                       COUNT(*) as total_messages,
                       SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages,
                       SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
                       SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages
                   FROM messages m
                   JOIN sessions s ON s.id = m.session_id
                   WHERE s.started_at >= ? AND s.source = ?""",
                (cutoff, source),
            )
        else:
            cursor = self._conn.execute(
                """SELECT
                       COUNT(*) as total_messages,
                       SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages,
                       SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
                       SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages
                   FROM messages m
                   JOIN sessions s ON s.id = m.session_id
                   WHERE s.started_at >= ?""",
                (cutoff,),
            )
        row = cursor.fetchone()
        return dict(row) if row else {
            "total_messages": 0, "user_messages": 0,
            "assistant_messages": 0, "tool_messages": 0,
        }

    # =========================================================================
    # Computation
    # =========================================================================

    def _compute_overview(self, sessions: List[Dict], message_stats: Dict) -> Dict:
        """Compute high-level overview statistics."""
        total_input = sum(s.get("input_tokens") or 0 for s in sessions)
        total_output = sum(s.get("output_tokens") or 0 for s in sessions)
        total_cache_read = sum(s.get("cache_read_tokens") or 0 for s in sessions)
        total_cache_write = sum(s.get("cache_write_tokens") or 0 for s in sessions)
        total_tokens = total_input + total_output + total_cache_read + total_cache_write
        total_tool_calls = sum(s.get("tool_call_count") or 0 for s in sessions)
        total_messages = sum(s.get("message_count") or 0 for s in sessions)

        # Cost estimation (weighted by model)
        total_cost = 0.0
        actual_cost = 0.0
        models_with_pricing = set()
        models_without_pricing = set()
        unknown_cost_sessions = 0
        included_cost_sessions = 0
        for s in sessions:
            model = s.get("model") or ""
            estimated, status = _estimate_cost(s)
            total_cost += estimated
            actual_cost += s.get("actual_cost_usd") or 0.0
            display = model.split("/")[-1] if "/" in model else (model or "unknown")
            if status == "included":
                included_cost_sessions += 1
            elif status == "unknown":
                unknown_cost_sessions += 1
            if _has_known_pricing(model, s.get("billing_provider"), s.get("billing_base_url")):
                models_with_pricing.add(display)
            else:
                models_without_pricing.add(display)

        # Session duration stats (guard against negative durations from clock drift)
        durations = []
        for s in sessions:
            start = s.get("started_at")
            end = s.get("ended_at")
            if start and end and end > start:
                durations.append(end - start)

        total_hours = sum(durations) / 3600 if durations else 0
        avg_duration = sum(durations) / len(durations) if durations else 0

        # Earliest and latest session
        started_timestamps = [s["started_at"] for s in sessions if s.get("started_at")]
        date_range_start = min(started_timestamps) if started_timestamps else None
        date_range_end = max(started_timestamps) if started_timestamps else None

        return {
            "total_sessions": len(sessions),
            "total_messages": total_messages,
            "total_tool_calls": total_tool_calls,
            "total_input_tokens": total_input,
            "total_output_tokens": total_output,
            "total_cache_read_tokens": total_cache_read,
            "total_cache_write_tokens": total_cache_write,
            "total_tokens": total_tokens,
            "estimated_cost": total_cost,
            "actual_cost": actual_cost,
            "total_hours": total_hours,
            "avg_session_duration": avg_duration,
            "avg_messages_per_session": total_messages / len(sessions) if sessions else 0,
            "avg_tokens_per_session": total_tokens / len(sessions) if sessions else 0,
            "user_messages": message_stats.get("user_messages") or 0,
            "assistant_messages": message_stats.get("assistant_messages") or 0,
            "tool_messages": message_stats.get("tool_messages") or 0,
            "date_range_start": date_range_start,
            "date_range_end": date_range_end,
            "models_with_pricing": sorted(models_with_pricing),
            "models_without_pricing": sorted(models_without_pricing),
            "unknown_cost_sessions": unknown_cost_sessions,
            "included_cost_sessions": included_cost_sessions,
        }

    def _compute_model_breakdown(self, sessions: List[Dict]) -> List[Dict]:
        """Break down usage by model."""
        model_data = defaultdict(lambda: {
            "sessions": 0, "input_tokens": 0, "output_tokens": 0,
            "cache_read_tokens": 0, "cache_write_tokens": 0,
            "total_tokens": 0, "tool_calls": 0, "cost": 0.0,
        })

        for s in sessions:
            model = s.get("model") or "unknown"
            # Normalize: strip provider prefix for display
            display_model = model.split("/")[-1] if "/" in model else model
            d = model_data[display_model]
            d["sessions"] += 1
            inp = s.get("input_tokens") or 0
            out = s.get("output_tokens") or 0
            cache_read = s.get("cache_read_tokens") or 0
            cache_write = s.get("cache_write_tokens") or 0
            d["input_tokens"] += inp
            d["output_tokens"] += out
            d["cache_read_tokens"] += cache_read
            d["cache_write_tokens"] += cache_write
            d["total_tokens"] += inp + out + cache_read + cache_write
            d["tool_calls"] += s.get("tool_call_count") or 0
            estimate, status = _estimate_cost(s)
            d["cost"] += estimate
            # A display model may group sessions from multiple billing
            # endpoints; only report has_pricing=True when every grouped
            # session has known pricing (previously last-session-wins).
            d["has_pricing"] = d.get("has_pricing", True) and _has_known_pricing(
                model, s.get("billing_provider"), s.get("billing_base_url")
            )
            d["cost_status"] = status

        result = [
            {"model": model, **data}
            for model, data in model_data.items()
        ]
        # Sort by tokens first, fall back to session count when tokens are 0
        result.sort(key=lambda x: (x["total_tokens"], x["sessions"]), reverse=True)
        return result

    def _compute_platform_breakdown(self, sessions: List[Dict]) -> List[Dict]:
        """Break down usage by platform/source."""
        platform_data = defaultdict(lambda: {
            "sessions": 0, "messages": 0, "input_tokens": 0,
            "output_tokens": 0, "cache_read_tokens": 0,
            "cache_write_tokens": 0, "total_tokens": 0, "tool_calls": 0,
        })

        for s in sessions:
            source = s.get("source") or "unknown"
            d = platform_data[source]
            d["sessions"] += 1
            d["messages"] += s.get("message_count") or 0
            inp = s.get("input_tokens") or 0
            out = s.get("output_tokens") or 0
            cache_read = s.get("cache_read_tokens") or 0
            cache_write = s.get("cache_write_tokens") or 0
            d["input_tokens"] += inp
            d["output_tokens"] += out
            d["cache_read_tokens"] += cache_read
            d["cache_write_tokens"] += cache_write
            d["total_tokens"] += inp + out + cache_read + cache_write
            d["tool_calls"] += s.get("tool_call_count") or 0

        result = [
            {"platform": platform, **data}
            for platform, data in platform_data.items()
        ]
        result.sort(key=lambda x: x["sessions"], reverse=True)
        return result

    def _compute_tool_breakdown(self, tool_usage: List[Dict]) -> List[Dict]:
        """Process tool usage data into a ranked list with percentages."""
        total_calls = sum(t["count"] for t in tool_usage) if tool_usage else 0
        result = []
        for t in tool_usage:
            pct = (t["count"] / total_calls * 100) if total_calls else 0
            result.append({
                "tool": t["tool_name"],
                "count": t["count"],
                "percentage": pct,
            })
        return result

    def _compute_skill_breakdown(self, skill_usage: List[Dict]) -> Dict[str, Any]:
        """Process per-skill usage into summary + ranked list."""
        total_skill_loads = sum(s["view_count"] for s in skill_usage) if skill_usage else 0
        total_skill_edits = sum(s["manage_count"] for s in skill_usage) if skill_usage else 0
        total_skill_actions = total_skill_loads + total_skill_edits

        top_skills = []
        for skill in skill_usage:
            total_count = skill["view_count"] + skill["manage_count"]
            percentage = (total_count / total_skill_actions * 100) if total_skill_actions else 0
            top_skills.append({
                "skill": skill["skill"],
                "view_count": skill["view_count"],
                "manage_count": skill["manage_count"],
                "total_count": total_count,
                "percentage": percentage,
                "last_used_at": skill.get("last_used_at"),
            })

        # Rank by usage; skill name is the final tiebreaker so the order
        # is deterministic across runs.
        top_skills.sort(
            key=lambda s: (
                s["total_count"],
                s["view_count"],
                s["manage_count"],
                s["last_used_at"] or 0,
                s["skill"],
            ),
            reverse=True,
        )

        return {
            "summary": {
                "total_skill_loads": total_skill_loads,
                "total_skill_edits": total_skill_edits,
                "total_skill_actions": total_skill_actions,
                "distinct_skills_used": len(skill_usage),
            },
            "top_skills": top_skills,
        }

    def _compute_activity_patterns(self, sessions: List[Dict]) -> Dict:
        """Analyze activity patterns by day of week and hour."""
        day_counts = Counter()  # 0=Monday ... 6=Sunday
        hour_counts = Counter()
        daily_counts = Counter()  # date string -> count

        for s in sessions:
            ts = s.get("started_at")
            if not ts:
                continue
            dt = datetime.fromtimestamp(ts)
            day_counts[dt.weekday()] += 1
            hour_counts[dt.hour] += 1
            daily_counts[dt.strftime("%Y-%m-%d")] += 1

        day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
        day_breakdown = [
            {"day": day_names[i], "count": day_counts.get(i, 0)}
            for i in range(7)
        ]

        hour_breakdown = [
            {"hour": i, "count": hour_counts.get(i, 0)}
            for i in range(24)
        ]

        # Busiest day and hour
        busiest_day = max(day_breakdown, key=lambda x: x["count"]) if day_breakdown else None
        busiest_hour = max(hour_breakdown, key=lambda x: x["count"]) if hour_breakdown else None

        # Active days (days with at least one session)
        active_days = len(daily_counts)

        # Streak calculation: longest run of consecutive calendar days
        # with at least one session.
        if daily_counts:
            all_dates = sorted(daily_counts.keys())
            current_streak = 1
            max_streak = 1
            for i in range(1, len(all_dates)):
                d1 = datetime.strptime(all_dates[i - 1], "%Y-%m-%d")
                d2 = datetime.strptime(all_dates[i], "%Y-%m-%d")
                if (d2 - d1).days == 1:
                    current_streak += 1
                    max_streak = max(max_streak, current_streak)
                else:
                    current_streak = 1
        else:
            max_streak = 0

        return {
            "by_day": day_breakdown,
            "by_hour": hour_breakdown,
            "busiest_day": busiest_day,
            "busiest_hour": busiest_hour,
            "active_days": active_days,
            "max_streak": max_streak,
        }

    def _compute_top_sessions(self, sessions: List[Dict]) -> List[Dict]:
        """Find notable sessions (longest, most messages, most tokens)."""
        top = []

        # Longest by duration (same clock-drift guard as _compute_overview:
        # only consider sessions with a positive duration)
        sessions_with_duration = [
            s for s in sessions
            if s.get("started_at") and s.get("ended_at")
            and s["ended_at"] > s["started_at"]
        ]
        if sessions_with_duration:
            longest = max(
                sessions_with_duration,
                key=lambda s: (s["ended_at"] - s["started_at"]),
            )
            dur = longest["ended_at"] - longest["started_at"]
            top.append({
                "label": "Longest session",
                "session_id": longest["id"][:16],
                "value": _format_duration(dur),
                "date": datetime.fromtimestamp(longest["started_at"]).strftime("%b %d"),
            })

        # Most messages
        most_msgs = max(sessions, key=lambda s: s.get("message_count") or 0)
        if (most_msgs.get("message_count") or 0) > 0:
            top.append({
                "label": "Most messages",
                "session_id": most_msgs["id"][:16],
                "value": f"{most_msgs['message_count']} msgs",
                "date": datetime.fromtimestamp(most_msgs["started_at"]).strftime("%b %d") if most_msgs.get("started_at") else "?",
            })

        # Most tokens
        most_tokens = max(
            sessions,
            key=lambda s: (s.get("input_tokens") or 0) + (s.get("output_tokens") or 0),
        )
        token_total = (most_tokens.get("input_tokens") or 0) + (most_tokens.get("output_tokens") or 0)
        if token_total > 0:
            top.append({
                "label": "Most tokens",
                "session_id": most_tokens["id"][:16],
                "value": f"{token_total:,} tokens",
                "date": datetime.fromtimestamp(most_tokens["started_at"]).strftime("%b %d") if most_tokens.get("started_at") else "?",
            })

        # Most tool calls
        most_tools = max(sessions, key=lambda s: s.get("tool_call_count") or 0)
        if (most_tools.get("tool_call_count") or 0) > 0:
            top.append({
                "label": "Most tool calls",
                "session_id": most_tools["id"][:16],
                "value": f"{most_tools['tool_call_count']} calls",
                "date": datetime.fromtimestamp(most_tools["started_at"]).strftime("%b %d") if most_tools.get("started_at") else "?",
            })

        return top

    # =========================================================================
    # Formatting
    # =========================================================================

    def format_terminal(self, report: Dict) -> str:
        """Format the insights report for terminal display (CLI)."""
        if report.get("empty"):
            days = report.get("days", 30)
            src = f" (source: {report['source_filter']})" if report.get("source_filter") else ""
            return f"  No sessions found in the last {days} days{src}."

        lines = []
        o = report["overview"]
        days = report["days"]
        src_filter = report.get("source_filter")

        # Header
        lines.append("")
        lines.append("  ╔══════════════════════════════════════════════════════════╗")
        lines.append("  ║              📊  Hermes Insights                         ║")
        period_label = f"Last {days} days"
        if src_filter:
            period_label += f" ({src_filter})"
        padding = 58 - len(period_label) - 2
        left_pad = padding // 2
        right_pad = padding - left_pad
        lines.append(f"  ║{' ' * left_pad} {period_label} {' ' * right_pad}║")
        lines.append("  ╚══════════════════════════════════════════════════════════╝")
        lines.append("")

        # Date range
        if o.get("date_range_start") and o.get("date_range_end"):
            start_str = datetime.fromtimestamp(o["date_range_start"]).strftime("%b %d, %Y")
            end_str = datetime.fromtimestamp(o["date_range_end"]).strftime("%b %d, %Y")
            lines.append(f"  Period: {start_str} — {end_str}")
            lines.append("")

        # Overview
        lines.append("  📋 Overview")
        lines.append("  " + "─" * 56)
        lines.append(f"  Sessions:         {o['total_sessions']:<12} Messages: {o['total_messages']:,}")
        lines.append(f"  Tool calls:       {o['total_tool_calls']:<12,} User messages: {o['user_messages']:,}")
        lines.append(f"  Input tokens:     {o['total_input_tokens']:<12,} Output tokens: {o['total_output_tokens']:,}")
        lines.append(f"  Total tokens:     {o['total_tokens']:,}")
        if o["total_hours"] > 0:
            lines.append(f"  Active time:      ~{_format_duration(o['total_hours'] * 3600):<11} Avg session: ~{_format_duration(o['avg_session_duration'])}")
        lines.append(f"  Avg msgs/session: {o['avg_messages_per_session']:.1f}")
        lines.append("")

        # Model breakdown
        if report["models"]:
            lines.append("  🤖 Models Used")
            lines.append("  " + "─" * 56)
            lines.append(f"  {'Model':<30} {'Sessions':>8} {'Tokens':>12}")
            for m in report["models"]:
                model_name = m["model"][:28]
                lines.append(f"  {model_name:<30} {m['sessions']:>8} {m['total_tokens']:>12,}")
            lines.append("")

        # Platform breakdown
        if len(report["platforms"]) > 1 or (report["platforms"] and report["platforms"][0]["platform"] != "cli"):
            lines.append("  📱 Platforms")
            lines.append("  " + "─" * 56)
            lines.append(f"  {'Platform':<14} {'Sessions':>8} {'Messages':>10} {'Tokens':>14}")
            for p in report["platforms"]:
                lines.append(f"  {p['platform']:<14} {p['sessions']:>8} {p['messages']:>10,} {p['total_tokens']:>14,}")
            lines.append("")

        # Tool usage
        if report["tools"]:
            lines.append("  🔧 Top Tools")
            lines.append("  " + "─" * 56)
            lines.append(f"  {'Tool':<28} {'Calls':>8} {'%':>8}")
            for t in report["tools"][:15]:  # Top 15
                lines.append(f"  {t['tool']:<28} {t['count']:>8,} {t['percentage']:>7.1f}%")
            if len(report["tools"]) > 15:
                lines.append(f"  ... and {len(report['tools']) - 15} more tools")
            lines.append("")

        # Skill usage
        skills = report.get("skills", {})
        top_skills = skills.get("top_skills", [])
        if top_skills:
            lines.append("  🧠 Top Skills")
            lines.append("  " + "─" * 56)
            lines.append(f"  {'Skill':<28} {'Loads':>7} {'Edits':>7} {'Last used':>11}")
            for skill in top_skills[:10]:
                last_used = "—"
                if skill.get("last_used_at"):
                    last_used = datetime.fromtimestamp(skill["last_used_at"]).strftime("%b %d")
                lines.append(
                    f"  {skill['skill'][:28]:<28} {skill['view_count']:>7,} {skill['manage_count']:>7,} {last_used:>11}"
                )
            summary = skills.get("summary", {})
            lines.append(
                f"  Distinct skills: {summary.get('distinct_skills_used', 0)}   "
                f"Loads: {summary.get('total_skill_loads', 0):,}   "
                f"Edits: {summary.get('total_skill_edits', 0):,}"
            )
            lines.append("")

        # Activity patterns
        act = report.get("activity", {})
        if act.get("by_day"):
            lines.append("  📅 Activity Patterns")
            lines.append("  " + "─" * 56)

            # Day of week chart
            day_values = [d["count"] for d in act["by_day"]]
            bars = _bar_chart(day_values, max_width=15)
            for i, d in enumerate(act["by_day"]):
                bar = bars[i]
                lines.append(f"  {d['day']}  {bar:<15} {d['count']}")

            lines.append("")

            # Peak hours (show top 5 busiest hours)
            busy_hours = sorted(act["by_hour"], key=lambda x: x["count"], reverse=True)
            busy_hours = [h for h in busy_hours if h["count"] > 0][:5]
            if busy_hours:
                hour_strs = []
                for h in busy_hours:
                    hr = h["hour"]
                    ampm = "AM" if hr < 12 else "PM"
                    display_hr = hr % 12 or 12
                    hour_strs.append(f"{display_hr}{ampm} ({h['count']})")
                lines.append(f"  Peak hours: {', '.join(hour_strs)}")

            if act.get("active_days"):
                lines.append(f"  Active days: {act['active_days']}")
            if act.get("max_streak") and act["max_streak"] > 1:
                lines.append(f"  Best streak: {act['max_streak']} consecutive days")
            lines.append("")

        # Notable sessions
        if report.get("top_sessions"):
            lines.append("  🏆 Notable Sessions")
            lines.append("  " + "─" * 56)
            for ts in report["top_sessions"]:
                lines.append(f"  {ts['label']:<20} {ts['value']:<18} ({ts['date']}, {ts['session_id']})")
            lines.append("")

        return "\n".join(lines)

    def format_gateway(self, report: Dict) -> str:
        """Format the insights report for gateway/messaging (shorter)."""
        if report.get("empty"):
            days = report.get("days", 30)
            return f"No sessions found in the last {days} days."

        lines = []
        o = report["overview"]
        days = report["days"]

        lines.append(f"📊 **Hermes Insights** — Last {days} days\n")

        # Overview
        lines.append(f"**Sessions:** {o['total_sessions']} | **Messages:** {o['total_messages']:,} | **Tool calls:** {o['total_tool_calls']:,}")
        lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,})")
        if o["total_hours"] > 0:
            lines.append(f"**Active time:** ~{_format_duration(o['total_hours'] * 3600)} | **Avg session:** ~{_format_duration(o['avg_session_duration'])}")
        lines.append("")

        # Models (top 5)
        if report["models"]:
            lines.append("**🤖 Models:**")
            for m in report["models"][:5]:
                lines.append(f"  {m['model'][:25]} — {m['sessions']} sessions, {m['total_tokens']:,} tokens")
            lines.append("")

        # Platforms (if multi-platform)
        if len(report["platforms"]) > 1:
            lines.append("**📱 Platforms:**")
            for p in report["platforms"]:
                lines.append(f"  {p['platform']} — {p['sessions']} sessions, {p['messages']:,} msgs")
            lines.append("")

        # Tools (top 8)
        if report["tools"]:
            lines.append("**🔧 Top Tools:**")
            for t in report["tools"][:8]:
                lines.append(f"  {t['tool']} — {t['count']:,} calls ({t['percentage']:.1f}%)")
            lines.append("")

        skills = report.get("skills", {})
        if skills.get("top_skills"):
            lines.append("**🧠 Top Skills:**")
            for skill in skills["top_skills"][:5]:
                suffix = ""
                if skill.get("last_used_at"):
                    suffix = f", last used {datetime.fromtimestamp(skill['last_used_at']).strftime('%b %d')}"
                lines.append(
                    f"  {skill['skill']} — {skill['view_count']:,} loads, {skill['manage_count']:,} edits{suffix}"
                )
            lines.append("")

        # Activity summary
        act = report.get("activity", {})
        if act.get("busiest_day") and act.get("busiest_hour"):
            hr = act["busiest_hour"]["hour"]
            ampm = "AM" if hr < 12 else "PM"
            display_hr = hr % 12 or 12
            lines.append(f"**📅 Busiest:** {act['busiest_day']['day']}s ({act['busiest_day']['count']} sessions), {display_hr}{ampm} ({act['busiest_hour']['count']} sessions)")
        if act.get("active_days"):
            lines.append(f"**Active days:** {act['active_days']}")
        if act.get("max_streak", 0) > 1:
            lines.append(f"**Best streak:** {act['max_streak']} consecutive days")

        return "\n".join(lines)