# services/pipecat-agent/context_compactor.py
  1  """Context compaction for multi-turn voice conversations.
  2  
  3  Monitors the LLM context size and compacts older turns when approaching
  4  the token limit. Preserves recent turns verbatim while summarizing older
  5  history into a compact form.
  6  
  7  Integrates as a hook on the LLM context aggregator, not as a pipeline processor.
  8  """
  9  
import json
import os
import re
import urllib.request
from typing import Optional

from loguru import logger
# Base URL of an OpenAI-compatible chat-completions endpoint (no trailing slash).
LLM_URL = os.getenv("LLM_BASE_URL", "http://127.0.0.1:8000/v1")
# Model name sent with summarization requests.
LLM_MODEL = os.getenv("LLM_MODEL", "Qwen/Qwen3-32B-AWQ")
MAX_CONTEXT_TOKENS = int(os.getenv("MAX_CONTEXT_TOKENS", "12000"))  # Trigger compaction at 12K of 16K
KEEP_RECENT_TURNS = int(os.getenv("KEEP_RECENT_TURNS", "6"))  # Keep last 6 messages verbatim
 21  
 22  
 23  def estimate_tokens(text: str) -> int:
 24      """Rough token estimate: ~4 chars per token for English."""
 25      return len(text) // 4
 26  
 27  
 28  def estimate_context_tokens(messages: list) -> int:
 29      """Estimate total tokens in conversation context."""
 30      total = 0
 31      for msg in messages:
 32          total += estimate_tokens(msg.get("content", ""))
 33      return total
 34  
 35  
 36  def compact_context(messages: list) -> list:
 37      """Compact conversation context by summarizing older turns.
 38  
 39      Returns a new message list with:
 40      - System prompt (unchanged)
 41      - Summary of older turns (1 message)
 42      - Recent turns (verbatim)
 43      """
 44      if len(messages) <= KEEP_RECENT_TURNS + 1:
 45          return messages  # Not enough to compact
 46  
 47      total_tokens = estimate_context_tokens(messages)
 48      if total_tokens < MAX_CONTEXT_TOKENS:
 49          return messages  # Under limit, no compaction needed
 50  
 51      logger.info(f"Context compaction triggered: ~{total_tokens} tokens, {len(messages)} messages")
 52  
 53      # Split: system prompt + old messages + recent messages
 54      system_msg = messages[0] if messages[0].get("role") == "system" else None
 55      non_system = messages[1:] if system_msg else messages
 56      old_messages = non_system[:-KEEP_RECENT_TURNS]
 57      recent_messages = non_system[-KEEP_RECENT_TURNS:]
 58  
 59      if not old_messages:
 60          return messages
 61  
 62      # Summarize old messages
 63      old_text = ""
 64      for msg in old_messages:
 65          role = "Bob" if msg.get("role") == "assistant" else "User"
 66          content = msg.get("content", "")[:500]
 67          old_text += f"{role}: {content}\n"
 68  
 69      summary = _summarize(old_text)
 70  
 71      # Reconstruct
 72      compacted = []
 73      if system_msg:
 74          compacted.append(system_msg)
 75      if summary:
 76          compacted.append({
 77              "role": "system",
 78              "content": f"[Earlier conversation summary: {summary}]",
 79          })
 80      compacted.extend(recent_messages)
 81  
 82      new_tokens = estimate_context_tokens(compacted)
 83      logger.info(f"Compacted: {len(messages)} → {len(compacted)} messages, ~{total_tokens} → ~{new_tokens} tokens")
 84      return compacted
 85  
 86  
 87  def _summarize(text: str) -> str:
 88      """Generate a brief summary of conversation history."""
 89      try:
 90          payload = {
 91              "model": LLM_MODEL,
 92              "messages": [
 93                  {"role": "system", "content": "Summarize this conversation history in 2-3 sentences. Include key facts, decisions, and topics discussed. Be concise."},
 94                  {"role": "user", "content": text[:3000]},
 95              ],
 96              "max_tokens": 150,
 97              "temperature": 0.2,
 98          }
 99          data = json.dumps(payload).encode()
100          req = urllib.request.Request(
101              f"{LLM_URL}/chat/completions",
102              data=data,
103              headers={"Content-Type": "application/json"},
104          )
105          with urllib.request.urlopen(req, timeout=10) as resp:
106              result = json.loads(resp.read().decode())
107          summary = result["choices"][0]["message"]["content"].strip()
108          import re
109          summary = re.sub(r'<think>.*?</think>', '', summary, flags=re.DOTALL).strip()
110          return summary
111      except Exception as e:
112          logger.warning(f"Context summarization failed: {e}")
113          # Fallback: just take first sentence of each message
114          return text[:300] + "..."
115  
116  
def maybe_compact_context(context) -> bool:
    """Compact the aggregator's LLM context in place when it grows too large.

    Returns True only when the live message list was actually replaced by a
    shorter, compacted version. Any internal error is logged at debug level
    and swallowed so the voice pipeline keeps running.
    """
    try:
        snapshot = list(context.messages)
        if estimate_context_tokens(snapshot) < MAX_CONTEXT_TOKENS:
            return False

        compacted = compact_context(snapshot)
        if len(compacted) >= len(snapshot):
            return False

        # Mutate the existing list in place so the aggregator keeps its
        # own message-list object identity.
        context.messages.clear()
        context.messages.extend(compacted)
        return True
    except Exception as e:
        logger.debug(f"Context compaction check failed: {e}")
        return False