# src/api/server.py
  1  #!/usr/bin/env python3
  2  """FastAPI server exposing OpenAI-compatible chat completions endpoint"""
  3  
  4  import logging
  5  from contextlib import asynccontextmanager
  6  from typing import Annotated, Any
  7  
  8  from fastapi import FastAPI, Header, HTTPException
  9  from fastapi.middleware.cors import CORSMiddleware
 10  
 11  from src import Config
 12  from src.api.helpers import (
 13      build_chat_response,
 14      estimate_token_usage,
 15      execute_rag_query_with_error_handling,
 16      resolve_user_role_from_headers,
 17  )
 18  from src.api.models import (
 19      ChatCompletionRequest,
 20      ChatCompletionResponse,
 21      Citation,
 22      RAGQueryRequest,
 23      RAGQueryResponse,
 24  )
 25  from src.components import RAGComponents, initialize_rag_components
 26  from src.logger import Logger
 27  from src.user_resolvers import UserRoleResolverFactory
 28  
 29  
 30  @asynccontextmanager
 31  async def lifespan(app: FastAPI):
 32      """Lifespan context manager for startup and shutdown events."""
 33      # Initialize state attributes
 34      app.state.logger = logging.getLogger(__name__)
 35      app.state.initialized = False
 36  
 37      # Startup
 38      try:
 39          config = Config.get_config()
 40          Logger.setup(config)
 41          app.state.logger.info("Initializing RAG components...")
 42          app.state.components = initialize_rag_components()
 43  
 44          # Load access control configuration
 45          app.state.default_user_role = config.access_control.default_user_role
 46          app.state.role_mapping = config.access_control.get_role_mapping()
 47  
 48          # Initialize user role resolver for dynamic role resolution
 49          # Pass default_role from access_control to avoid duplication
 50          resolver_config = dict(config.user_resolver.resolver_config or {})
 51          resolver_config["default_role"] = app.state.default_user_role
 52  
 53          app.state.user_resolver = UserRoleResolverFactory.create(
 54              config.user_resolver.resolver_name,
 55              **resolver_config,
 56          )
 57          app.state.logger.info(
 58              f"User resolver initialized: type={config.user_resolver.resolver_name.value}, "
 59              f"default_role={app.state.user_resolver.default_role}"
 60          )
 61  
 62          if app.state.role_mapping:
 63              app.state.logger.info(
 64                  f"Access control enabled: role mapping loaded with {len(app.state.role_mapping)} roles"
 65              )
 66  
 67          app.state.initialized = True
 68          app.state.logger.info("Server startup complete")
 69      except Exception:
 70          app.state.logger.exception("Failed to initialize components")
 71          app.state.initialized = False
 72  
 73      yield
 74  
 75      app.state.logger.info("Server shutting down")
 76  
 77  
 78  app = FastAPI(
 79      title="SMPTE-Copilot RAG API",
 80      description="OpenAI-compatible API for SMPTE document question answering",
 81      version="1.0.0",
 82      lifespan=lifespan,
 83  )
 84  
 85  app.add_middleware(
 86      CORSMiddleware,
 87      allow_origins=["*"],
 88      allow_credentials=True,
 89      allow_methods=["*"],
 90      allow_headers=["*"],
 91  )
 92  
 93  
 94  @app.get("/health")
 95  async def health_check():
 96      """Health check endpoint."""
 97      return {
 98          "status": "healthy" if app.state.initialized else "initializing",
 99          "initialized": app.state.initialized,
100      }
101  
102  
103  @app.get("/v1/models")
104  async def list_models() -> dict[str, Any]:
105  
106      return {
107          "object": "list",
108          "data": [
109              {
110                  "id": "smpte-copilot",
111                  "object": "model",
112                  "owned_by": "smpte",
113              }
114          ],
115      }
116  
117  
118  @app.post("/v1/chat/completions")
119  async def chat_completions(
120      request: ChatCompletionRequest,
121      x_openwebui_user_email: Annotated[str | None, Header(alias="X-OpenWebUI-User-Email")] = None,
122      x_openwebui_user_id: Annotated[str | None, Header(alias="X-OpenWebUI-User-Id")] = None,
123      x_openwebui_user_name: Annotated[str | None, Header(alias="X-OpenWebUI-User-Name")] = None,
124      x_openwebui_user_role: Annotated[str | None, Header(alias="X-OpenWebUI-User-Role")] = None,
125  ) -> ChatCompletionResponse:
126      """OpenAI-compatible chat completions endpoint.
127  
128      This endpoint processes chat messages, extracts the user query,
129      runs it through the RAG pipeline, and returns a response in
130      OpenAI-compatible format.
131  
132      When used with OpenWebUI (with ENABLE_FORWARD_USER_INFO_HEADERS=true),
133      the user's email is passed in headers and used to resolve their role
134      for access-controlled document retrieval.
135      """
136      components: RAGComponents = app.state.components
137      logger = app.state.logger
138  
139      logger.info(f"x_openwebui_user_email: {x_openwebui_user_email}")
140      logger.info(f"x_openwebui_user_id: {x_openwebui_user_id}")
141      logger.info(f"x_openwebui_user_name: {x_openwebui_user_name}")
142      logger.info(f"x_openwebui_user_role: {x_openwebui_user_role}")
143  
144      # Resolve user role from headers (OpenWebUI integration)
145      user_role = resolve_user_role_from_headers(
146          user_email=x_openwebui_user_email,
147          user_id=x_openwebui_user_id,
148          user_resolver=app.state.user_resolver,
149          default_role=app.state.default_user_role,
150          logger=logger,
151      )
152  
153      # Extract the last user message as the query
154      user_messages = [msg for msg in request.messages if msg.role == "user"]
155      if not user_messages:
156          raise HTTPException(
157              status_code=400,
158              detail="No user message found in request",
159          )
160  
161      query = user_messages[-1].content
162      logger.info(f"Processing query: {query}")
163  
164      context = execute_rag_query_with_error_handling(
165          components=components,
166          query=query,
167          user_role=user_role,
168          role_mapping=app.state.role_mapping,
169          logger=logger,
170          is_initialized=app.state.initialized,
171      )
172  
173      answer = context.llm_response or "I don't know based on the provided documents."
174      usage = estimate_token_usage(context.prompt, answer)
175      response = build_chat_response(
176          answer=answer,
177          model=request.model,
178          usage=usage,
179      )
180      logger.info("Query processed successfully")
181  
182      return response
183  
184  
185  @app.post("/v1/rag/query")
186  async def rag_query(
187      request: RAGQueryRequest,
188      x_openwebui_user_email: Annotated[str | None, Header(alias="X-OpenWebUI-User-Email")] = None,
189      x_openwebui_user_id: Annotated[str | None, Header(alias="X-OpenWebUI-User-Id")] = None,
190      x_openwebui_user_name: Annotated[str | None, Header(alias="X-OpenWebUI-User-Name")] = None,
191      x_openwebui_user_role: Annotated[str | None, Header(alias="X-OpenWebUI-User-Role")] = None,
192  ) -> RAGQueryResponse:
193      """RAG query endpoint that returns response with citations.
194  
195      This endpoint is designed to work with OpenWebUI Pipes that need
196      access to citation data for emitting citation events.
197      """
198      components: RAGComponents = app.state.components
199      logger = app.state.logger
200  
201      logger.info(f"x_openwebui_user_email: {x_openwebui_user_email}")
202      logger.info(f"x_openwebui_user_id: {x_openwebui_user_id}")
203      logger.info(f"x_openwebui_user_name: {x_openwebui_user_name}")
204      logger.info(f"x_openwebui_user_role: {x_openwebui_user_role}")
205  
206      # Resolve user role from headers (OpenWebUI integration)
207      user_role = resolve_user_role_from_headers(
208          user_email=x_openwebui_user_email,
209          user_id=x_openwebui_user_id,
210          user_resolver=app.state.user_resolver,
211          default_role=app.state.default_user_role,
212          logger=logger,
213      )
214  
215      logger.info(f"Processing RAG query: {request.query}")
216  
217      context = execute_rag_query_with_error_handling(
218          components=components,
219          query=request.query,
220          user_role=user_role,
221          role_mapping=app.state.role_mapping,
222          logger=logger,
223          is_initialized=app.state.initialized,
224      )
225  
226      response_text = context.llm_response or "I don't know based on the provided documents."
227  
228      citations = [
229          Citation(
230              id=c["id"],
231              source=c.get("source"),
232              page=c.get("page"),
233              score=c.get("score", 0.0),
234              content=c.get("content", ""),
235          )
236          for c in (context.citations or [])
237      ]
238  
239      logger.info(f"RAG query processed successfully with {len(citations)} citations")
240  
241      return RAGQueryResponse(response=response_text, citations=citations)