/ api / auth_router.py
auth_router.py
  1  """
  2  Router for authentication and user management
  3  -------------------------------------------------------------
  4  This module implements routes for authentication, registration
  5  and API user management.
  6  """
  7  
  8  import os
  9  import time
 10  import logging
 11  from typing import Optional, Dict, Any, List
 12  from datetime import datetime, timedelta
 13  
 14  from fastapi import APIRouter, Depends, HTTPException, Header, Request, status, Body
 15  from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
 16  from jose import JWTError, jwt
 17  from passlib.context import CryptContext
 18  from pydantic import BaseModel, EmailStr, validator
 19  
 20  # Import authentication models and utilities
 21  from auth_models import User, UserCreate, UserInDB, Token, TokenData
 22  from auth import (
 23      authenticate_user, 
 24      create_access_token, 
 25      get_current_user, 
 26      get_current_active_user,
 27      get_password_hash
 28  )
 29  from database import get_db_connection
 30  
 31  # Standardized API response models
 32  from .response_models import SuccessResponse, ErrorResponse
 33  
# Logging configuration
# Named logger used by the routes in this module; no handlers or levels are
# set in this section.
logger = logging.getLogger("api.auth")

# Create router
# Routes registered on this router are served under the "/auth" prefix and
# grouped under the "Authentication" tag. The `responses` mapping declares
# ErrorResponse as the documented model for the listed status codes.
auth_router = APIRouter(
    prefix="/auth",
    tags=["Authentication"],
    responses={
        401: {"model": ErrorResponse, "description": "Unauthorized"},
        403: {"model": ErrorResponse, "description": "Access forbidden"},
        404: {"model": ErrorResponse, "description": "User not found"},
        500: {"model": ErrorResponse, "description": "Server error"}
    }
)

