session_search_tool.py
#!/usr/bin/env python3
"""
Session Search Tool - Long-Term Conversation Recall

Searches past session transcripts in SQLite via FTS5, then summarizes the top
matching sessions using the configured auxiliary session_search model (same
pattern as web_extract). By default, auxiliary "auto" routing uses the main
chat provider/model unless the user overrides auxiliary.session_search.
Returns focused summaries of past conversations rather than raw transcripts,
keeping the main model's context window clean.

Flow:
1. FTS5 search finds matching messages ranked by relevance
2. Groups by session, takes the top N unique sessions (default 3)
3. Loads each session's conversation, truncates to ~100k chars centered on matches
4. Sends to the configured auxiliary model with a focused summarization prompt
5. Returns per-session summaries with metadata
"""

import asyncio
import concurrent.futures
import json
import logging
import re
from datetime import datetime
from typing import Dict, Any, List, Optional, Union

from agent.auxiliary_client import async_call_llm, extract_content_or_reasoning

# Hard cap on transcript size handed to the summarizer (characters, not tokens).
MAX_SESSION_CHARS = 100_000
# Output budget for a single session summary.
MAX_SUMMARY_TOKENS = 10_000


def _get_session_search_max_concurrency(default: int = 3) -> int:
    """Read auxiliary.session_search.max_concurrency with sane bounds.

    Returns *default* when the config module is unavailable, the setting is
    missing, or the value is not coercible to int. The result is clamped
    to [1, 5] to bound parallel LLM calls.
    """
    try:
        from hermes_cli.config import load_config
        config = load_config()
    except ImportError:
        return default
    # Defensive: config layers may be absent or have unexpected types.
    aux = config.get("auxiliary", {}) if isinstance(config, dict) else {}
    task_config = aux.get("session_search", {}) if isinstance(aux, dict) else {}
    if not isinstance(task_config, dict):
        return default
    raw = task_config.get("max_concurrency")
    if raw is None:
        return default
    try:
        value = int(raw)
    except (TypeError, ValueError):
        return default
    return max(1, min(value, 5))


def _format_timestamp(ts: Union[int, float, str, None]) -> str:
    """Convert a Unix timestamp (int/float, or numeric string) to a
    human-readable date.

    Non-numeric strings (e.g. ISO dates) are returned verbatim. Returns
    "unknown" for None and str(ts) if conversion fails.
    """
    if ts is None:
        return "unknown"
    try:
        if isinstance(ts, (int, float)):
            return datetime.fromtimestamp(ts).strftime("%B %d, %Y at %I:%M %p")
        if isinstance(ts, str):
            # EAFP: attempt numeric-epoch parse instead of the previous
            # fragile digit-check (which rejected "1e9" and accepted "1-2-3").
            try:
                epoch = float(ts)
            except ValueError:
                return ts  # non-numeric string (e.g. ISO date) — pass through
            return datetime.fromtimestamp(epoch).strftime("%B %d, %Y at %I:%M %p")
        return str(ts)
    except (ValueError, OSError, OverflowError) as e:
        # Log specific errors for debugging while gracefully handling edge cases
        logging.debug("Failed to format timestamp %s: %s", ts, e, exc_info=True)
    except Exception as e:
        logging.debug("Unexpected error formatting timestamp %s: %s", ts, e, exc_info=True)
    return str(ts)


