smpte_pipe.py
1 """ 2 title: SMPTE Copilot RAG 3 description: Pipe for SMPTE-Copilot with clickable citations support 4 author: SMPTE 5 version: 1.0.0 6 license: MIT 7 """ 8 9 from collections.abc import Awaitable, Callable 10 11 import aiohttp 12 13 14 class Pipe: 15 """SMPTE Copilot Pipe for OpenWebUI with citation support. 16 17 This Pipe connects to the SMPTE-Copilot RAG backend and emits 18 citation events for each retrieved chunk, enabling clickable 19 references in the OpenWebUI chat interface. 20 """ 21 22 def __init__(self): 23 self.type = "pipe" 24 self.id = "smpte_copilot" 25 self.name = "SMPTE Copilot" 26 # Disable automatic citation to use our custom citation events 27 self.citation = False 28 29 class Valves: 30 """Configuration options for the Pipe.""" 31 32 def __init__(self): 33 self.SMPTE_API_BASE_URL = "http://api:8000" 34 self.REQUEST_TIMEOUT = 120 35 36 def pipes(self) -> list[dict]: 37 """Return the list of available models/pipes.""" 38 return [{"id": "smpte_copilot_rag", "name": " - RAG"}] 39 40 async def pipe( 41 self, 42 body: dict, 43 __user__: dict | None = None, 44 __event_emitter__: Callable[[dict], Awaitable[None]] | None = None, 45 ) -> str: 46 """Process a chat request through the SMPTE RAG backend.""" 47 48 # OpenWebUI calls pipe twice: once with stream=True (main), once with stream=False (completed) 49 # Only process on the first call (stream=True), skip the second to avoid duplicate work 50 if not body.get("stream", True): 51 # Second call (stream=False) - return last assistant message if exists 52 messages = body.get("messages", []) 53 for msg in reversed(messages): 54 if msg.get("role") == "assistant": 55 return msg.get("content", "") 56 return "" 57 58 valves = self.Valves() 59 60 messages = body.get("messages", []) 61 if not messages: 62 return "No messages provided." 63 64 user_messages = [m for m in messages if m.get("role") == "user"] 65 if not user_messages: 66 return "No user message found." 67 68 query = user_messages[-1].get("content", "") 69 if not query: 70 return "Empty query." 71 72 headers = {"Content-Type": "application/json"} 73 74 if __event_emitter__: 75 await __event_emitter__({ 76 "type": "status", 77 "data": {"description": "Searching documents...", "done": False} 78 }) 79 80 if __user__: 81 if __user__.get("email"): 82 headers["X-OpenWebUI-User-Email"] = __user__["email"] 83 if __user__.get("id"): 84 headers["X-OpenWebUI-User-Id"] = __user__["id"] 85 if __user__.get("name"): 86 headers["X-OpenWebUI-User-Name"] = __user__["name"] 87 if __user__.get("role"): 88 headers["X-OpenWebUI-User-Role"] = __user__["role"] 89 90 try: 91 async with aiohttp.ClientSession() as session, session.post( 92 f"{valves.SMPTE_API_BASE_URL}/v1/rag/query", 93 json={"query": query}, 94 headers=headers, 95 timeout=aiohttp.ClientTimeout(total=valves.REQUEST_TIMEOUT), 96 ) as resp: 97 if resp.status != 200: 98 error_text = await resp.text() 99 return f"Error from SMPTE API: {resp.status} - {error_text}" 100 101 data = await resp.json() 102 103 except aiohttp.ClientError as e: 104 return f"Failed to connect to SMPTE API: {e}" 105 except Exception as e: 106 return f"Unexpected error: {e}" 107 108 response_text = data.get("response", "No response received.") 109 citations = data.get("citations", []) 110 111 if __event_emitter__ and citations: 112 for citation in citations: 113 source_name = citation.get("source") or "Unknown source" 114 page = citation.get("page") 115 content = citation.get("content", "") 116 117 source_display = source_name 118 if page is not None: 119 source_display = f"{source_name} (page {page})" 120 121 await __event_emitter__( 122 { 123 "type": "citation", 124 "data": { 125 "document": [content], 126 "metadata": [ 127 { 128 "source": source_name, 129 "page": page, 130 "score": citation.get("score"), 131 } 132 ], 133 "source": {"name": source_display}, 134 }, 135 } 136 ) 137 138 if __event_emitter__: 139 await __event_emitter__({ 140 "type": "status", 141 "data": {"description": "Completed!", "done": True} 142 }) 143 144 return response_text