# auth_router.py
"""
Router for authentication and user management
-------------------------------------------------------------
This module implements routes for authentication, registration
and API user management.
"""

import os
import time
import logging
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta

from fastapi import APIRouter, Depends, HTTPException, Header, Request, status, Body
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, EmailStr, validator

# Import authentication models and utilities
from auth_models import User, UserCreate, UserInDB, Token, TokenData
from auth import (
    authenticate_user,
    create_access_token,
    get_current_user,
    get_current_active_user,
    get_password_hash
)
from database import get_db_connection

# Standardized API response models
from .response_models import SuccessResponse, ErrorResponse

# Logging configuration
logger = logging.getLogger("api.auth")

# Create router
auth_router = APIRouter(
    prefix="/auth",
    tags=["Authentication"],
    responses={
        401: {"model": ErrorResponse, "description": "Unauthorized"},
        403: {"model": ErrorResponse, "description": "Access forbidden"},
        404: {"model": ErrorResponse, "description": "User not found"},
        500: {"model": ErrorResponse, "description": "Server error"}
    }
)

# Constants (token lifetimes, overridable through the environment)
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))
REFRESH_TOKEN_EXPIRE_DAYS = int(os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", "7"))

# OAuth2 Configuration
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")


# Route-specific models
class RefreshTokenRequest(BaseModel):
    """Request body for POST /auth/refresh."""
    refresh_token: str


class PasswordChangeRequest(BaseModel):
    """Request body for POST /auth/change-password."""
    current_password: str
    new_password: str

    @validator('new_password')
    def password_strength(cls, v):
        """Reject new passwords shorter than 8 characters or lacking a digit or an uppercase letter."""
        if len(v) < 8:
            raise ValueError('Password must contain at least 8 characters')
        if not any(char.isdigit() for char in v):
            raise ValueError('Password must contain at least one digit')
        if not any(char.isupper() for char in v):
            raise ValueError('Password must contain at least one uppercase letter')
        return v


class PasswordResetRequest(BaseModel):
    """Request body for POST /auth/reset-password."""
    email: EmailStr


class UserResponse(BaseModel):
    """Model for user response data (never carries the password hash)."""
    username: str
    email: EmailStr
    is_active: bool
    is_admin: bool
    created_at: datetime
    subscription_level: Optional[str] = None
    last_login: Optional[datetime] = None


def _unauthorized(detail: str) -> HTTPException:
    """Build the 401 response used for every bearer-token failure in this module."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def _issue_token_pair(username: str) -> Dict[str, str]:
    """Create a fresh access/refresh token pair for ``username``.

    Both tokens are produced by ``create_access_token``; the refresh token
    differs by its longer lifetime and by carrying a ``"refresh": True``
    claim, which ``/auth/refresh`` requires.

    Returns:
        The dict returned by the ``/token`` and ``/refresh`` routes.
    """
    access_token = create_access_token(
        data={"sub": username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    new_refresh_token = create_access_token(
        data={"sub": username, "refresh": True},
        expires_delta=timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
    )
    return {
        "access_token": access_token,
        "refresh_token": new_refresh_token,
        "token_type": "bearer"
    }


# Authentication routes
@auth_router.post("/token", response_model=Token)
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db = Depends(get_db_connection)
):
    """Generates a JWT access token (and refresh token) for authentication.

    Raises:
        HTTPException: 401 when ``authenticate_user`` rejects the credentials.
    """
    user = authenticate_user(db, form_data.username, form_data.password)
    if not user:
        # Lazy %-formatting: the username is client-supplied, so keep it out of
        # the format string itself.
        logger.warning("Failed login attempt for user %s", form_data.username)
        raise _unauthorized("Incorrect username or password")

    tokens = _issue_token_pair(user.username)

    # Update last login date (naive UTC timestamp, as elsewhere in this module)
    db.users.update_one(
        {"username": user.username},
        {"$set": {"last_login": datetime.utcnow()}}
    )

    return tokens


@auth_router.post("/refresh", response_model=Token)
async def refresh_token(
    request: RefreshTokenRequest,
    db = Depends(get_db_connection)
):
    """Refreshes an access token using a refresh token.

    Raises:
        HTTPException: 401 when the token cannot be decoded, is not a refresh
            token, has no subject, or its user no longer exists or has been
            deactivated.
    """
    # Only the decode step can legitimately mean "invalid refresh token", so
    # the try block is limited to it.
    try:
        payload = jwt.decode(
            request.refresh_token,
            os.getenv("SECRET_KEY"),
            algorithms=[os.getenv("ALGORITHM", "HS256")]
        )
    except JWTError as exc:
        raise _unauthorized("Invalid refresh token") from exc

    # The token must be flagged as a refresh token and must name a subject.
    username = payload.get("sub")
    if not payload.get("refresh") or username is None:
        raise _unauthorized("Invalid refresh token")

    # Verify that the user still exists
    user_data = db.users.find_one({"username": username})
    if not user_data:
        raise _unauthorized("User not found")

    # Do not mint new tokens for a deactivated account. Documents without the
    # field are treated as active so pre-existing records keep working.
    if not user_data.get("is_active", True):
        raise _unauthorized("Inactive user")

    return _issue_token_pair(username)


@auth_router.post("/register", response_model=UserResponse)
async def register_user(
    user_create: UserCreate,
    db = Depends(get_db_connection)
):
    """Creates a new active, non-admin user on the "free" subscription level.

    Raises:
        HTTPException: 400 when the username or the email is already in use.
    """
    # Check if the user already exists
    if db.users.find_one({"username": user_create.username}):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already in use"
        )

    # Check if the email already exists
    if db.users.find_one({"email": user_create.email}):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already in use"
        )

    # NOTE(review): the two lookups above and the insert below are not atomic;
    # a unique index on username/email would be needed to close that window.
    new_user = UserInDB(
        username=user_create.username,
        email=user_create.email,
        hashed_password=get_password_hash(user_create.password),
        is_active=True,
        is_admin=False,
        created_at=datetime.utcnow(),
        subscription_level="free"
    ).dict()

    # Insert the user into the database
    db.users.insert_one(new_user)

    # Return user information without the password hash
    return UserResponse(
        username=new_user["username"],
        email=new_user["email"],
        is_active=new_user["is_active"],
        is_admin=new_user["is_admin"],
        created_at=new_user["created_at"],
        subscription_level=new_user["subscription_level"]
    )


@auth_router.get("/me", response_model=UserResponse)
async def read_users_me(
    current_user: User = Depends(get_current_active_user)
):
    """Retrieves information about the currently logged in user"""
    return UserResponse(
        username=current_user.username,
        email=current_user.email,
        is_active=current_user.is_active,
        is_admin=current_user.is_admin,
        created_at=current_user.created_at,
        subscription_level=current_user.subscription_level,
        last_login=current_user.last_login
    )

# One hashing context for the module: building a CryptContext per request is
# needless work, and the configuration below never changes.
_PWD_CONTEXT = CryptContext(schemes=["bcrypt"], deprecated="auto")


@auth_router.post("/change-password", response_model=SuccessResponse)
async def change_password(
    request: PasswordChangeRequest,
    current_user: User = Depends(get_current_active_user),
    db = Depends(get_db_connection)
):
    """Changes the password of the connected user.

    The new password has already passed the strength rules declared on
    ``PasswordChangeRequest`` by the time this handler runs.

    Raises:
        HTTPException: 404 when the authenticated user's record is missing,
            400 when ``current_password`` does not match the stored hash.
    """
    # Re-read the record so the stored hash is available for verification
    user_data = db.users.find_one({"username": current_user.username})
    if not user_data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    user_db = UserInDB(**user_data)

    # Verify current password
    if not _PWD_CONTEXT.verify(request.current_password, user_db.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Current password is incorrect"
        )

    # Hash the new password and store it
    db.users.update_one(
        {"username": current_user.username},
        {"$set": {"hashed_password": get_password_hash(request.new_password)}}
    )

    return SuccessResponse(
        success=True,
        message="Password changed successfully"
    )


@auth_router.post("/reset-password", response_model=SuccessResponse)
async def request_password_reset(
    request: PasswordResetRequest,
    db = Depends(get_db_connection)
):
    """Request password reset.

    The response is the same whether or not the email is registered, so the
    endpoint cannot be used to discover which addresses have accounts.
    """
    # Single response object for both outcomes: keeping it in one place
    # guarantees the known/unknown-email replies can never drift apart.
    generic_response = SuccessResponse(
        success=True,
        message="If the email exists, a reset link has been sent"
    )

    # Check if email exists
    user = db.users.find_one({"email": request.email})
    if not user:
        return generic_response

    # TODO: Send an email with a reset link
    # In a real implementation, generate a unique token and send it by email

    return generic_response


# Administrative routes (restricted access)
def _require_admin(current_user) -> None:
    """Raise 403 unless ``current_user`` has the ``is_admin`` flag set."""
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access reserved for administrators"
        )


@auth_router.get("/users", response_model=List[UserResponse])
async def list_users(
    current_user: User = Depends(get_current_active_user),
    db = Depends(get_db_connection)
):
    """Lists all users (reserved for administrators).

    Raises:
        HTTPException: 403 when the caller is not an administrator.
    """
    _require_admin(current_user)

    # Iterate the query result directly; only the fields below are copied, so
    # the stored password hash never reaches the response.
    return [
        UserResponse(
            username=user["username"],
            email=user["email"],
            is_active=user["is_active"],
            is_admin=user["is_admin"],
            created_at=user["created_at"],
            # Records without these optional fields fall back to "free" / None
            subscription_level=user.get("subscription_level", "free"),
            last_login=user.get("last_login")
        )
        for user in db.users.find()
    ]


@auth_router.delete("/users/{username}", response_model=SuccessResponse)
async def delete_user(
    username: str,
    current_user: User = Depends(get_current_active_user),
    db = Depends(get_db_connection)
):
    """Deletes a user (reserved for administrators).

    Raises:
        HTTPException: 403 when the caller is not an administrator, 400 when
            an administrator targets their own account, 404 when no user with
            that username was deleted.
    """
    _require_admin(current_user)

    # Prevent deletion of own account
    if username == current_user.username:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="You cannot delete your own account"
        )

    # Delete first and inspect the outcome: a separate existence check could
    # be invalidated by a concurrent delete and costs an extra round trip.
    # NOTE(review): relies on the delete result exposing `deleted_count`
    # (pymongo DeleteResult with an acknowledged write concern) - confirm.
    result = db.users.delete_one({"username": username})
    if result.deleted_count == 0:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    return SuccessResponse(
        success=True,
        message=f"User {username} deleted successfully"
    )