fast_path.py
1 """Fast-path handler for simple deterministic queries. 2 3 Intercepts common queries and responds directly without LLM invocation. 4 Runs as a Pipecat FrameProcessor between STT and the LLM context aggregator. 5 6 Handles: 7 - Time/date queries 8 - Basic weather (from cached data) 9 - System status (quick summary) 10 11 If no fast-path match, passes through to LLM normally. 12 """ 13 14 import json 15 import os 16 import re 17 import urllib.request 18 from datetime import datetime 19 from zoneinfo import ZoneInfo 20 21 from loguru import logger 22 23 from pipecat.frames.frames import ( 24 Frame, 25 OutputTransportMessageFrame, 26 TextFrame, 27 TranscriptionFrame, 28 TTSSpeakFrame, 29 ) 30 from pipecat.processors.frame_processor import FrameDirection, FrameProcessor 31 32 FAST_PATH_ENABLED = os.getenv("FAST_PATH_ENABLED", "true").lower() == "true" 33 LOCAL_TZ = ZoneInfo(os.getenv("TIMEZONE", "America/New_York")) 34 WEATHER_CACHE_URL = os.getenv("WEATHER_CACHE_URL", "") # Optional: cached weather endpoint 35 36 # Time/date patterns 37 TIME_PATTERNS = [ 38 r'\bwhat time\b', r'\bwhat\'s the time\b', r'\bcurrent time\b', 39 r'\btell me the time\b', 40 ] 41 DATE_PATTERNS = [ 42 r'\bwhat(?:\'s| is) (?:the |today\'s )?date\b', r'\bwhat day\b', 43 r'\btoday\'s date\b', r'\bwhat is today\b', 44 ] 45 46 # Compile patterns 47 _time_re = re.compile('|'.join(TIME_PATTERNS), re.IGNORECASE) 48 _date_re = re.compile('|'.join(DATE_PATTERNS), re.IGNORECASE) 49 50 51 def check_fast_path(text: str) -> str | None: 52 """Check if the text matches a fast-path pattern. Returns response or None.""" 53 text_lower = text.lower().strip() 54 55 # Time 56 if _time_re.search(text_lower): 57 now = datetime.now(LOCAL_TZ) 58 return f"It's {now.strftime('%I:%M %p').lstrip('0')}." 59 60 # Date 61 if _date_re.search(text_lower): 62 now = datetime.now(LOCAL_TZ) 63 return f"Today is {now.strftime('%A, %B %d, %Y')}." 64 65 return None 66 67 68 class FastPathProcessor(FrameProcessor): 69 """Intercepts simple queries and responds without LLM invocation.""" 70 71 def __init__(self, *, tts_service=None, **kwargs): 72 super().__init__(**kwargs) 73 self._tts = tts_service 74 75 async def process_frame(self, frame: Frame, direction: FrameDirection): 76 await super().process_frame(frame, direction) 77 78 if not FAST_PATH_ENABLED: 79 await self.push_frame(frame, direction) 80 return 81 82 if isinstance(frame, TranscriptionFrame): 83 text = frame.text 84 # Strip speaker tag if present 85 if text.startswith("["): 86 idx = text.find("]") 87 if idx > 0: 88 text = text[idx+1:].strip() 89 90 response = check_fast_path(text) 91 if response: 92 logger.info(f"Fast-path response: '{text}' → '{response}'") 93 # Send transcript events to web client via transport 94 await self.push_frame(OutputTransportMessageFrame( 95 message={"type": "UserTranscriptionFrame", "text": frame.text} 96 ), direction) 97 await self.push_frame(OutputTransportMessageFrame( 98 message={"type": "TextFrame", "text": response} 99 ), direction) 100 await self.push_frame(OutputTransportMessageFrame( 101 message={"type": "BotStoppedSpeakingFrame"} 102 ), direction) 103 # TTSSpeakFrame bypasses TTS text aggregator which 104 # would stall waiting for lookahead on sentence-ending punctuation 105 await self.push_frame(TTSSpeakFrame(text=response), direction) 106 return 107 108 # No fast-path match — pass through to LLM 109 await self.push_frame(frame, direction)