# Constants
# Token lifetimes, read from the environment once at import time.
# Defaults when the variables are unset: 30 minutes (access), 7 days (refresh).
# int() raises ValueError at import if a variable holds a non-integer string.
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))
REFRESH_TOKEN_EXPIRE_DAYS = int(os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", "7"))

# OAuth2 Configuration
# Password-bearer scheme whose token URL is the relative path "auth/token".
# NOTE(review): not referenced by any route visible in this module section.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
 55  
 56  # Route-specific models
class RefreshTokenRequest(BaseModel):
    """Request body for the token refresh route."""
    # Encoded JWT; the refresh route decodes it and requires a truthy
    # "refresh" claim in its payload.
    refresh_token: str
 59  
 60  class PasswordChangeRequest(BaseModel):
 61      current_password: str
 62      new_password: str
 63      
 64      @validator('new_password')
 65      def password_strength(cls, v):
 66          """Checks password strength"""
 67          if len(v) < 8:
 68              raise ValueError('Password must contain at least 8 characters')
 69          if not any(char.isdigit() for char in v):
 70              raise ValueError('Password must contain at least one digit')
 71          if not any(char.isupper() for char in v):
 72              raise ValueError('Password must contain at least one uppercase letter')
 73          return v
 74  
class PasswordResetRequest(BaseModel):
    """Request body for the password reset route."""
    # Address of the account to reset; validated as an email by EmailStr.
    email: EmailStr
 77  
class UserResponse(BaseModel):
    """Public view of a user returned by the routes in this module.

    Carries no password or password-hash field.
    """
    username: str
    email: EmailStr
    is_active: bool
    is_admin: bool
    created_at: datetime
    # Set to "free" by the registration route; optional in the response.
    subscription_level: Optional[str] = None
    # Written by the login route on each successful login; None until then.
    last_login: Optional[datetime] = None
 87  
 88  # Authentication routes
 89  @auth_router.post("/token", response_model=Token)
 90  async def login_for_access_token(
 91      form_data: OAuth2PasswordRequestForm = Depends(),
 92      db = Depends(get_db_connection)
 93  ):
 94      """Generates a JWT access token for authentication"""
 95      user = authenticate_user(db, form_data.username, form_data.password)
 96      if not user:
 97          logger.warning(f"Failed login attempt for user {form_data.username}")
 98          raise HTTPException(
 99              status_code=status.HTTP_401_UNAUTHORIZED,
100              detail="Incorrect username or password",
101              headers={"WWW-Authenticate": "Bearer"},
102          )
103          
104      # Create access token
105      access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
106      access_token = create_access_token(
107          data={"sub": user.username}, expires_delta=access_token_expires
108      )
109      
110      # Create refresh token
111      refresh_token_expires = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
112      refresh_token = create_access_token(
113          data={"sub": user.username, "refresh": True}, 
114          expires_delta=refresh_token_expires
115      )
116      
117      # Update last login date
118      db.users.update_one(
119          {"username": user.username},
120          {"$set": {"last_login": datetime.utcnow()}}
121      )
122      
123      return {
124          "access_token": access_token,
125          "refresh_token": refresh_token,
126          "token_type": "bearer"
127      }
128  
def _refresh_unauthorized(detail: str = "Invalid refresh token") -> HTTPException:
    """Build the 401 error shared by every refresh-token failure path."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


@auth_router.post("/refresh", response_model=Token)
async def refresh_token(
    request: RefreshTokenRequest,
    db = Depends(get_db_connection)
):
    """Exchange a valid refresh token for a new access/refresh token pair.

    Args:
        request: Body carrying the encoded refresh token.
        db: Database handle supplied by ``get_db_connection``.

    Returns:
        A dict with ``access_token``, ``refresh_token`` and ``token_type``
        (always ``"bearer"``).

    Raises:
        HTTPException: 401 when the token cannot be decoded, lacks a truthy
            ``refresh`` claim or a ``sub`` claim, or when the user no longer
            exists or has been deactivated.
    """
    # Only the decode can raise JWTError, so keep the try body to that call.
    try:
        payload = jwt.decode(
            request.refresh_token,
            os.getenv("SECRET_KEY"),
            algorithms=[os.getenv("ALGORITHM", "HS256")]
        )
    except JWTError as exc:
        raise _refresh_unauthorized() from exc

    # Verify that it's a refresh token (access tokens carry no such claim)
    if not payload.get("refresh"):
        raise _refresh_unauthorized()

    username = payload.get("sub")
    if username is None:
        raise _refresh_unauthorized()

    # Verify that the user still exists
    user_data = db.users.find_one({"username": username})
    if not user_data:
        raise _refresh_unauthorized("User not found")

    # A deactivated account must not be able to keep minting tokens.
    # Documents without the flag are treated as active, as before.
    if not user_data.get("is_active", True):
        raise _refresh_unauthorized("Inactive user")

    # Create new access token
    access_token = create_access_token(
        data={"sub": username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    )

    # Create new refresh token
    new_refresh_token = create_access_token(
        data={"sub": username, "refresh": True},
        expires_delta=timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    )

    return {
        "access_token": access_token,
        "refresh_token": new_refresh_token,
        "token_type": "bearer"
    }
194  
@auth_router.post("/register", response_model=UserResponse)
async def register_user(
    user_create: UserCreate,
    db = Depends(get_db_connection)
):
    """Register a new, active, non-admin user on the "free" subscription level.

    Responds with 400 when the username or the email is already stored;
    otherwise inserts the user and returns its public fields (no password).
    """
    # Username is checked before email, so a double conflict reports the username.
    uniqueness_rules = (
        ("username", "Username already in use"),
        ("email", "Email already in use"),
    )
    for field_name, conflict_detail in uniqueness_rules:
        if db.users.find_one({field_name: getattr(user_create, field_name)}):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=conflict_detail
            )

    # Only the hash of the password is stored.
    user_document = UserInDB(
        username=user_create.username,
        email=user_create.email,
        hashed_password=get_password_hash(user_create.password),
        is_active=True,
        is_admin=False,
        created_at=datetime.utcnow(),
        subscription_level="free"
    ).dict()

    db.users.insert_one(user_document)

    # Copy only the public fields into the response model.
    public_fields = (
        "username",
        "email",
        "is_active",
        "is_admin",
        "created_at",
        "subscription_level",
    )
    return UserResponse(**{name: user_document[name] for name in public_fields})
245  
@auth_router.get("/me", response_model=UserResponse)
async def read_users_me(
    current_user: User = Depends(get_current_active_user)
):
    """Return the public profile of the user resolved by ``get_current_active_user``."""
    profile_fields = (
        "username",
        "email",
        "is_active",
        "is_admin",
        "created_at",
        "subscription_level",
        "last_login",
    )
    profile = {name: getattr(current_user, name) for name in profile_fields}
    return UserResponse(**profile)
260  
# Built once at import instead of on every request: the context is
# configuration only, so a single instance can serve all calls.
_PWD_CONTEXT = CryptContext(schemes=["bcrypt"], deprecated="auto")


@auth_router.post("/change-password", response_model=SuccessResponse)
async def change_password(
    request: PasswordChangeRequest,
    current_user: User = Depends(get_current_active_user),
    db = Depends(get_db_connection)
):
    """Change the password of the authenticated user.

    Args:
        request: Current password plus the new password (already validated
            for strength by ``PasswordChangeRequest``).
        current_user: User resolved by ``get_current_active_user``.
        db: Database handle supplied by ``get_db_connection``.

    Returns:
        ``SuccessResponse`` confirming the change.

    Raises:
        HTTPException: 404 when the user's record is no longer stored;
            400 when ``current_password`` does not match the stored hash.
    """
    # Re-read the record: the stored hash is needed for verification.
    user_data = db.users.find_one({"username": current_user.username})
    if not user_data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    user_db = UserInDB(**user_data)

    # Verify current password
    if not _PWD_CONTEXT.verify(request.current_password, user_db.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Current password is incorrect"
        )

    # Hash the new password
    hashed_password = get_password_hash(request.new_password)

    # Update the password
    db.users.update_one(
        {"username": current_user.username},
        {"$set": {"hashed_password": hashed_password}}
    )

    return SuccessResponse(
        success=True,
        message="Password changed successfully"
    )
298  
@auth_router.post("/reset-password", response_model=SuccessResponse)
async def request_password_reset(
    request: PasswordResetRequest,
    db = Depends(get_db_connection)
):
    """Accept a password reset request.

    The reply is identical whether or not the email is registered, so the
    route does not reveal which addresses have accounts.
    """
    account = db.users.find_one({"email": request.email})

    if account:
        # TODO: Send an email with a reset link
        # In a real implementation, generate a unique token and send it by email
        pass

    # Same neutral answer for known and unknown addresses.
    return SuccessResponse(
        success=True,
        message="If the email exists, a reset link has been sent"
    )
321  
322  # Administrative routes (restricted access)
@auth_router.get("/users", response_model=List[UserResponse])
async def list_users(
    current_user: User = Depends(get_current_active_user),
    db = Depends(get_db_connection)
):
    """List every stored user (administrators only).

    Args:
        current_user: User resolved by ``get_current_active_user``.
        db: Database handle supplied by ``get_db_connection``.

    Returns:
        One ``UserResponse`` per stored user. ``subscription_level`` falls
        back to ``"free"`` and ``last_login`` to ``None`` when absent.

    Raises:
        HTTPException: 403 when the caller is not an administrator.
    """
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access reserved for administrators"
        )

    # Password hashes are never returned, so exclude them in the query
    # rather than loading them into the application only to drop them.
    cursor = db.users.find({}, {"hashed_password": 0})

    # Iterate the cursor directly; no intermediate list of raw documents.
    return [
        UserResponse(
            username=user["username"],
            email=user["email"],
            is_active=user["is_active"],
            is_admin=user["is_admin"],
            created_at=user["created_at"],
            subscription_level=user.get("subscription_level", "free"),
            last_login=user.get("last_login")
        )
        for user in cursor
    ]
350  
@auth_router.delete("/users/{username}", response_model=SuccessResponse)
async def delete_user(
    username: str,
    current_user: User = Depends(get_current_active_user),
    db = Depends(get_db_connection)
):
    """Delete a user by username (administrators only).

    Args:
        username: Username taken from the URL path.
        current_user: User resolved by ``get_current_active_user``.
        db: Database handle supplied by ``get_db_connection``.

    Returns:
        ``SuccessResponse`` naming the deleted user.

    Raises:
        HTTPException: 403 when the caller is not an administrator;
            400 when the caller targets their own account;
            404 when no user with that username was deleted.
    """
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access reserved for administrators"
        )

    # Prevent deletion of own account
    if username == current_user.username:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="You cannot delete your own account"
        )

    # Delete in a single operation and inspect the outcome. A separate
    # existence check costs an extra round trip and can race with a
    # concurrent delete, reporting success for a user that was already gone.
    result = db.users.delete_one({"username": username})
    if result.deleted_count == 0:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    return SuccessResponse(
        success=True,
        message=f"User {username} deleted successfully"
    )