session_timer.py
1 # 2 # Session Timer Processor 3 # 4 # Enforces a maximum session duration for calls. 5 # Ends the call immediately when the timeout is reached. 6 # 7 8 import asyncio 9 import os 10 from typing import Optional 11 12 from loguru import logger 13 from pipecat.frames.frames import ( 14 CancelFrame, 15 EndFrame, 16 EndTaskFrame, 17 Frame, 18 StartFrame, 19 ) 20 from pipecat.processors.frame_processor import FrameDirection, FrameProcessor 21 22 from end_of_call_reporter import EndedReason 23 24 25 class SessionTimerProcessor(FrameProcessor): 26 """Enforces a maximum session duration. 27 28 When the timer expires, it immediately ends the call. 29 With Gemini Live native audio, we can't reliably inject goodbye messages, 30 so we just terminate cleanly. 31 32 Environment Variables: 33 MAX_CALL_DURATION_SECS: Maximum call duration (default: 840 = 14 minutes) 34 """ 35 36 def __init__( 37 self, 38 max_duration_secs: Optional[float] = None, 39 **kwargs, 40 ): 41 """Initialize the session timer. 42 43 Args: 44 max_duration_secs: Maximum session duration in seconds. 45 Defaults to MAX_CALL_DURATION_SECS env var or 840. 46 """ 47 super().__init__(**kwargs) 48 49 default_max = int(os.getenv("MAX_CALL_DURATION_SECS", "840")) 50 self._max_duration = max_duration_secs if max_duration_secs is not None else default_max 51 52 self._timer_task: Optional[asyncio.Task] = None 53 self._session_started = False 54 55 logger.info(f"SessionTimerProcessor: max_duration={self._max_duration}s") 56 57 async def process_frame(self, frame: Frame, direction: FrameDirection): 58 await super().process_frame(frame, direction) 59 60 # Start timer when StartFrame is received 61 if isinstance(frame, StartFrame) and not self._session_started: 62 self._session_started = True 63 self._start_timer() 64 65 # Stop timer on end/cancel frames 66 if isinstance(frame, (EndFrame, CancelFrame)): 67 await self._stop_timer() 68 69 await self.push_frame(frame, direction) 70 71 def _start_timer(self): 72 """Start the session timer.""" 73 if self._timer_task is None: 74 logger.info(f"Session timer started: {self._max_duration}s max duration") 75 self._timer_task = self.create_task(self._timer_handler()) 76 77 async def _stop_timer(self): 78 """Stop the timer.""" 79 if self._timer_task: 80 await self.cancel_task(self._timer_task) 81 self._timer_task = None 82 83 async def _timer_handler(self): 84 """Handle session timeout.""" 85 await asyncio.sleep(self._max_duration) 86 logger.warning(f"Session max duration ({self._max_duration}s) reached, ending call") 87 await self.push_frame( 88 EndTaskFrame(reason=EndedReason.EXCEEDED_MAX_DURATION), 89 FrameDirection.UPSTREAM, 90 ) 91 92 async def cleanup(self): 93 await super().cleanup() 94 await self._stop_timer()