user_service.py
1 """User service for handling user-related business logic.""" 2 from datetime import datetime, timezone 3 from typing import Any, Dict, List, Optional, Union 4 5 from fastapi import HTTPException, status 6 from pydantic import EmailStr 7 from sqlalchemy.orm import Session 8 9 from .. import crud, models, schemas 10 from ..core.security import get_password_hash 11 from .auth_service import get_password_hash as auth_get_password_hash 12 13 14 def get_user(db: Session, user_id: int) -> Optional[models.User]: 15 """Get a user by ID. 16 17 Args: 18 db: Database session 19 user_id: ID of the user to retrieve 20 21 Returns: 22 Optional[models.User]: The user if found, None otherwise 23 """ 24 return crud.user.get(db, id=user_id) 25 26 27 def get_user_by_email(db: Session, email: str) -> Optional[models.User]: 28 """Get a user by email. 29 30 Args: 31 db: Database session 32 email: Email of the user to retrieve 33 34 Returns: 35 Optional[models.User]: The user if found, None otherwise 36 """ 37 return crud.user.get_by_email(db, email=email) 38 39 40 def get_users( 41 db: Session, skip: int = 0, limit: int = 100 42 ) -> List[models.User]: 43 """Get a list of users with pagination. 44 45 Args: 46 db: Database session 47 skip: Number of records to skip 48 limit: Maximum number of records to return 49 50 Returns: 51 List[models.User]: List of users 52 """ 53 return crud.user.get_multi(db, skip=skip, limit=limit) 54 55 56 def create_user(db: Session, user_in: schemas.UserCreate) -> models.User: 57 """Create a new user. 58 59 Args: 60 db: Database session 61 user_in: User creation data 62 63 Returns: 64 models.User: The created user 65 66 Raises: 67 HTTPException: If a user with the email already exists 68 """ 69 user = crud.user.get_by_email(db, email=user_in.email) 70 if user: 71 raise HTTPException( 72 status_code=status.HTTP_400_BAD_REQUEST, 73 detail="The user with this email already exists.", 74 ) 75 76 # Hash the password 77 hashed_password = auth_get_password_hash(user_in.password) 78 79 # Create the user 80 user_create = schemas.UserCreateDB( 81 **user_in.dict(exclude={"password"}), 82 hashed_password=hashed_password, 83 ) 84 85 return crud.user.create(db, obj_in=user_create) 86 87 88 def update_user( 89 db: Session, *, db_user: models.User, user_in: Union[schemas.UserUpdate, Dict[str, Any]] 90 ) -> models.User: 91 """Update a user. 92 93 Args: 94 db: Database session 95 db_user: The user to update 96 user_in: User update data 97 98 Returns: 99 models.User: The updated user 100 """ 101 if isinstance(user_in, dict): 102 update_data = user_in 103 else: 104 update_data = user_in.dict(exclude_unset=True) 105 106 # Handle password update 107 if "password" in update_data: 108 hashed_password = auth_get_password_hash(update_data["password"]) 109 del update_data["password"] 110 update_data["hashed_password"] = hashed_password 111 112 return crud.user.update(db, db_obj=db_user, obj_in=update_data) 113 114 115 def delete_user(db: Session, *, user_id: int) -> models.User: 116 """Delete a user. 117 118 Args: 119 db: Database session 120 user_id: ID of the user to delete 121 122 Returns: 123 models.User: The deleted user 124 125 Raises: 126 HTTPException: If the user is not found 127 """ 128 user = get_user(db, user_id=user_id) 129 if not user: 130 raise HTTPException( 131 status_code=status.HTTP_404_NOT_FOUND, 132 detail="User not found", 133 ) 134 return crud.user.remove(db, id=user_id) 135 136 137 def update_user_last_login(db: Session, user: models.User) -> models.User: 138 """Update the user's last login timestamp. 139 140 Args: 141 db: Database session 142 user: The user to update 143 144 Returns: 145 models.User: The updated user 146 """ 147 user.last_login = datetime.now(timezone.utc) 148 db.add(user) 149 db.commit() 150 db.refresh(user) 151 return user 152 153 154 def change_password( 155 db: Session, *, user: models.User, current_password: str, new_password: str 156 ) -> models.User: 157 """Change a user's password. 158 159 Args: 160 db: Database session 161 user: The user changing their password 162 current_password: Current password for verification 163 new_password: New password to set 164 165 Returns: 166 models.User: The updated user 167 168 Raises: 169 HTTPException: If the current password is incorrect 170 """ 171 if not crud.user.authenticate( 172 db, email=user.email, password=current_password 173 ): 174 raise HTTPException( 175 status_code=status.HTTP_400_BAD_REQUEST, 176 detail="Incorrect current password", 177 ) 178 179 hashed_password = auth_get_password_hash(new_password) 180 user.hashed_password = hashed_password 181 db.add(user) 182 db.commit() 183 db.refresh(user) 184 return user