def _format_conversation(messages: List[Dict[str, Any]]) -> str:
    """Format session messages into a readable transcript for summarization.

    Tool outputs longer than 500 chars are truncated head+tail; assistant
    messages carrying tool_calls get a "[Called: ...]" line.
    """
    parts = []
    for msg in messages:
        role = msg.get("role", "unknown").upper()
        content = msg.get("content") or ""
        tool_name = msg.get("tool_name")

        if role == "TOOL" and tool_name:
            # Truncate long tool outputs
            if len(content) > 500:
                content = content[:250] + "\n...[truncated]...\n" + content[-250:]
            parts.append(f"[TOOL:{tool_name}]: {content}")
        elif role == "ASSISTANT":
            # Include tool call names if present
            tool_calls = msg.get("tool_calls")
            if tool_calls and isinstance(tool_calls, list):
                tc_names = []
                for tc in tool_calls:
                    if isinstance(tc, dict):
                        # Supports both flat {"name": ...} and OpenAI-style
                        # {"function": {"name": ...}} tool-call shapes.
                        name = tc.get("name") or tc.get("function", {}).get("name", "?")
                        tc_names.append(name)
                if tc_names:
                    parts.append(f"[ASSISTANT]: [Called: {', '.join(tc_names)}]")
                if content:
                    parts.append(f"[ASSISTANT]: {content}")
            else:
                parts.append(f"[ASSISTANT]: {content}")
        else:
            parts.append(f"[{role}]: {content}")

    return "\n\n".join(parts)


