/ services / pipecat-agent / fast_path.py
fast_path.py
  1  """Fast-path handler for simple deterministic queries.
  2  
  3  Intercepts common queries and responds directly without LLM invocation.
  4  Runs as a Pipecat FrameProcessor between STT and the LLM context aggregator.
  5  
  6  Handles:
  7  - Time/date queries
  8  - Basic weather (from cached data)
  9  - System status (quick summary)
 10  
 11  If no fast-path match, passes through to LLM normally.
 12  """
 13  
 14  import json
 15  import os
 16  import re
 17  import urllib.request
 18  from datetime import datetime
 19  from zoneinfo import ZoneInfo
 20  
 21  from loguru import logger
 22  
 23  from pipecat.frames.frames import (
 24      Frame,
 25      OutputTransportMessageFrame,
 26      TextFrame,
 27      TranscriptionFrame,
 28      TTSSpeakFrame,
 29  )
 30  from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
 31  
 32  FAST_PATH_ENABLED = os.getenv("FAST_PATH_ENABLED", "true").lower() == "true"
 33  LOCAL_TZ = ZoneInfo(os.getenv("TIMEZONE", "America/New_York"))
 34  WEATHER_CACHE_URL = os.getenv("WEATHER_CACHE_URL", "")  # Optional: cached weather endpoint
 35  
 36  # Time/date patterns
 37  TIME_PATTERNS = [
 38      r'\bwhat time\b', r'\bwhat\'s the time\b', r'\bcurrent time\b',
 39      r'\btell me the time\b',
 40  ]
 41  DATE_PATTERNS = [
 42      r'\bwhat(?:\'s| is) (?:the |today\'s )?date\b', r'\bwhat day\b',
 43      r'\btoday\'s date\b', r'\bwhat is today\b',
 44  ]
 45  
 46  # Compile patterns
 47  _time_re = re.compile('|'.join(TIME_PATTERNS), re.IGNORECASE)
 48  _date_re = re.compile('|'.join(DATE_PATTERNS), re.IGNORECASE)
 49  
 50  
 51  def check_fast_path(text: str) -> str | None:
 52      """Check if the text matches a fast-path pattern. Returns response or None."""
 53      text_lower = text.lower().strip()
 54  
 55      # Time
 56      if _time_re.search(text_lower):
 57          now = datetime.now(LOCAL_TZ)
 58          return f"It's {now.strftime('%I:%M %p').lstrip('0')}."
 59  
 60      # Date
 61      if _date_re.search(text_lower):
 62          now = datetime.now(LOCAL_TZ)
 63          return f"Today is {now.strftime('%A, %B %d, %Y')}."
 64  
 65      return None
 66  
 67  
 68  class FastPathProcessor(FrameProcessor):
 69      """Intercepts simple queries and responds without LLM invocation."""
 70  
 71      def __init__(self, *, tts_service=None, **kwargs):
 72          super().__init__(**kwargs)
 73          self._tts = tts_service
 74  
 75      async def process_frame(self, frame: Frame, direction: FrameDirection):
 76          await super().process_frame(frame, direction)
 77  
 78          if not FAST_PATH_ENABLED:
 79              await self.push_frame(frame, direction)
 80              return
 81  
 82          if isinstance(frame, TranscriptionFrame):
 83              text = frame.text
 84              # Strip speaker tag if present
 85              if text.startswith("["):
 86                  idx = text.find("]")
 87                  if idx > 0:
 88                      text = text[idx+1:].strip()
 89  
 90              response = check_fast_path(text)
 91              if response:
 92                  logger.info(f"Fast-path response: '{text}' → '{response}'")
 93                  # Send transcript events to web client via transport
 94                  await self.push_frame(OutputTransportMessageFrame(
 95                      message={"type": "UserTranscriptionFrame", "text": frame.text}
 96                  ), direction)
 97                  await self.push_frame(OutputTransportMessageFrame(
 98                      message={"type": "TextFrame", "text": response}
 99                  ), direction)
100                  await self.push_frame(OutputTransportMessageFrame(
101                      message={"type": "BotStoppedSpeakingFrame"}
102                  ), direction)
103                  # TTSSpeakFrame bypasses TTS text aggregator which
104                  # would stall waiting for lookahead on sentence-ending punctuation
105                  await self.push_frame(TTSSpeakFrame(text=response), direction)
106                  return
107  
108          # No fast-path match — pass through to LLM
109          await self.push_frame(frame, direction)