# user.py
"""CRUD operations for users."""
from typing import Any, Dict, Optional, Union

from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session

from ..core.security import get_password_hash, verify_password
from ..models.user import User
from .base import CRUDBase
from ..schemas.user import UserCreate, UserUpdate


class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
    """CRUD operations for the User model.

    Extends the generic ``CRUDBase`` with user-specific behavior: lookup by
    email, hashing of plain-text passwords on create/update, and credential
    checking.
    """

    def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
        """Get a user by email.

        Args:
            db: Database session
            email: Email to search for (compared with ``==`` against
                ``User.email``)

        Returns:
            Optional[User]: The first matching user, or None if there is none
        """
        return db.query(User).filter(User.email == email).first()

    def create(self, db: Session, *, obj_in: UserCreate) -> User:
        """Create a new user with a hashed password.

        The plain-text ``obj_in.password`` is passed through
        ``get_password_hash`` and only the result is stored on the model.

        Args:
            db: Database session
            obj_in: User creation data

        Returns:
            User: The created user, refreshed from the database after commit
        """
        db_obj = User(
            email=obj_in.email,
            hashed_password=get_password_hash(obj_in.password),
            full_name=obj_in.full_name,
            is_superuser=obj_in.is_superuser,
        )
        db.add(db_obj)
        db.commit()
        db.refresh(db_obj)
        return db_obj

    def update(
        self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
    ) -> User:
        """Update a user.

        If the update data carries a ``"password"`` entry, it is replaced by a
        ``"hashed_password"`` entry before delegating to ``CRUDBase.update``.
        A ``"password"`` of ``None`` is dropped without touching the stored
        hash.

        Args:
            db: Database session
            db_obj: The user to update
            obj_in: Updated user data, either a ``UserUpdate`` schema (only
                explicitly set fields are used) or a plain dict. A dict passed
                here is not modified.

        Returns:
            User: The updated user
        """
        if isinstance(obj_in, dict):
            # Shallow copy: the password/hashed_password swap below must not
            # leak into the caller's dict.
            update_data = dict(obj_in)
        else:
            update_data = obj_in.dict(exclude_unset=True)

        if "password" in update_data:
            # Never forward the plain-text password to the base update.
            password = update_data.pop("password")
            if password is not None:
                update_data["hashed_password"] = get_password_hash(password)

        return super().update(db, db_obj=db_obj, obj_in=update_data)

    def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
        """Authenticate a user.

        Args:
            db: Database session
            email: User's email
            password: Plain text password

        Returns:
            Optional[User]: The user if authentication is successful, None
            otherwise (unknown email or password mismatch)
        """
        user = self.get_by_email(db, email=email)
        # NOTE(review): the unknown-email path returns before any hash
        # verification runs, so the two failure paths do different amounts of
        # work; confirm whether that timing difference matters here.
        if not user:
            return None
        if not verify_password(password, user.hashed_password):
            return None
        return user

    def is_active(self, user: User) -> bool:
        """Check if a user is active.

        Args:
            user: The user to check

        Returns:
            bool: The value of ``user.is_active``
        """
        return user.is_active

    def is_superuser(self, user: User) -> bool:
        """Check if a user is a superuser.

        Args:
            user: The user to check

        Returns:
            bool: The value of ``user.is_superuser``
        """
        return user.is_superuser


# Create a singleton instance
user = CRUDUser(User)