def _truncate_around_matches(
    full_text: str, query: str, max_chars: int = MAX_SESSION_CHARS
) -> str:
    """
    Truncate a conversation transcript to *max_chars*, choosing a window
    that maximises coverage of positions where the *query* actually appears.

    Strategy (in priority order):
    1. Try to find the full query as a phrase (case-insensitive).
    2. If no phrase hit, look for positions where all query terms appear
       within a 200-char proximity window (co-occurrence).
    3. Fall back to individual term positions.

    Once candidate positions are collected the function picks the window
    start that covers the most of them.
    """
    if len(full_text) <= max_chars:
        return full_text

    text_lower = full_text.lower()
    query_lower = query.lower().strip()
    match_positions: list[int] = []

    # --- 1. Full-phrase search ------------------------------------------------
    phrase_pat = re.compile(re.escape(query_lower))
    match_positions = [m.start() for m in phrase_pat.finditer(text_lower)]

    # --- 2. Proximity co-occurrence of all terms (within 200 chars) -----------
    if not match_positions:
        terms = query_lower.split()
        if len(terms) > 1:
            # Collect every occurrence of each term
            term_positions: dict[str, list[int]] = {}
            for t in terms:
                term_positions[t] = [
                    m.start() for m in re.finditer(re.escape(t), text_lower)
                ]
            # Slide through positions of the rarest term and check proximity;
            # iterating the rarest term keeps the candidate count minimal.
            rarest = min(terms, key=lambda t: len(term_positions.get(t, [])))
            for pos in term_positions.get(rarest, []):
                if all(
                    any(abs(p - pos) < 200 for p in term_positions.get(t, []))
                    for t in terms
                    if t != rarest
                ):
                    match_positions.append(pos)

    # --- 3. Individual term positions (last resort) ---------------------------
    if not match_positions:
        terms = query_lower.split()
        for t in terms:
            for m in re.finditer(re.escape(t), text_lower):
                match_positions.append(m.start())

    if not match_positions:
        # Nothing at all — take from the start
        truncated = full_text[:max_chars]
        suffix = "\n\n...[later conversation truncated]..." if max_chars < len(full_text) else ""
        return truncated + suffix

    # --- Pick window that covers the most match positions ---------------------
    match_positions.sort()

    best_start = 0
    best_count = 0
    for candidate in match_positions:
        ws = max(0, candidate - max_chars // 4)  # bias: 25% before, 75% after
        we = ws + max_chars
        if we > len(full_text):
            ws = max(0, len(full_text) - max_chars)
            we = len(full_text)
        count = sum(1 for p in match_positions if ws <= p < we)
        if count > best_count:
            best_count = count
            best_start = ws

    start = best_start
    end = min(len(full_text), start + max_chars)

    truncated = full_text[start:end]
    prefix = "...[earlier conversation truncated]...\n\n" if start > 0 else ""
    suffix = "\n\n...[later conversation truncated]..." if end < len(full_text) else ""
    return prefix + truncated + suffix


async def _summarize_session(
    conversation_text: str, query: str, session_meta: Dict[str, Any]
) -> Optional[str]:
    """Summarize a single session conversation focused on the search query.

    Retries up to 3 times with linear backoff on transient failures or
    empty responses. Returns the summary text, or None when the auxiliary
    model is unavailable or all attempts fail.
    """
    system_prompt = (
        "You are reviewing a past conversation transcript to help recall what happened. "
        "Summarize the conversation with a focus on the search topic. Include:\n"
        "1. What the user asked about or wanted to accomplish\n"
        "2. What actions were taken and what the outcomes were\n"
        "3. Key decisions, solutions found, or conclusions reached\n"
        "4. Any specific commands, files, URLs, or technical details that were important\n"
        "5. Anything left unresolved or notable\n\n"
        "Be thorough but concise. Preserve specific details (commands, paths, error messages) "
        "that would be useful to recall. Write in past tense as a factual recap."
    )

    source = session_meta.get("source", "unknown")
    started = _format_timestamp(session_meta.get("started_at"))

    user_prompt = (
        f"Search topic: {query}\n"
        f"Session source: {source}\n"
        f"Session date: {started}\n\n"
        f"CONVERSATION TRANSCRIPT:\n{conversation_text}\n\n"
        f"Summarize this conversation with focus on: {query}"
    )

    max_retries = 3
    for attempt in range(max_retries):
        try:
            response = await async_call_llm(
                task="session_search",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=0.1,
                max_tokens=MAX_SUMMARY_TOKENS,
            )
            content = extract_content_or_reasoning(response)
            if content:
                return content
            # Reasoning-only / empty — let the retry loop handle it
            logging.warning("Session search LLM returned empty content (attempt %d/%d)", attempt + 1, max_retries)
            if attempt < max_retries - 1:
                await asyncio.sleep(1 * (attempt + 1))
                continue
            # Normalize exhausted-retry empty content to None so the declared
            # Optional[str] contract holds ("" was previously possible).
            return content or None
        except RuntimeError:
            # Raised when no auxiliary model is configured — not retryable.
            logging.warning("No auxiliary model available for session summarization")
            return None
        except Exception as e:
            if attempt < max_retries - 1:
                await asyncio.sleep(1 * (attempt + 1))
            else:
                logging.warning(
                    "Session summarization failed after %d attempts: %s",
                    max_retries,
                    e,
                    exc_info=True,
                )
    return None


# Sources that are excluded from session browsing/searching by default.
# Third-party integrations (Paperclip agents, etc.) tag their sessions with
# HERMES_SESSION_SOURCE=tool so they don't clutter the user's session history.
_HIDDEN_SESSION_SOURCES = ("tool",)


def _list_recent_sessions(db, limit: int, current_session_id: Optional[str] = None) -> str:
    """Return metadata for the most recent sessions (no LLM calls).

    Walks the current session's parent chain so the active conversation
    (and its lineage root) is excluded, and skips child/delegation
    sessions. Returns a JSON string with a "recent" mode payload, or a
    tool_error JSON string on failure.
    """
    try:
        sessions = db.list_sessions_rich(
            limit=limit + 5,
            exclude_sources=list(_HIDDEN_SESSION_SOURCES),
            order_by_last_active=True,
        )  # fetch extra to skip current

        # Resolve current session lineage to exclude it
        current_root = None
        if current_session_id:
            try:
                sid = current_session_id
                visited = set()  # guards against parent-pointer cycles
                current_root = current_session_id
                while sid and sid not in visited:
                    visited.add(sid)
                    current_root = sid
                    s = db.get_session(sid)
                    parent = s.get("parent_session_id") if s else None
                    sid = parent if parent else None
            except Exception:
                # Best-effort: fall back to excluding just the current id.
                current_root = current_session_id

        results = []
        for s in sessions:
            sid = s.get("id", "")
            if current_root and (sid == current_root or sid == current_session_id):
                continue
            # Skip child/delegation sessions (they have parent_session_id)
            if s.get("parent_session_id"):
                continue
            results.append({
                "session_id": sid,
                "title": s.get("title") or None,
                "source": s.get("source", ""),
                "started_at": s.get("started_at", ""),
                "last_active": s.get("last_active", ""),
                "message_count": s.get("message_count", 0),
                "preview": s.get("preview", ""),
            })
            if len(results) >= limit:
                break

        return json.dumps({
            "success": True,
            "mode": "recent",
            "results": results,
            "count": len(results),
            "message": f"Showing {len(results)} most recent sessions. Use a keyword query to search specific topics.",
        }, ensure_ascii=False)
    except Exception as e:
        logging.error("Error listing recent sessions: %s", e, exc_info=True)
        return tool_error(f"Failed to list recent sessions: {e}", success=False)


def session_search(
    query: str,
    role_filter: Optional[str] = None,
    limit: int = 3,
    db=None,
    current_session_id: Optional[str] = None,
) -> str:
    """
    Search past sessions and return focused summaries of matching conversations.

    Uses FTS5 to find matches, then summarizes the top sessions with the
    configured auxiliary session_search model.
    The current session is excluded from results since the agent already has that context.

    Args:
        query: FTS5 search expression; empty/blank switches to recent-sessions mode.
        role_filter: Optional comma-separated roles (e.g. "user,assistant").
        limit: Max sessions to summarize; coerced to int and clamped to [1, 5].
        db: Session store providing search_messages / get_session /
            get_messages_as_conversation / list_sessions_rich.
        current_session_id: Active session id, excluded (with its lineage) from results.

    Returns:
        JSON string with either results or an error payload.
    """
    if db is None:
        return tool_error("Session database not available.", success=False)

    # Defensive: models (especially open-source) may send non-int limit values
    # (None when JSON null, string "int", or even a type object). Coerce to a
    # safe integer before any arithmetic/comparison to prevent TypeError.
    if not isinstance(limit, int):
        try:
            limit = int(limit)
        except (TypeError, ValueError):
            limit = 3
    limit = max(1, min(limit, 5))  # Clamp to [1, 5]

    # Recent sessions mode: when query is empty, return metadata for recent sessions.
    # No LLM calls — just DB queries for titles, previews, timestamps.
    if not query or not query.strip():
        return _list_recent_sessions(db, limit, current_session_id)

    query = query.strip()

    try:
        # Parse role filter
        role_list = None
        if role_filter and role_filter.strip():
            role_list = [r.strip() for r in role_filter.split(",") if r.strip()]

        # FTS5 search -- get matches ranked by relevance
        raw_results = db.search_messages(
            query=query,
            role_filter=role_list,
            exclude_sources=list(_HIDDEN_SESSION_SOURCES),
            limit=50,  # Get more matches to find unique sessions
            offset=0,
        )

        if not raw_results:
            return json.dumps({
                "success": True,
                "query": query,
                "results": [],
                "count": 0,
                "message": "No matching sessions found.",
            }, ensure_ascii=False)

        # Resolve child sessions to their parent — delegation stores detailed
        # content in child sessions, but the user's conversation is the parent.
        def _resolve_to_parent(session_id: str) -> str:
            """Walk delegation chain to find the root parent session ID."""
            visited = set()  # cycle guard for malformed parent chains
            sid = session_id
            while sid and sid not in visited:
                visited.add(sid)
                try:
                    session = db.get_session(sid)
                    if not session:
                        break
                    parent = session.get("parent_session_id")
                    if parent:
                        sid = parent
                    else:
                        break
                except Exception as e:
                    logging.debug(
                        "Error resolving parent for session %s: %s",
                        sid,
                        e,
                        exc_info=True,
                    )
                    break
            return sid

        current_lineage_root = (
            _resolve_to_parent(current_session_id) if current_session_id else None
        )

        # Group by resolved (parent) session_id, dedup, skip the current
        # session lineage. Compression and delegation create child sessions
        # that still belong to the same active conversation.
        seen_sessions = {}
        for result in raw_results:
            raw_sid = result["session_id"]
            resolved_sid = _resolve_to_parent(raw_sid)
            # Skip the current session lineage — the agent already has that
            # context, even if older turns live in parent fragments.
            if current_lineage_root and resolved_sid == current_lineage_root:
                continue
            if current_session_id and raw_sid == current_session_id:
                continue
            if resolved_sid not in seen_sessions:
                # Copy before mutating — search_messages rows may be shared/reused.
                result = dict(result)
                result["session_id"] = resolved_sid
                seen_sessions[resolved_sid] = result
            if len(seen_sessions) >= limit:
                break

        # Prepare all sessions for parallel summarization
        tasks = []
        for session_id, match_info in seen_sessions.items():
            try:
                messages = db.get_messages_as_conversation(session_id)
                if not messages:
                    continue
                session_meta = db.get_session(session_id) or {}
                conversation_text = _format_conversation(messages)
                conversation_text = _truncate_around_matches(conversation_text, query)
                tasks.append((session_id, match_info, conversation_text, session_meta))
            except Exception as e:
                logging.warning(
                    "Failed to prepare session %s: %s",
                    session_id,
                    e,
                    exc_info=True,
                )

        # Summarize all sessions in parallel
        async def _summarize_all() -> List[Union[str, Exception]]:
            """Summarize all sessions with bounded concurrency."""
            max_concurrency = min(_get_session_search_max_concurrency(), max(1, len(tasks)))
            semaphore = asyncio.Semaphore(max_concurrency)

            async def _bounded_summary(text: str, meta: Dict[str, Any]) -> Optional[str]:
                async with semaphore:
                    return await _summarize_session(text, query, meta)

            coros = [
                _bounded_summary(text, meta)
                for _, _, text, meta in tasks
            ]
            # return_exceptions=True: one failed summary must not sink the rest.
            return await asyncio.gather(*coros, return_exceptions=True)

        try:
            # Use _run_async() which properly manages event loops across
            # CLI, gateway, and worker-thread contexts. The previous
            # pattern (asyncio.run() in a ThreadPoolExecutor) created a
            # disposable event loop that conflicted with cached
            # AsyncOpenAI/httpx clients bound to a different loop,
            # causing deadlocks in gateway mode (#2681).
            from model_tools import _run_async
            results = _run_async(_summarize_all())
        except concurrent.futures.TimeoutError:
            # NOTE(review): presumably _run_async enforces a 60s deadline via
            # a concurrent.futures future — confirm against model_tools.
            logging.warning(
                "Session summarization timed out after 60 seconds",
                exc_info=True,
            )
            return json.dumps({
                "success": False,
                "error": "Session summarization timed out. Try a more specific query or reduce the limit.",
            }, ensure_ascii=False)

        summaries = []
        for (session_id, match_info, conversation_text, session_meta), result in zip(tasks, results):
            if isinstance(result, Exception):
                logging.warning(
                    "Failed to summarize session %s: %s",
                    session_id, result, exc_info=True,
                )
                result = None

            # Prefer resolved parent session metadata over FTS5 match metadata.
            # match_info carries source/model from the *child* session that contained
            # the FTS5 hit; after _resolve_to_parent() the session_id points to the
            # root, so session_meta has the authoritative platform/source for the
            # session the user actually cares about (#15909).
            entry = {
                "session_id": session_id,
                "when": _format_timestamp(
                    session_meta.get("started_at") or match_info.get("session_started")
                ),
                "source": session_meta.get("source") or match_info.get("source", "unknown"),
                "model": session_meta.get("model") or match_info.get("model"),
            }

            if result:
                entry["summary"] = result
            else:
                # Fallback: raw preview so matched sessions aren't silently
                # dropped when the summarizer is unavailable (fixes #3409).
                preview = (conversation_text[:500] + "\n…[truncated]") if conversation_text else "No preview available."
                entry["summary"] = f"[Raw preview — summarization unavailable]\n{preview}"

            summaries.append(entry)

        return json.dumps({
            "success": True,
            "query": query,
            "results": summaries,
            "count": len(summaries),
            "sessions_searched": len(seen_sessions),
        }, ensure_ascii=False)

    except Exception as e:
        logging.error("Session search failed: %s", e, exc_info=True)
        return tool_error(f"Search failed: {str(e)}", success=False)


def check_session_search_requirements() -> bool:
    """Tool is available when the hermes_state SQLite DB directory exists.

    Only the database path is checked here; the auxiliary model is resolved
    lazily at call time, and summarization degrades to raw previews when
    it's unavailable.
    """
    try:
        from hermes_state import DEFAULT_DB_PATH
        return DEFAULT_DB_PATH.parent.exists()
    except ImportError:
        return False


# Tool schema exposed to the model: documents the two modes (recent-browse
# vs keyword search) and FTS5 query syntax. All parameters are optional.
SESSION_SEARCH_SCHEMA = {
    "name": "session_search",
    "description": (
        "Search your long-term memory of past conversations, or browse recent sessions. This is your recall -- "
        "every past session is searchable, and this tool summarizes what happened.\n\n"
        "TWO MODES:\n"
        "1. Recent sessions (no query): Call with no arguments to see what was worked on recently. "
        "Returns titles, previews, and timestamps. Zero LLM cost, instant. "
        "Start here when the user asks what were we working on or what did we do recently.\n"
        "2. Keyword search (with query): Search for specific topics across all past sessions. "
        "Returns LLM-generated summaries of matching sessions.\n\n"
        "USE THIS PROACTIVELY when:\n"
        "- The user says 'we did this before', 'remember when', 'last time', 'as I mentioned'\n"
        "- The user asks about a topic you worked on before but don't have in current context\n"
        "- The user references a project, person, or concept that seems familiar but isn't in memory\n"
        "- You want to check if you've solved a similar problem before\n"
        "- The user asks 'what did we do about X?' or 'how did we fix Y?'\n\n"
        "Don't hesitate to search when it is actually cross-session -- it's fast and cheap. "
        "Better to search and confirm than to guess or ask the user to repeat themselves.\n\n"
        "Search syntax: keywords joined with OR for broad recall (elevenlabs OR baseten OR funding), "
        "phrases for exact match (\"docker networking\"), boolean (python NOT java), prefix (deploy*). "
        "IMPORTANT: Use OR between keywords for best results — FTS5 defaults to AND which misses "
        "sessions that only mention some terms. If a broad OR query returns nothing, try individual "
        "keyword searches in parallel. Returns summaries of the top matching sessions."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Search query — keywords, phrases, or boolean expressions to find in past sessions. Omit this parameter entirely to browse recent sessions instead (returns titles, previews, timestamps with no LLM cost).",
            },
            "role_filter": {
                "type": "string",
                "description": "Optional: only search messages from specific roles (comma-separated). E.g. 'user,assistant' to skip tool outputs.",
            },
            "limit": {
                "type": "integer",
                "description": "Max sessions to summarize (default: 3, max: 5).",
                "default": 3,
            },
        },
        "required": [],
    },
}


# --- Registry ---
# Imported late deliberately: tool_error is referenced by the functions above
# only at call time, so the bottom-of-file import is safe.
from tools.registry import registry, tool_error

registry.register(
    name="session_search",
    toolset="session_search",
    schema=SESSION_SEARCH_SCHEMA,
    handler=lambda args, **kw: session_search(
        query=args.get("query") or "",
        role_filter=args.get("role_filter"),
        limit=args.get("limit", 3),
        db=kw.get("db"),
        current_session_id=kw.get("current_session_id")),
    check_fn=check_session_search_requirements,
    emoji="🔍",
)