/ tools / session_search_tool.py
session_search_tool.py
  1  #!/usr/bin/env python3
  2  """
  3  Session Search Tool - Long-Term Conversation Recall
  4  
  5  Searches past session transcripts in SQLite via FTS5, then summarizes the top
  6  matching sessions using the configured auxiliary session_search model (same
  7  pattern as web_extract). By default, auxiliary "auto" routing uses the main
  8  chat provider/model unless the user overrides auxiliary.session_search.
  9  Returns focused summaries of past conversations rather than raw transcripts,
 10  keeping the main model's context window clean.
 11  
 12  Flow:
 13    1. FTS5 search finds matching messages ranked by relevance
 14    2. Groups by session, takes the top N unique sessions (default 3)
 15    3. Loads each session's conversation, truncates to ~100k chars centered on matches
 16    4. Sends to the configured auxiliary model with a focused summarization prompt
 17    5. Returns per-session summaries with metadata
 18  """
 19  
 20  import asyncio
 21  import concurrent.futures
 22  import json
 23  import logging
 24  import re
 25  from typing import Dict, Any, List, Optional, Union
 26  
 27  from agent.auxiliary_client import async_call_llm, extract_content_or_reasoning
# Summarization budget constants.
MAX_SESSION_CHARS = 100_000  # transcript chars fed to the summarizer per session (default for _truncate_around_matches)
MAX_SUMMARY_TOKENS = 10000  # max_tokens passed to the auxiliary LLM for each per-session summary
 30  
 31  
 32  def _get_session_search_max_concurrency(default: int = 3) -> int:
 33      """Read auxiliary.session_search.max_concurrency with sane bounds."""
 34      try:
 35          from hermes_cli.config import load_config
 36          config = load_config()
 37      except ImportError:
 38          return default
 39      aux = config.get("auxiliary", {}) if isinstance(config, dict) else {}
 40      task_config = aux.get("session_search", {}) if isinstance(aux, dict) else {}
 41      if not isinstance(task_config, dict):
 42          return default
 43      raw = task_config.get("max_concurrency")
 44      if raw is None:
 45          return default
 46      try:
 47          value = int(raw)
 48      except (TypeError, ValueError):
 49          return default
 50      return max(1, min(value, 5))
 51  
 52  
 53  def _format_timestamp(ts: Union[int, float, str, None]) -> str:
 54      """Convert a Unix timestamp (float/int) or ISO string to a human-readable date.
 55  
 56      Returns "unknown" for None, str(ts) if conversion fails.
 57      """
 58      if ts is None:
 59          return "unknown"
 60      try:
 61          if isinstance(ts, (int, float)):
 62              from datetime import datetime
 63              dt = datetime.fromtimestamp(ts)
 64              return dt.strftime("%B %d, %Y at %I:%M %p")
 65          if isinstance(ts, str):
 66              if ts.replace(".", "").replace("-", "").isdigit():
 67                  from datetime import datetime
 68                  dt = datetime.fromtimestamp(float(ts))
 69                  return dt.strftime("%B %d, %Y at %I:%M %p")
 70              return ts
 71      except (ValueError, OSError, OverflowError) as e:
 72          # Log specific errors for debugging while gracefully handling edge cases
 73          logging.debug("Failed to format timestamp %s: %s", ts, e, exc_info=True)
 74      except Exception as e:
 75          logging.debug("Unexpected error formatting timestamp %s: %s", ts, e, exc_info=True)
 76      return str(ts)
 77  
 78  
 79  def _format_conversation(messages: List[Dict[str, Any]]) -> str:
 80      """Format session messages into a readable transcript for summarization."""
 81      parts = []
 82      for msg in messages:
 83          role = msg.get("role", "unknown").upper()
 84          content = msg.get("content") or ""
 85          tool_name = msg.get("tool_name")
 86  
 87          if role == "TOOL" and tool_name:
 88              # Truncate long tool outputs
 89              if len(content) > 500:
 90                  content = content[:250] + "\n...[truncated]...\n" + content[-250:]
 91              parts.append(f"[TOOL:{tool_name}]: {content}")
 92          elif role == "ASSISTANT":
 93              # Include tool call names if present
 94              tool_calls = msg.get("tool_calls")
 95              if tool_calls and isinstance(tool_calls, list):
 96                  tc_names = []
 97                  for tc in tool_calls:
 98                      if isinstance(tc, dict):
 99                          name = tc.get("name") or tc.get("function", {}).get("name", "?")
