/ src / solace_agent_mesh / agent / adk / session_compaction.py
session_compaction.py
 1  """
 2  Session compaction state management for coordinating parallel summarization tasks.
 3  """
 4  
 5  import asyncio
 6  from cachetools import TTLCache
 7  
 8  
 9  class SessionCompactionState:
10      """
11      Manages per-session coordination state for parallel compaction tasks.
12  
13      Ensures:
14      - Only one task per session compacts simultaneously (via locks)
15      - Deferred notifications are stored until after successful response
16      - Each agent has isolated state (agent-scoped, not global)
17      """
18  
19      def __init__(self):
20          # Per-session locks to prevent parallel tasks from duplicating summarization
21          # When multiple tasks hit context limit simultaneously, only one compacts per session
22          self.locks: TTLCache = TTLCache(maxsize=10000, ttl=3600)
23          self.locks_mutex = asyncio.Lock()
24  
25          # Per-session summaries for deferred notification (after successful response)
26          # When compaction occurs during retries, we store the summary here instead of sending immediately
27          # This ensures users see the actual response first, then a clean notification about summarization
28          # TTLCache prevents memory leak if pop() doesn't run for some reason (maxsize=10000, ttl=3600)
29          self._deferred_summaries: TTLCache = TTLCache(maxsize=10000, ttl=3600)
30  
31      async def get_lock(self, session_id: str) -> asyncio.Lock:
32          """
33          Get or create an asyncio.Lock for the given session_id.
34  
35          Ensures only one task per session can perform compaction at a time.
36          When multiple parallel tasks hit context limits, they coordinate via this lock.
37  
38          Args:
39              session_id: The ADK session ID
40  
41          Returns:
42              asyncio.Lock instance for this session
43          """
44          async with self.locks_mutex:
45              if session_id not in self.locks:
46                  self.locks[session_id] = asyncio.Lock()
47              else:
48                  # Re-insert to reset TTL on access (idle timeout behavior)
49                  lock = self.locks.pop(session_id)
50                  self.locks[session_id] = lock
51              return self.locks[session_id]
52  
53      def store_summary(self, session_id: str, summary: str) -> None:
54          """
55          Store a deferred notification summary for a session.
56  
57          The summary will be sent to the user after the task completes successfully.
58          If a summary already exists, it will be overwritten (keeping only the latest).
59  
60          Args:
61              session_id: The session ID
62              summary: The summary text to store
63          """
64          self._deferred_summaries[session_id] = summary
65  
66      def get_summary(self, session_id: str) -> str | None:
67          """
68          Peek at a deferred summary without removing it.
69  
70          Used by subtasks that want to check if compaction happened without consuming
71          the summary (leaving it for the root task to notify).
72  
73          Args:
74              session_id: The session ID
75  
76          Returns:
77              The summary text if it exists, None otherwise
78          """
79          return self._deferred_summaries.get(session_id)
80  
81      def pop_summary(self, session_id: str) -> str | None:
82          """
83          Retrieve and remove a deferred summary.
84  
85          Used by root tasks after successful response to consume and send the notification.
86  
87          Args:
88              session_id: The session ID
89  
90          Returns:
91              The summary text if it exists, None otherwise
92          """
93          return self._deferred_summaries.pop(session_id, None)