/ src / openwebui / smpte_pipe.py
smpte_pipe.py
  1  """
  2  title: SMPTE Copilot RAG
  3  description: Pipe for SMPTE-Copilot with clickable citations support
  4  author: SMPTE
  5  version: 1.0.0
  6  license: MIT
  7  """
  8  
  9  from collections.abc import Awaitable, Callable
 10  
 11  import aiohttp
 12  
 13  
 14  class Pipe:
 15      """SMPTE Copilot Pipe for OpenWebUI with citation support.
 16  
 17      This Pipe connects to the SMPTE-Copilot RAG backend and emits
 18      citation events for each retrieved chunk, enabling clickable
 19      references in the OpenWebUI chat interface.
 20      """
 21  
 22      def __init__(self):
 23          self.type = "pipe"
 24          self.id = "smpte_copilot"
 25          self.name = "SMPTE Copilot"
 26          # Disable automatic citation to use our custom citation events
 27          self.citation = False
 28  
 29      class Valves:
 30          """Configuration options for the Pipe."""
 31  
 32          def __init__(self):
 33              self.SMPTE_API_BASE_URL = "http://api:8000"
 34              self.REQUEST_TIMEOUT = 120
 35  
 36      def pipes(self) -> list[dict]:
 37          """Return the list of available models/pipes."""
 38          return [{"id": "smpte_copilot_rag", "name": " - RAG"}]
 39  
 40      async def pipe(
 41          self,
 42          body: dict,
 43          __user__: dict | None = None,
 44          __event_emitter__: Callable[[dict], Awaitable[None]] | None = None,
 45      ) -> str:
 46          """Process a chat request through the SMPTE RAG backend."""
 47  
 48          # OpenWebUI calls pipe twice: once with stream=True (main), once with stream=False (completed)
 49          # Only process on the first call (stream=True), skip the second to avoid duplicate work
 50          if not body.get("stream", True):
 51              # Second call (stream=False) - return last assistant message if exists
 52              messages = body.get("messages", [])
 53              for msg in reversed(messages):
 54                  if msg.get("role") == "assistant":
 55                      return msg.get("content", "")
 56              return ""
 57  
 58          valves = self.Valves()
 59  
 60          messages = body.get("messages", [])
 61          if not messages:
 62              return "No messages provided."
 63  
 64          user_messages = [m for m in messages if m.get("role") == "user"]
 65          if not user_messages:
 66              return "No user message found."
 67  
 68          query = user_messages[-1].get("content", "")
 69          if not query:
 70              return "Empty query."
 71  
 72          headers = {"Content-Type": "application/json"}
 73  
 74          if __event_emitter__:
 75              await __event_emitter__({
 76                  "type": "status",
 77                  "data": {"description": "Searching documents...", "done": False}
 78              })
 79  
 80          if __user__:
 81              if __user__.get("email"):
 82                  headers["X-OpenWebUI-User-Email"] = __user__["email"]
 83              if __user__.get("id"):
 84                  headers["X-OpenWebUI-User-Id"] = __user__["id"]
 85              if __user__.get("name"):
 86                  headers["X-OpenWebUI-User-Name"] = __user__["name"]
 87              if __user__.get("role"):
 88                  headers["X-OpenWebUI-User-Role"] = __user__["role"]
 89  
 90          try:
 91              async with aiohttp.ClientSession() as session, session.post(
 92                  f"{valves.SMPTE_API_BASE_URL}/v1/rag/query",
 93                  json={"query": query},
 94                  headers=headers,
 95                  timeout=aiohttp.ClientTimeout(total=valves.REQUEST_TIMEOUT),
 96              ) as resp:
 97                  if resp.status != 200:
 98                      error_text = await resp.text()
 99                      return f"Error from SMPTE API: {resp.status} - {error_text}"
100  
101                  data = await resp.json()
102  
103          except aiohttp.ClientError as e:
104              return f"Failed to connect to SMPTE API: {e}"
105          except Exception as e:
106              return f"Unexpected error: {e}"
107  
108          response_text = data.get("response", "No response received.")
109          citations = data.get("citations", [])
110  
111          if __event_emitter__ and citations:
112              for citation in citations:
113                  source_name = citation.get("source") or "Unknown source"
114                  page = citation.get("page")
115                  content = citation.get("content", "")
116  
117                  source_display = source_name
118                  if page is not None:
119                      source_display = f"{source_name} (page {page})"
120  
121                  await __event_emitter__(
122                      {
123                          "type": "citation",
124                          "data": {
125                              "document": [content],
126                              "metadata": [
127                                  {
128                                      "source": source_name,
129                                      "page": page,
130                                      "score": citation.get("score"),
131                                  }
132                              ],
133                              "source": {"name": source_display},
134                          },
135                      }
136                  )
137  
138          if __event_emitter__:
139              await __event_emitter__({
140                  "type": "status",
141                  "data": {"description": "Completed!", "done": True}
142              })
143  
144          return response_text