/ sussro_services / services / user_service.py
user_service.py
  1  """User service for handling user-related business logic."""
  2  from datetime import datetime, timezone
  3  from typing import Any, Dict, List, Optional, Union
  4  
  5  from fastapi import HTTPException, status
  6  from pydantic import EmailStr
  7  from sqlalchemy.orm import Session
  8  
  9  from .. import crud, models, schemas
 10  from ..core.security import get_password_hash
 11  from .auth_service import get_password_hash as auth_get_password_hash
 12  
 13  
 14  def get_user(db: Session, user_id: int) -> Optional[models.User]:
 15      """Get a user by ID.
 16      
 17      Args:
 18          db: Database session
 19          user_id: ID of the user to retrieve
 20          
 21      Returns:
 22          Optional[models.User]: The user if found, None otherwise
 23      """
 24      return crud.user.get(db, id=user_id)
 25  
 26  
 27  def get_user_by_email(db: Session, email: str) -> Optional[models.User]:
 28      """Get a user by email.
 29      
 30      Args:
 31          db: Database session
 32          email: Email of the user to retrieve
 33          
 34      Returns:
 35          Optional[models.User]: The user if found, None otherwise
 36      """
 37      return crud.user.get_by_email(db, email=email)
 38  
 39  
 40  def get_users(
 41      db: Session, skip: int = 0, limit: int = 100
 42  ) -> List[models.User]:
 43      """Get a list of users with pagination.
 44      
 45      Args:
 46          db: Database session
 47          skip: Number of records to skip
 48          limit: Maximum number of records to return
 49          
 50      Returns:
 51          List[models.User]: List of users
 52      """
 53      return crud.user.get_multi(db, skip=skip, limit=limit)
 54  
 55  
 56  def create_user(db: Session, user_in: schemas.UserCreate) -> models.User:
 57      """Create a new user.
 58      
 59      Args:
 60          db: Database session
 61          user_in: User creation data
 62          
 63      Returns:
 64          models.User: The created user
 65          
 66      Raises:
 67          HTTPException: If a user with the email already exists
 68      """
 69      user = crud.user.get_by_email(db, email=user_in.email)
 70      if user:
 71          raise HTTPException(
 72              status_code=status.HTTP_400_BAD_REQUEST,
 73              detail="The user with this email already exists.",
 74          )
 75      
 76      # Hash the password
 77      hashed_password = auth_get_password_hash(user_in.password)
 78      
 79      # Create the user
 80      user_create = schemas.UserCreateDB(
 81          **user_in.dict(exclude={"password"}),
 82          hashed_password=hashed_password,
 83      )
 84      
 85      return crud.user.create(db, obj_in=user_create)
 86  
 87  
 88  def update_user(
 89      db: Session, *, db_user: models.User, user_in: Union[schemas.UserUpdate, Dict[str, Any]]
 90  ) -> models.User:
 91      """Update a user.
 92      
 93      Args:
 94          db: Database session
 95          db_user: The user to update
 96          user_in: User update data
 97          
 98      Returns:
 99          models.User: The updated user
100      """
101      if isinstance(user_in, dict):
102          update_data = user_in
103      else:
104          update_data = user_in.dict(exclude_unset=True)
105      
106      # Handle password update
107      if "password" in update_data:
108          hashed_password = auth_get_password_hash(update_data["password"])
109          del update_data["password"]
110          update_data["hashed_password"] = hashed_password
111      
112      return crud.user.update(db, db_obj=db_user, obj_in=update_data)
113  
114  
115  def delete_user(db: Session, *, user_id: int) -> models.User:
116      """Delete a user.
117      
118      Args:
119          db: Database session
120          user_id: ID of the user to delete
121          
122      Returns:
123          models.User: The deleted user
124          
125      Raises:
126          HTTPException: If the user is not found
127      """
128      user = get_user(db, user_id=user_id)
129      if not user:
130          raise HTTPException(
131              status_code=status.HTTP_404_NOT_FOUND,
132              detail="User not found",
133          )
134      return crud.user.remove(db, id=user_id)
135  
136  
137  def update_user_last_login(db: Session, user: models.User) -> models.User:
138      """Update the user's last login timestamp.
139      
140      Args:
141          db: Database session
142          user: The user to update
143          
144      Returns:
145          models.User: The updated user
146      """
147      user.last_login = datetime.now(timezone.utc)
148      db.add(user)
149      db.commit()
150      db.refresh(user)
151      return user
152  
153  
154  def change_password(
155      db: Session, *, user: models.User, current_password: str, new_password: str
156  ) -> models.User:
157      """Change a user's password.
158      
159      Args:
160          db: Database session
161          user: The user changing their password
162          current_password: Current password for verification
163          new_password: New password to set
164          
165      Returns:
166          models.User: The updated user
167          
168      Raises:
169          HTTPException: If the current password is incorrect
170      """
171      if not crud.user.authenticate(
172          db, email=user.email, password=current_password
173      ):
174          raise HTTPException(
175              status_code=status.HTTP_400_BAD_REQUEST,
176              detail="Incorrect current password",
177          )
178      
179      hashed_password = auth_get_password_hash(new_password)
180      user.hashed_password = hashed_password
181      db.add(user)
182      db.commit()
183      db.refresh(user)
184      return user