/ sussro_services / api / v1 / endpoints / auth.py
auth.py
 1  """Authentication API endpoints."""
from datetime import timedelta
from typing import Any

from fastapi import APIRouter, Body, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from sussro_services import crud, models, schemas
from sussro_services.api import deps
from sussro_services.core import security
from sussro_services.core.config import settings
from sussro_services.core.security import get_password_hash
from sussro_services.schemas.token import Token
from sussro_services.schemas.user import User as UserSchema
16  
17  router = APIRouter()
18  
@router.post("/login/access-token", response_model=schemas.Token)
def login_access_token(
    db: Session = Depends(deps.get_db),
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
    """Exchange OAuth2 password-form credentials for a bearer access token.

    The form's ``username`` field is handed to ``crud.user.authenticate`` as
    the account email, together with the submitted password.

    Returns:
        A mapping with ``access_token`` (created for the user's ``id`` with a
        lifetime of ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes) and
        ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 400 when authentication yields no user, or when the
            authenticated user is not active.
    """
    account = crud.user.authenticate(
        db,
        email=form_data.username,
        password=form_data.password,
    )

    # Guard clauses: reject bad credentials first, then inactive accounts.
    if not account:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Incorrect email or password",
        )
    if not crud.user.is_active(account):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user",
        )

    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    token = security.create_access_token(account.id, expires_delta=lifetime)
    return {"access_token": token, "token_type": "bearer"}
44  
@router.post("/login/test-token", response_model=schemas.User)
def test_token(
    current_user: models.User = Depends(deps.get_current_user),
) -> Any:
    """Return the user resolved for the current request.

    The user object is supplied by the ``deps.get_current_user`` dependency;
    this handler adds no logic of its own and simply echoes it back,
    serialized through ``schemas.User``.
    """
    return current_user
49  
@router.post("/password-recovery/{email}", response_model=schemas.Msg)
def recover_password(email: str, db: Session = Depends(deps.get_db)) -> Any:
    """Start password recovery for the account registered under ``email``.

    Only the existence of the account is checked at present; no email is
    actually dispatched yet (see the TODO below), although the success
    message says one was sent.

    Returns:
        ``{"msg": "Password recovery email sent"}`` when the user exists.

    Raises:
        HTTPException: 404 when no user is found for ``email``.
    """
    # NOTE(review): the 404 below lets a caller tell registered addresses from
    # unregistered ones - confirm that is acceptable for this API.
    if crud.user.get_by_email(db, email=email):
        # TODO: Send email with password reset link
        return {"msg": "Password recovery email sent"}

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="The user with this username does not exist in the system.",
    )
63  
@router.post("/reset-password/", response_model=schemas.Msg)
def reset_password(
    token: str = Body(...),
    new_password: str = Body(...),
    db: Session = Depends(deps.get_db),
) -> Any:
    """Set a new password for the user identified by a password-reset token.

    Args:
        token: Reset token; ``security.verify_password_reset_token`` maps it
            to the account email, or to a falsy value when it is not valid.
        new_password: Plain-text replacement password; only its hash is stored.
        db: Database session provided by ``deps.get_db``.

    Returns:
        ``{"msg": "Password updated successfully"}`` once the new hash has
        been committed.

    Raises:
        HTTPException: 400 when the token does not verify, 404 when the email
            from the token matches no user, 400 when that user is inactive.
    """
    # ``Body`` comes from ``fastapi`` and must be in the module's imports:
    # the defaults above are evaluated when this function is defined.
    email = security.verify_password_reset_token(token)
    if not email:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid token"
        )

    user = crud.user.get_by_email(db, email=email)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="The user with this username does not exist in the system.",
        )
    if not crud.user.is_active(user):
        # Same symbolic status constant as the other handlers in this module.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
        )

    # Persist only the hash of the new password.
    user.hashed_password = get_password_hash(new_password)
    db.add(user)
    db.commit()

    return {"msg": "Password updated successfully"}
90      
91      return {"msg": "Password updated successfully"}