auth.py
1 """Authentication API endpoints.""" 2 from datetime import timedelta 3 from typing import Any 4 5 from fastapi import APIRouter, Depends, HTTPException, status 6 from fastapi.security import OAuth2PasswordRequestForm 7 from sqlalchemy.orm import Session 8 9 from sussro_services import crud, models, schemas 10 from sussro_services.api import deps 11 from sussro_services.core import security 12 from sussro_services.core.config import settings 13 from sussro_services.core.security import get_password_hash 14 from sussro_services.schemas.token import Token 15 from sussro_services.schemas.user import User as UserSchema 16 17 router = APIRouter() 18 19 @router.post("/login/access-token", response_model=schemas.Token) 20 def login_access_token( 21 db: Session = Depends(deps.get_db), form_data: OAuth2PasswordRequestForm = Depends() 22 ) -> Any: 23 """OAuth2 compatible token login, get an access token for future requests.""" 24 user = crud.user.authenticate( 25 db, email=form_data.username, password=form_data.password 26 ) 27 if not user: 28 raise HTTPException( 29 status_code=status.HTTP_400_BAD_REQUEST, 30 detail="Incorrect email or password", 31 ) 32 elif not crud.user.is_active(user): 33 raise HTTPException( 34 status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user" 35 ) 36 37 access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) 38 return { 39 "access_token": security.create_access_token( 40 user.id, expires_delta=access_token_expires 41 ), 42 "token_type": "bearer", 43 } 44 45 @router.post("/login/test-token", response_model=schemas.User) 46 def test_token(current_user: models.User = Depends(deps.get_current_user)) -> Any: 47 """Test access token.""" 48 return current_user 49 50 @router.post("/password-recovery/{email}", response_model=schemas.Msg) 51 def recover_password(email: str, db: Session = Depends(deps.get_db)) -> Any: 52 """Password Recovery.""" 53 user = crud.user.get_by_email(db, email=email) 54 55 if not user: 56 raise HTTPException( 57 status_code=status.HTTP_404_NOT_FOUND, 58 detail="The user with this username does not exist in the system.", 59 ) 60 61 # TODO: Send email with password reset link 62 return {"msg": "Password recovery email sent"} 63 64 @router.post("/reset-password/", response_model=schemas.Msg) 65 def reset_password( 66 token: str = Body(...), 67 new_password: str = Body(...), 68 db: Session = Depends(deps.get_db), 69 ) -> Any: 70 """Reset password.""" 71 email = security.verify_password_reset_token(token) 72 if not email: 73 raise HTTPException( 74 status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid token" 75 ) 76 77 user = crud.user.get_by_email(db, email=email) 78 if not user: 79 raise HTTPException( 80 status_code=status.HTTP_404_NOT_FOUND, 81 detail="The user with this username does not exist in the system.", 82 ) 83 elif not crud.user.is_active(user): 84 raise HTTPException(status_code=400, detail="Inactive user") 85 86 hashed_password = get_password_hash(new_password) 87 user.hashed_password = hashed_password 88 db.add(user) 89 db.commit() 90 91 return {"msg": "Password updated successfully"}