context_compactor.py
1 """Context compaction for multi-turn voice conversations. 2 3 Monitors the LLM context size and compacts older turns when approaching 4 the token limit. Preserves recent turns verbatim while summarizing older 5 history into a compact form. 6 7 Integrates as a hook on the LLM context aggregator, not as a pipeline processor. 8 """ 9 10 import json 11 import os 12 import urllib.request 13 from typing import Optional 14 15 from loguru import logger 16 17 LLM_URL = os.getenv("LLM_BASE_URL", "http://127.0.0.1:8000/v1") 18 LLM_MODEL = os.getenv("LLM_MODEL", "Qwen/Qwen3-32B-AWQ") 19 MAX_CONTEXT_TOKENS = int(os.getenv("MAX_CONTEXT_TOKENS", "12000")) # Trigger compaction at 12K of 16K 20 KEEP_RECENT_TURNS = int(os.getenv("KEEP_RECENT_TURNS", "6")) # Keep last 6 messages verbatim 21 22 23 def estimate_tokens(text: str) -> int: 24 """Rough token estimate: ~4 chars per token for English.""" 25 return len(text) // 4 26 27 28 def estimate_context_tokens(messages: list) -> int: 29 """Estimate total tokens in conversation context.""" 30 total = 0 31 for msg in messages: 32 total += estimate_tokens(msg.get("content", "")) 33 return total 34 35 36 def compact_context(messages: list) -> list: 37 """Compact conversation context by summarizing older turns. 38 39 Returns a new message list with: 40 - System prompt (unchanged) 41 - Summary of older turns (1 message) 42 - Recent turns (verbatim) 43 """ 44 if len(messages) <= KEEP_RECENT_TURNS + 1: 45 return messages # Not enough to compact 46 47 total_tokens = estimate_context_tokens(messages) 48 if total_tokens < MAX_CONTEXT_TOKENS: 49 return messages # Under limit, no compaction needed 50 51 logger.info(f"Context compaction triggered: ~{total_tokens} tokens, {len(messages)} messages") 52 53 # Split: system prompt + old messages + recent messages 54 system_msg = messages[0] if messages[0].get("role") == "system" else None 55 non_system = messages[1:] if system_msg else messages 56 old_messages = non_system[:-KEEP_RECENT_TURNS] 57 recent_messages = non_system[-KEEP_RECENT_TURNS:] 58 59 if not old_messages: 60 return messages 61 62 # Summarize old messages 63 old_text = "" 64 for msg in old_messages: 65 role = "Bob" if msg.get("role") == "assistant" else "User" 66 content = msg.get("content", "")[:500] 67 old_text += f"{role}: {content}\n" 68 69 summary = _summarize(old_text) 70 71 # Reconstruct 72 compacted = [] 73 if system_msg: 74 compacted.append(system_msg) 75 if summary: 76 compacted.append({ 77 "role": "system", 78 "content": f"[Earlier conversation summary: {summary}]", 79 }) 80 compacted.extend(recent_messages) 81 82 new_tokens = estimate_context_tokens(compacted) 83 logger.info(f"Compacted: {len(messages)} → {len(compacted)} messages, ~{total_tokens} → ~{new_tokens} tokens") 84 return compacted 85 86 87 def _summarize(text: str) -> str: 88 """Generate a brief summary of conversation history.""" 89 try: 90 payload = { 91 "model": LLM_MODEL, 92 "messages": [ 93 {"role": "system", "content": "Summarize this conversation history in 2-3 sentences. Include key facts, decisions, and topics discussed. 
Be concise."}, 94 {"role": "user", "content": text[:3000]}, 95 ], 96 "max_tokens": 150, 97 "temperature": 0.2, 98 } 99 data = json.dumps(payload).encode() 100 req = urllib.request.Request( 101 f"{LLM_URL}/chat/completions", 102 data=data, 103 headers={"Content-Type": "application/json"}, 104 ) 105 with urllib.request.urlopen(req, timeout=10) as resp: 106 result = json.loads(resp.read().decode()) 107 summary = result["choices"][0]["message"]["content"].strip() 108 import re 109 summary = re.sub(r'<think>.*?</think>', '', summary, flags=re.DOTALL).strip() 110 return summary 111 except Exception as e: 112 logger.warning(f"Context summarization failed: {e}") 113 # Fallback: just take first sentence of each message 114 return text[:300] + "..." 115 116 117 def maybe_compact_context(context) -> bool: 118 """Check and compact the LLM context if needed. Returns True if compacted.""" 119 try: 120 messages = list(context.messages) 121 tokens = estimate_context_tokens(messages) 122 123 if tokens >= MAX_CONTEXT_TOKENS: 124 compacted = compact_context(messages) 125 if len(compacted) < len(messages): 126 context.messages.clear() 127 context.messages.extend(compacted) 128 return True 129 except Exception as e: 130 logger.debug(f"Context compaction check failed: {e}") 131 return False