100                          tc_names.append(name)
101                  if tc_names:
102                      parts.append(f"[ASSISTANT]: [Called: {', '.join(tc_names)}]")
103                  if content:
104                      parts.append(f"[ASSISTANT]: {content}")
105              else:
106                  parts.append(f"[ASSISTANT]: {content}")
107          else:
108              parts.append(f"[{role}]: {content}")
109  
110      return "\n\n".join(parts)
111  
112  
def _truncate_around_matches(
    full_text: str, query: str, max_chars: int = MAX_SESSION_CHARS
) -> str:
    """
    Cut *full_text* down to at most *max_chars* characters, positioning the
    window so it covers as many query-match locations as possible.

    Candidate positions are gathered in priority order:
      1. exact phrase hits (case-insensitive),
      2. spots where every query term co-occurs within a 200-char radius,
      3. any individual term occurrence.
    The window whose span contains the most candidates wins; truncation
    markers record whether earlier/later conversation was dropped.
    """
    if len(full_text) <= max_chars:
        return full_text

    haystack = full_text.lower()
    needle = query.lower().strip()

    # Priority 1: the whole query as a literal phrase.
    hits = [m.start() for m in re.finditer(re.escape(needle), haystack)]

    # Priority 2: all terms clustered within a 200-char neighbourhood.
    if not hits:
        words = needle.split()
        if len(words) > 1:
            occurrences = {
                w: [m.start() for m in re.finditer(re.escape(w), haystack)]
                for w in words
            }
            # Anchor the proximity scan on the least frequent term.
            anchor = min(words, key=lambda w: len(occurrences.get(w, [])))
            for spot in occurrences.get(anchor, []):
                near_all = all(
                    any(abs(p - spot) < 200 for p in occurrences.get(w, []))
                    for w in words
                    if w != anchor
                )
                if near_all:
                    hits.append(spot)

    # Priority 3: every occurrence of every term, as a last resort.
    if not hits:
        for w in needle.split():
            hits.extend(m.start() for m in re.finditer(re.escape(w), haystack))

    if not hits:
        # No signal anywhere — keep the beginning of the conversation.
        # (We already know the text exceeds max_chars here.)
        return full_text[:max_chars] + "\n\n...[later conversation truncated]..."

    hits.sort()

    # Pick the window start covering the most hits. Each candidate window is
    # biased so ~25% of it precedes the anchoring hit and ~75% follows it.
    best_start, best_covered = 0, 0
    for anchor_pos in hits:
        win_start = max(0, anchor_pos - max_chars // 4)
        win_end = win_start + max_chars
        if win_end > len(full_text):
            win_start = max(0, len(full_text) - max_chars)
            win_end = len(full_text)
        covered = sum(1 for p in hits if win_start <= p < win_end)
        if covered > best_covered:
            best_covered, best_start = covered, win_start

    clip_end = min(len(full_text), best_start + max_chars)
    head = "...[earlier conversation truncated]...\n\n" if best_start > 0 else ""
    tail = "\n\n...[later conversation truncated]..." if clip_end < len(full_text) else ""
    return head + full_text[best_start:clip_end] + tail
196  
197  
async def _summarize_session(
    conversation_text: str, query: str, session_meta: Dict[str, Any]
) -> Optional[str]:
    """Produce a query-focused summary of one past session via the auxiliary LLM.

    Retries up to 3 times with linear backoff; returns None when no auxiliary
    model is available or all attempts fail.
    """
    system_prompt = (
        "You are reviewing a past conversation transcript to help recall what happened. "
        "Summarize the conversation with a focus on the search topic. Include:\n"
        "1. What the user asked about or wanted to accomplish\n"
        "2. What actions were taken and what the outcomes were\n"
        "3. Key decisions, solutions found, or conclusions reached\n"
        "4. Any specific commands, files, URLs, or technical details that were important\n"
        "5. Anything left unresolved or notable\n\n"
        "Be thorough but concise. Preserve specific details (commands, paths, error messages) "
        "that would be useful to recall. Write in past tense as a factual recap."
    )

    origin = session_meta.get("source", "unknown")
    when = _format_timestamp(session_meta.get("started_at"))

    user_prompt = (
        f"Search topic: {query}\n"
        f"Session source: {origin}\n"
        f"Session date: {when}\n\n"
        f"CONVERSATION TRANSCRIPT:\n{conversation_text}\n\n"
        f"Summarize this conversation with focus on: {query}"
    )

    attempts = 3
    for try_idx in range(attempts):
        last_try = try_idx == attempts - 1
        try:
            reply = await async_call_llm(
                task="session_search",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=0.1,
                max_tokens=MAX_SUMMARY_TOKENS,
            )
            text = extract_content_or_reasoning(reply)
            if text:
                return text
            # Reasoning-only / empty response — retry with linear backoff.
            logging.warning("Session search LLM returned empty content (attempt %d/%d)", try_idx + 1, attempts)
            if last_try:
                return text
            await asyncio.sleep(try_idx + 1)
        except RuntimeError:
            # Raised when no auxiliary model is configured — not retryable.
            logging.warning("No auxiliary model available for session summarization")
            return None
        except Exception as exc:
            if last_try:
                logging.warning(
                    "Session summarization failed after %d attempts: %s",
                    attempts,
                    exc,
                    exc_info=True,
                )
                return None
            await asyncio.sleep(try_idx + 1)
260  
261  
# Sources that are excluded from session browsing/searching by default.
# Third-party integrations (Paperclip agents, etc.) tag their sessions with
# HERMES_SESSION_SOURCE=tool so they don't clutter the user's session history.
_HIDDEN_SESSION_SOURCES: tuple[str, ...] = ("tool",)
266  
267  
def _list_recent_sessions(db, limit: int, current_session_id: str = None) -> str:
    """Return JSON metadata for the most recent sessions (no LLM calls)."""
    try:
        # Over-fetch so skipping the current session still fills the page.
        candidates = db.list_sessions_rich(
            limit=limit + 5,
            exclude_sources=list(_HIDDEN_SESSION_SOURCES),
            order_by_last_active=True,
        )

        # Walk the current session's parent chain so the whole lineage can
        # be excluded below. A visited-set guards against parent cycles.
        lineage_root = None
        if current_session_id:
            lineage_root = current_session_id
            try:
                cursor = current_session_id
                seen = set()
                while cursor and cursor not in seen:
                    seen.add(cursor)
                    lineage_root = cursor
                    row = db.get_session(cursor)
                    cursor = (row or {}).get("parent_session_id") or None
            except Exception:
                lineage_root = current_session_id

        shown = []
        for row in candidates:
            sid = row.get("id", "")
            if lineage_root and (sid == lineage_root or sid == current_session_id):
                continue
            # Child/delegation sessions aren't user-facing conversations.
            if row.get("parent_session_id"):
                continue
            shown.append({
                "session_id": sid,
                "title": row.get("title") or None,
                "source": row.get("source", ""),
                "started_at": row.get("started_at", ""),
                "last_active": row.get("last_active", ""),
                "message_count": row.get("message_count", 0),
                "preview": row.get("preview", ""),
            })
            if len(shown) >= limit:
                break

        return json.dumps({
            "success": True,
            "mode": "recent",
            "results": shown,
            "count": len(shown),
            "message": f"Showing {len(shown)} most recent sessions. Use a keyword query to search specific topics.",
        }, ensure_ascii=False)
    except Exception as e:
        logging.error("Error listing recent sessions: %s", e, exc_info=True)
        return tool_error(f"Failed to list recent sessions: {e}", success=False)
323  
324  
def session_search(
    query: str,
    role_filter: Optional[str] = None,
    limit: int = 3,
    db=None,
    current_session_id: Optional[str] = None,
) -> str:
    """
    Search past sessions and return focused summaries of matching conversations.

    Uses FTS5 to find matches, then summarizes the top sessions with the
    configured auxiliary session_search model.
    The current session is excluded from results since the agent already has that context.

    Args:
        query: FTS5 query string. Empty/blank switches to "recent sessions"
            mode (metadata only, no LLM calls).
        role_filter: Optional comma-separated roles (e.g. "user,assistant")
            restricting which messages are searched.
        limit: Max sessions to summarize; coerced to int and clamped to [1, 5].
        db: Session database handle; must provide search_messages,
            get_session, get_messages_as_conversation, list_sessions_rich.
        current_session_id: ID of the active session; its entire delegation
            lineage is excluded from results.

    Returns:
        JSON string: on success, {"success": True, "query", "results",
        "count", "sessions_searched"}; otherwise a tool_error payload.
    """
    if db is None:
        return tool_error("Session database not available.", success=False)

    # Defensive: models (especially open-source) may send non-int limit values
    # (None when JSON null, string "int", or even a type object).  Coerce to a
    # safe integer before any arithmetic/comparison to prevent TypeError.
    if not isinstance(limit, int):
        try:
            limit = int(limit)
        except (TypeError, ValueError):
            limit = 3
    limit = max(1, min(limit, 5))  # Clamp to [1, 5]

    # Recent sessions mode: when query is empty, return metadata for recent sessions.
    # No LLM calls — just DB queries for titles, previews, timestamps.
    if not query or not query.strip():
        return _list_recent_sessions(db, limit, current_session_id)

    query = query.strip()

    try:
        # Parse role filter (comma-separated string -> list of role names).
        role_list = None
        if role_filter and role_filter.strip():
            role_list = [r.strip() for r in role_filter.split(",") if r.strip()]

        # FTS5 search -- get matches ranked by relevance
        raw_results = db.search_messages(
            query=query,
            role_filter=role_list,
            exclude_sources=list(_HIDDEN_SESSION_SOURCES),
            limit=50,  # Get more matches to find unique sessions
            offset=0,
        )

        if not raw_results:
            return json.dumps({
                "success": True,
                "query": query,
                "results": [],
                "count": 0,
                "message": "No matching sessions found.",
            }, ensure_ascii=False)

        # Resolve child sessions to their parent — delegation stores detailed
        # content in child sessions, but the user's conversation is the parent.
        def _resolve_to_parent(session_id: str) -> str:
            """Walk delegation chain to find the root parent session ID.

            The visited set guards against parent-pointer cycles; DB errors
            stop the walk and return the last ID reached.
            """
            visited = set()
            sid = session_id
            while sid and sid not in visited:
                visited.add(sid)
                try:
                    session = db.get_session(sid)
                    if not session:
                        break
                    parent = session.get("parent_session_id")
                    if parent:
                        sid = parent
                    else:
                        break
                except Exception as e:
                    logging.debug(
                        "Error resolving parent for session %s: %s",
                        sid,
                        e,
                        exc_info=True,
                    )
                    break
            return sid

        current_lineage_root = (
            _resolve_to_parent(current_session_id) if current_session_id else None
        )

        # Group by resolved (parent) session_id, dedup, skip the current
        # session lineage. Compression and delegation create child sessions
        # that still belong to the same active conversation.
        seen_sessions = {}
        for result in raw_results:
            raw_sid = result["session_id"]
            resolved_sid = _resolve_to_parent(raw_sid)
            # Skip the current session lineage — the agent already has that
            # context, even if older turns live in parent fragments.
            if current_lineage_root and resolved_sid == current_lineage_root:
                continue
            if current_session_id and raw_sid == current_session_id:
                continue
            if resolved_sid not in seen_sessions:
                # Copy before mutating: raw_results rows may be shared/reused.
                result = dict(result)
                result["session_id"] = resolved_sid
                seen_sessions[resolved_sid] = result
            if len(seen_sessions) >= limit:
                break

        # Prepare all sessions for parallel summarization.
        # Each task tuple: (session_id, FTS5 match row, truncated transcript, session metadata).
        tasks = []
        for session_id, match_info in seen_sessions.items():
            try:
                messages = db.get_messages_as_conversation(session_id)
                if not messages:
                    continue
                session_meta = db.get_session(session_id) or {}
                conversation_text = _format_conversation(messages)
                conversation_text = _truncate_around_matches(conversation_text, query)
                tasks.append((session_id, match_info, conversation_text, session_meta))
            except Exception as e:
                # Best-effort: a session that fails to load is skipped, not fatal.
                logging.warning(
                    "Failed to prepare session %s: %s",
                    session_id,
                    e,
                    exc_info=True,
                )

        # Summarize all sessions in parallel
        async def _summarize_all() -> List[Union[str, Exception]]:
            """Summarize all sessions with bounded concurrency."""
            max_concurrency = min(_get_session_search_max_concurrency(), max(1, len(tasks)))
            semaphore = asyncio.Semaphore(max_concurrency)

            async def _bounded_summary(text: str, meta: Dict[str, Any]) -> Optional[str]:
                async with semaphore:
                    return await _summarize_session(text, query, meta)

            coros = [
                _bounded_summary(text, meta)
                for _, _, text, meta in tasks
            ]
            return await asyncio.gather(*coros, return_exceptions=True)

        try:
            # Use _run_async() which properly manages event loops across
            # CLI, gateway, and worker-thread contexts.  The previous
            # pattern (asyncio.run() in a ThreadPoolExecutor) created a
            # disposable event loop that conflicted with cached
            # AsyncOpenAI/httpx clients bound to a different loop,
            # causing deadlocks in gateway mode (#2681).
            from model_tools import _run_async
            results = _run_async(_summarize_all())
        except concurrent.futures.TimeoutError:
            # NOTE(review): this handler looks vestigial from the old
            # ThreadPoolExecutor pattern — nothing visible here documents
            # _run_async raising concurrent.futures.TimeoutError. Confirm
            # against model_tools._run_async before relying on it.
            logging.warning(
                "Session summarization timed out after 60 seconds",
                exc_info=True,
            )
            return json.dumps({
                "success": False,
                "error": "Session summarization timed out. Try a more specific query or reduce the limit.",
            }, ensure_ascii=False)

        summaries = []
        # results align 1:1 with tasks (asyncio.gather preserves order);
        # gather(return_exceptions=True) yields Exception objects in place of results.
        for (session_id, match_info, conversation_text, session_meta), result in zip(tasks, results):
            if isinstance(result, Exception):
                logging.warning(
                    "Failed to summarize session %s: %s",
                    session_id, result, exc_info=True,
                )
                result = None

            # Prefer resolved parent session metadata over FTS5 match metadata.
            # match_info carries source/model from the *child* session that contained
            # the FTS5 hit; after _resolve_to_parent() the session_id points to the
            # root, so session_meta has the authoritative platform/source for the
            # session the user actually cares about (#15909).
            entry = {
                "session_id": session_id,
                "when": _format_timestamp(
                    session_meta.get("started_at") or match_info.get("session_started")
                ),
                "source": session_meta.get("source") or match_info.get("source", "unknown"),
                "model": session_meta.get("model") or match_info.get("model"),
            }

            if result:
                entry["summary"] = result
            else:
                # Fallback: raw preview so matched sessions aren't silently
                # dropped when the summarizer is unavailable (fixes #3409).
                preview = (conversation_text[:500] + "\n…[truncated]") if conversation_text else "No preview available."
                entry["summary"] = f"[Raw preview — summarization unavailable]\n{preview}"

            summaries.append(entry)

        return json.dumps({
            "success": True,
            "query": query,
            "results": summaries,
            "count": len(summaries),
            "sessions_searched": len(seen_sessions),
        }, ensure_ascii=False)

    except Exception as e:
        logging.error("Session search failed: %s", e, exc_info=True)
        return tool_error(f"Search failed: {str(e)}", success=False)
532  
533  
def check_session_search_requirements() -> bool:
    """Requires SQLite state database and an auxiliary text model."""
    try:
        from hermes_state import DEFAULT_DB_PATH
    except ImportError:
        # hermes_state not installed — the tool cannot function.
        return False
    return DEFAULT_DB_PATH.parent.exists()
541  
542  
# OpenAI-style function schema advertised to the model for this tool.
# Registered with the tool registry at the bottom of the module; the
# description text is runtime data shown to the model verbatim.
SESSION_SEARCH_SCHEMA: Dict[str, Any] = {
    "name": "session_search",
    "description": (
        "Search your long-term memory of past conversations, or browse recent sessions. This is your recall -- "
        "every past session is searchable, and this tool summarizes what happened.\n\n"
        "TWO MODES:\n"
        "1. Recent sessions (no query): Call with no arguments to see what was worked on recently. "
        "Returns titles, previews, and timestamps. Zero LLM cost, instant. "
        "Start here when the user asks what were we working on or what did we do recently.\n"
        "2. Keyword search (with query): Search for specific topics across all past sessions. "
        "Returns LLM-generated summaries of matching sessions.\n\n"
        "USE THIS PROACTIVELY when:\n"
        "- The user says 'we did this before', 'remember when', 'last time', 'as I mentioned'\n"
        "- The user asks about a topic you worked on before but don't have in current context\n"
        "- The user references a project, person, or concept that seems familiar but isn't in memory\n"
        "- You want to check if you've solved a similar problem before\n"
        "- The user asks 'what did we do about X?' or 'how did we fix Y?'\n\n"
        "Don't hesitate to search when it is actually cross-session -- it's fast and cheap. "
        "Better to search and confirm than to guess or ask the user to repeat themselves.\n\n"
        "Search syntax: keywords joined with OR for broad recall (elevenlabs OR baseten OR funding), "
        "phrases for exact match (\"docker networking\"), boolean (python NOT java), prefix (deploy*). "
        "IMPORTANT: Use OR between keywords for best results — FTS5 defaults to AND which misses "
        "sessions that only mention some terms. If a broad OR query returns nothing, try individual "
        "keyword searches in parallel. Returns summaries of the top matching sessions."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Search query — keywords, phrases, or boolean expressions to find in past sessions. Omit this parameter entirely to browse recent sessions instead (returns titles, previews, timestamps with no LLM cost).",
            },
            "role_filter": {
                "type": "string",
                "description": "Optional: only search messages from specific roles (comma-separated). E.g. 'user,assistant' to skip tool outputs.",
            },
            "limit": {
                "type": "integer",
                "description": "Max sessions to summarize (default: 3, max: 5).",
                "default": 3,
            },
        },
        "required": [],
    },
}
588  
589  
# --- Registry ---
# NOTE(review): this import sits at the bottom of the module — presumably to
# avoid an import cycle with tools.registry; confirm before moving it to the
# top. Functions above reference tool_error, which is safe because they only
# run after module import completes.
from tools.registry import registry, tool_error

# Register the tool. The lambda adapts the registry's (args, **kwargs) calling
# convention to session_search()'s keyword parameters: query defaults to ""
# (recent-sessions mode), while db and current_session_id are injected by the
# runtime through kwargs.
registry.register(
    name="session_search",
    toolset="session_search",
    schema=SESSION_SEARCH_SCHEMA,
    handler=lambda args, **kw: session_search(
        query=args.get("query") or "",
        role_filter=args.get("role_filter"),
        limit=args.get("limit", 3),
        db=kw.get("db"),
        current_session_id=kw.get("current_session_id")),
    check_fn=check_session_search_requirements,
    emoji="🔍",
)