/ bot / processors / session_timer.py
session_timer.py
 1  #
 2  # Session Timer Processor
 3  #
 4  # Enforces a maximum session duration for calls.
 5  # Ends the call immediately when the timeout is reached.
 6  #
 7  
 8  import asyncio
 9  import os
10  from typing import Optional
11  
12  from loguru import logger
13  from pipecat.frames.frames import (
14      CancelFrame,
15      EndFrame,
16      EndTaskFrame,
17      Frame,
18      StartFrame,
19  )
20  from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
21  
22  from end_of_call_reporter import EndedReason
23  
24  
25  class SessionTimerProcessor(FrameProcessor):
26      """Enforces a maximum session duration.
27  
28      When the timer expires, it immediately ends the call.
29      With Gemini Live native audio, we can't reliably inject goodbye messages,
30      so we just terminate cleanly.
31  
32      Environment Variables:
33          MAX_CALL_DURATION_SECS: Maximum call duration (default: 840 = 14 minutes)
34      """
35  
36      def __init__(
37          self,
38          max_duration_secs: Optional[float] = None,
39          **kwargs,
40      ):
41          """Initialize the session timer.
42  
43          Args:
44              max_duration_secs: Maximum session duration in seconds.
45                                Defaults to MAX_CALL_DURATION_SECS env var or 840.
46          """
47          super().__init__(**kwargs)
48  
49          default_max = int(os.getenv("MAX_CALL_DURATION_SECS", "840"))
50          self._max_duration = max_duration_secs if max_duration_secs is not None else default_max
51  
52          self._timer_task: Optional[asyncio.Task] = None
53          self._session_started = False
54  
55          logger.info(f"SessionTimerProcessor: max_duration={self._max_duration}s")
56  
57      async def process_frame(self, frame: Frame, direction: FrameDirection):
58          await super().process_frame(frame, direction)
59  
60          # Start timer when StartFrame is received
61          if isinstance(frame, StartFrame) and not self._session_started:
62              self._session_started = True
63              self._start_timer()
64  
65          # Stop timer on end/cancel frames
66          if isinstance(frame, (EndFrame, CancelFrame)):
67              await self._stop_timer()
68  
69          await self.push_frame(frame, direction)
70  
71      def _start_timer(self):
72          """Start the session timer."""
73          if self._timer_task is None:
74              logger.info(f"Session timer started: {self._max_duration}s max duration")
75              self._timer_task = self.create_task(self._timer_handler())
76  
77      async def _stop_timer(self):
78          """Stop the timer."""
79          if self._timer_task:
80              await self.cancel_task(self._timer_task)
81              self._timer_task = None
82  
83      async def _timer_handler(self):
84          """Handle session timeout."""
85          await asyncio.sleep(self._max_duration)
86          logger.warning(f"Session max duration ({self._max_duration}s) reached, ending call")
87          await self.push_frame(
88              EndTaskFrame(reason=EndedReason.EXCEEDED_MAX_DURATION),
89              FrameDirection.UPSTREAM,
90          )
91  
92      async def cleanup(self):
93          await super().cleanup()
94          await self._stop_timer()