/ sussro_services / crud / user.py
user.py
  1  """CRUD operations for users."""
  2  from typing import Any, Dict, Optional, Union
  3  
  4  from fastapi.encoders import jsonable_encoder
  5  from sqlalchemy.orm import Session
  6  
  7  from ..core.security import get_password_hash, verify_password
  8  from ..models.user import User
  9  from .base import CRUDBase
 10  from ..schemas.user import UserCreate, UserUpdate
 11  
 12  
 13  class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
 14      """CRUD operations for User model."""
 15  
 16      def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
 17          """Get a user by email.
 18  
 19          Args:
 20              db: Database session
 21              email: Email to search for
 22  
 23          Returns:
 24              Optional[User]: The user if found, None otherwise
 25          """
 26          return db.query(User).filter(User.email == email).first()
 27  
 28      def create(self, db: Session, *, obj_in: UserCreate) -> User:
 29          """Create a new user with hashed password.
 30  
 31          Args:
 32              db: Database session
 33              obj_in: User creation data
 34  
 35          Returns:
 36              User: The created user
 37          """
 38          db_obj = User(
 39              email=obj_in.email,
 40              hashed_password=get_password_hash(obj_in.password),
 41              full_name=obj_in.full_name,
 42              is_superuser=obj_in.is_superuser,
 43          )
 44          db.add(db_obj)
 45          db.commit()
 46          db.refresh(db_obj)
 47          return db_obj
 48  
 49      def update(
 50          self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
 51      ) -> User:
 52          """Update a user.
 53  
 54          Args:
 55              db: Database session
 56              db_obj: The user to update
 57              obj_in: Updated user data
 58  
 59          Returns:
 60              User: The updated user
 61          """
 62          if isinstance(obj_in, dict):
 63              update_data = obj_in
 64          else:
 65              update_data = obj_in.dict(exclude_unset=True)
 66          
 67          if "password" in update_data:
 68              hashed_password = get_password_hash(update_data["password"])
 69              del update_data["password"]
 70              update_data["hashed_password"] = hashed_password
 71          
 72          return super().update(db, db_obj=db_obj, obj_in=update_data)
 73  
 74      def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
 75          """Authenticate a user.
 76  
 77          Args:
 78              db: Database session
 79              email: User's email
 80              password: Plain text password
 81  
 82          Returns:
 83              Optional[User]: The user if authentication is successful, None otherwise
 84          """
 85          user = self.get_by_email(db, email=email)
 86          if not user:
 87              return None
 88          if not verify_password(password, user.hashed_password):
 89              return None
 90          return user
 91  
 92      def is_active(self, user: User) -> bool:
 93          """Check if a user is active.
 94  
 95          Args:
 96              user: The user to check
 97  
 98          Returns:
 99              bool: True if the user is active, False otherwise
100          """
101          return user.is_active
102  
103      def is_superuser(self, user: User) -> bool:
104          """Check if a user is a superuser.
105  
106          Args:
107              user: The user to check
108  
109          Returns:
110              bool: True if the user is a superuser, False otherwise
111          """
112          return user.is_superuser
113  
114  
# Shared module-level instance: CRUDUser constructed once, at import time,
# with the User model.
user = CRUDUser(User)