/ src / api / helpers.py
helpers.py
  1  """Helper functions for API response construction."""
  2  
  3  import logging
  4  import time
  5  import uuid
  6  
  7  from fastapi import HTTPException
  8  
  9  from src.components import RAGComponents, execute_query
 10  from src.pipeline import PipelineStatus, QueryContext
 11  from src.user_resolvers import UserRoleResolver
 12  
 13  from .models import ChatCompletionChoice, ChatCompletionResponse, Message, Usage
 14  
 15  
 16  def estimate_token_usage(prompt: str | None, answer: str) -> Usage:
 17      """Estimate token usage for the query and response.
 18  
 19      Parameters
 20      ----------
 21      prompt
 22          The prompt sent to the LLM
 23      answer
 24          The generated answer
 25  
 26      Returns
 27      -------
 28      Usage object with estimated token counts
 29      """
 30      prompt_tokens = len(prompt.split()) if prompt else 0
 31      completion_tokens = len(answer.split())
 32      total_tokens = prompt_tokens + completion_tokens
 33  
 34      return Usage(
 35          prompt_tokens=prompt_tokens,
 36          completion_tokens=completion_tokens,
 37          total_tokens=total_tokens,
 38      )
 39  
 40  
 41  def build_chat_response(
 42      answer: str,
 43      model: str,
 44      usage: Usage,
 45  ) -> ChatCompletionResponse:
 46      """Build an OpenAI-compatible chat completion response.
 47  
 48      Parameters
 49      ----------
 50      answer
 51          The generated answer text
 52      model
 53          The model identifier to include in the response
 54      usage
 55          Token usage statistics
 56  
 57      Returns
 58      -------
 59      ChatCompletionResponse object
 60      """
 61      response_id = f"chatcmpl-{uuid.uuid4().hex[:24]}"
 62      created_timestamp = int(time.time())
 63  
 64      choice = ChatCompletionChoice(
 65          index=0,
 66          message=Message(role="assistant", content=answer),
 67          finish_reason="stop",
 68      )
 69  
 70      return ChatCompletionResponse(
 71          id=response_id,
 72          created=created_timestamp,
 73          model=model,
 74          choices=[choice],
 75          usage=usage,
 76      )
 77  
 78  
 79  def resolve_user_role_from_headers(
 80      user_email: str | None,
 81      user_id: str | None,
 82      user_resolver: UserRoleResolver,
 83      default_role: str,
 84      logger: logging.Logger,
 85  ) -> str:
 86      """Resolve user role from OpenWebUI headers.
 87  
 88      Parameters
 89      ----------
 90      user_email
 91          User's email from X-OpenWebUI-User-Email header
 92      user_id
 93          User's ID from X-OpenWebUI-User-Id header
 94      user_resolver
 95          User role resolver instance
 96      default_role
 97          Default role to use when no headers are present
 98      logger
 99          Logger instance for logging
100  
101      Returns
102      -------
103      str
104          Resolved user role
105      """
106      if user_email or user_id:
107          user_role = user_resolver.resolve_role(
108              user_email=user_email,
109              user_id=user_id,
110          )
111          logger.info(f"Resolved role for user '{user_email}': {user_role}")
112          return user_role
113      else:
114          logger.info(f"No user headers, using default role: {default_role}")
115          return default_role
116  
117  
def execute_rag_query_with_error_handling(
    components: RAGComponents,
    query: str,
    user_role: str,
    role_mapping: dict[str, list[str]] | None,
    logger: logging.Logger,
    is_initialized: bool,
) -> QueryContext:
    """Run a RAG query and translate every failure into an HTTPException.

    Shared by the API endpoints so that an uninitialized service, a failed
    pipeline and unexpected errors are all reported the same way.

    Parameters
    ----------
    components
        Initialized RAG components
    query
        User's query text
    user_role
        User's role for access control
    role_mapping
        Role-to-tags mapping for access control
    logger
        Logger instance for logging
    is_initialized
        Whether the server is initialized

    Returns
    -------
    QueryContext
        Pipeline context containing query results

    Raises
    ------
    HTTPException
        503 when the service is not initialized; 500 when the pipeline
        reports a failed status or any other exception is raised
    """
    if not is_initialized:
        raise HTTPException(
            status_code=503,
            detail="Service not initialized. Please ensure vector database is available.",
        )

    try:
        result = execute_query(
            components,
            query,
            user_role=user_role,
            role_mapping=role_mapping,
        )

        pipeline_failed = result.status == PipelineStatus.FAILED
        if not pipeline_failed:
            return result

        logger.error(f"Pipeline failed: {result.error}")
        raise HTTPException(
            status_code=500,
            detail=f"RAG pipeline failed: {result.error}",
        )
    except HTTPException:
        # Already in its final form (including the failure raised just
        # above); let it through instead of re-wrapping it below.
        raise
    except Exception as exc:
        # NOTE(review): the exception text is echoed back to the client in
        # the response detail - confirm this is acceptable to expose.
        logger.exception(f"Error processing query: {exc}")
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error: {exc!s}",
        ) from exc