session_compaction.py
1 """ 2 Session compaction state management for coordinating parallel summarization tasks. 3 """ 4 5 import asyncio 6 from cachetools import TTLCache 7 8 9 class SessionCompactionState: 10 """ 11 Manages per-session coordination state for parallel compaction tasks. 12 13 Ensures: 14 - Only one task per session compacts simultaneously (via locks) 15 - Deferred notifications are stored until after successful response 16 - Each agent has isolated state (agent-scoped, not global) 17 """ 18 19 def __init__(self): 20 # Per-session locks to prevent parallel tasks from duplicating summarization 21 # When multiple tasks hit context limit simultaneously, only one compacts per session 22 self.locks: TTLCache = TTLCache(maxsize=10000, ttl=3600) 23 self.locks_mutex = asyncio.Lock() 24 25 # Per-session summaries for deferred notification (after successful response) 26 # When compaction occurs during retries, we store the summary here instead of sending immediately 27 # This ensures users see the actual response first, then a clean notification about summarization 28 # TTLCache prevents memory leak if pop() doesn't run for some reason (maxsize=10000, ttl=3600) 29 self._deferred_summaries: TTLCache = TTLCache(maxsize=10000, ttl=3600) 30 31 async def get_lock(self, session_id: str) -> asyncio.Lock: 32 """ 33 Get or create an asyncio.Lock for the given session_id. 34 35 Ensures only one task per session can perform compaction at a time. 36 When multiple parallel tasks hit context limits, they coordinate via this lock. 37 38 Args: 39 session_id: The ADK session ID 40 41 Returns: 42 asyncio.Lock instance for this session 43 """ 44 async with self.locks_mutex: 45 if session_id not in self.locks: 46 self.locks[session_id] = asyncio.Lock() 47 else: 48 # Re-insert to reset TTL on access (idle timeout behavior) 49 lock = self.locks.pop(session_id) 50 self.locks[session_id] = lock 51 return self.locks[session_id] 52 53 def store_summary(self, session_id: str, summary: str) -> None: 54 """ 55 Store a deferred notification summary for a session. 56 57 The summary will be sent to the user after the task completes successfully. 58 If a summary already exists, it will be overwritten (keeping only the latest). 59 60 Args: 61 session_id: The session ID 62 summary: The summary text to store 63 """ 64 self._deferred_summaries[session_id] = summary 65 66 def get_summary(self, session_id: str) -> str | None: 67 """ 68 Peek at a deferred summary without removing it. 69 70 Used by subtasks that want to check if compaction happened without consuming 71 the summary (leaving it for the root task to notify). 72 73 Args: 74 session_id: The session ID 75 76 Returns: 77 The summary text if it exists, None otherwise 78 """ 79 return self._deferred_summaries.get(session_id) 80 81 def pop_summary(self, session_id: str) -> str | None: 82 """ 83 Retrieve and remove a deferred summary. 84 85 Used by root tasks after successful response to consume and send the notification. 86 87 Args: 88 session_id: The session ID 89 90 Returns: 91 The summary text if it exists, None otherwise 92 """ 93 return self._deferred_summaries.pop(session_id, None)