user.py
1 """Endpoints for user management""" 2 3 import hashlib 4 from typing import Any 5 6 from fastapi import APIRouter, Body, Query, Request 7 from pyotp import random_base32 8 from sqlalchemy import asc, func 9 10 from .. import models 11 from ..auth import admin_auth, get_user, is_admin, user_auth 12 from ..database import db, filter_by, select 13 from ..exceptions.auth import PermissionDeniedError, admin_responses, user_responses 14 from ..exceptions.oauth import InvalidOAuthTokenError, RemoteAlreadyLinkedError 15 from ..exceptions.user import ( 16 CannotDeleteLastLoginMethodError, 17 InvalidCodeError, 18 MFAAlreadyEnabledError, 19 MFANotEnabledError, 20 MFANotInitializedError, 21 NoLoginMethodError, 22 OAuthRegistrationDisabledError, 23 RecaptchaError, 24 RegistrationDisabledError, 25 UserAlreadyExistsError, 26 UserNotFoundError, 27 ) 28 from ..redis import redis 29 from ..schemas.session import LoginResponse 30 from ..schemas.user import MFA_CODE_REGEX, CreateUser, UpdateUser, User, UsersResponse 31 from ..settings import settings 32 from ..utils.mfa import check_mfa_code 33 from ..utils.recaptcha import check_recaptcha, recaptcha_enabled 34 35 36 router = APIRouter() 37 38 39 @router.get("/users", dependencies=[admin_auth], responses=admin_responses(UsersResponse)) 40 async def get_users( 41 limit: int = Query(100, ge=1, le=100, description="The maximum number of users to return"), 42 offset: int = Query(0, ge=0, description="The number of users to skip for pagination"), 43 name: str | None = Query(None, max_length=256, description="A search term to match against the user's name"), 44 enabled: bool | None = Query(None, description="Return only users with the given enabled status"), 45 admin: bool | None = Query(None, description="Return only users with the given admin status"), 46 mfa_enabled: bool | None = Query(None, description="Return only users with the given MFA status"), 47 ) -> Any: 48 """ 49 Return a list of all users matching the given criteria. 50 51 *Requirements:* **ADMIN** 52 """ 53 54 query = select(models.User) 55 order = [] 56 if name: 57 query = query.where(func.lower(models.User.name).contains(name.lower(), autoescape=True)) 58 order.append(asc(func.length(models.User.name))) 59 if enabled is not None: 60 query = query.where(models.User.enabled == enabled) 61 if admin is not None: 62 query = query.where(models.User.admin == admin) 63 if mfa_enabled is not None: 64 query = query.where(models.User.mfa_enabled == mfa_enabled) 65 66 return { 67 "total": await db.count(query), 68 "users": [ 69 user.serialize 70 async for user in await db.stream( 71 query.order_by(*order, asc(models.User.registration)).limit(limit).offset(offset) 72 ) 73 ], 74 } 75 76 77 @router.get("/users/{user_id}", responses=admin_responses(User, UserNotFoundError)) 78 async def get_user_by_id(user: models.User = get_user(require_self_or_admin=True)) -> Any: 79 """ 80 Return a user by ID. 81 82 *Requirements:* **SELF** or **ADMIN** 83 """ 84 85 return user.serialize 86 87 88 @router.post( 89 "/users", 90 responses=user_responses( 91 LoginResponse, 92 UserAlreadyExistsError, 93 RemoteAlreadyLinkedError, 94 NoLoginMethodError, 95 RegistrationDisabledError, 96 OAuthRegistrationDisabledError, 97 RecaptchaError, 98 InvalidOAuthTokenError, 99 ), 100 ) 101 async def create_user(data: CreateUser, request: Request, admin: bool = is_admin) -> Any: 102 """ 103 Create a new user and a new session for them. 104 105 If the **ADMIN** requirement is *not* met: 106 - The user is always created as a regular user (`"enabled": true, "admin": false`). 107 - A recaptcha response is required if recaptcha is enabled (see `GET /recaptcha`). 108 109 The value of the `User-agent` header is used as the device name of the created session. 110 """ 111 112 if not data.oauth_register_token and not data.password: 113 raise NoLoginMethodError 114 if not admin: 115 if data.password and not settings.open_registration: 116 raise RegistrationDisabledError 117 if data.oauth_register_token and not settings.open_oauth_registration: 118 raise OAuthRegistrationDisabledError 119 120 if recaptcha_enabled() and not (data.recaptcha_response and await check_recaptcha(data.recaptcha_response)): 121 raise RecaptchaError 122 123 if await db.exists(models.User.filter_by_name(data.name)): 124 raise UserAlreadyExistsError 125 126 if data.oauth_register_token: 127 async with redis.pipeline() as pipe: 128 await pipe.get(key1 := f"oauth_register_token:{data.oauth_register_token}:provider") 129 await pipe.get(key2 := f"oauth_register_token:{data.oauth_register_token}:user_id") 130 await pipe.get(key3 := f"oauth_register_token:{data.oauth_register_token}:display_name") 131 provider_id, remote_user_id, display_name = await pipe.execute() 132 133 if not provider_id or not remote_user_id: 134 raise InvalidOAuthTokenError 135 136 await redis.delete(key1, key2, key3) 137 138 if await db.exists( 139 filter_by(models.OAuthUserConnection, provider_id=provider_id, remote_user_id=remote_user_id) 140 ): 141 raise RemoteAlreadyLinkedError 142 143 user = await models.User.create(data.name, data.password, data.enabled or not admin, data.admin and admin) 144 145 if data.oauth_register_token: 146 await models.OAuthUserConnection.create(user.id, provider_id, remote_user_id, display_name) 147 148 session, access_token, refresh_token = await user.create_session(request.headers.get("User-agent", "")[:256]) 149 return { 150 "user": user.serialize, 151 "session": session.serialize, 152 "access_token": access_token, 153 "refresh_token": refresh_token, 154 } 155 156 157 @router.patch( 158 "/users/{user_id}", 159 responses=admin_responses(User, UserNotFoundError, UserAlreadyExistsError, CannotDeleteLastLoginMethodError), 160 ) 161 async def update_user( 162 data: UpdateUser, 163 user: models.User = get_user(models.User.sessions, models.User.oauth_connections, require_self_or_admin=True), 164 admin: bool = is_admin, 165 session: models.Session = user_auth, 166 ) -> Any: 167 """ 168 Update an existing user. 169 170 - Setting `password` to `null` or omitting it will not change the user's password while setting it to 171 the empty string will remove the user's password. 172 - Disabling a user will also log them out. 173 - A user can never change their own admin status. 174 175 *Requirements:* **SELF** or **ADMIN** 176 177 If the **ADMIN** requirement is *not* met: 178 - The username cannot be changed. 179 - The user cannot be enabled or disabled. 180 - The admin status cannot be changed. 181 """ 182 183 if data.name is not None and data.name != user.name: 184 if not admin: 185 raise PermissionDeniedError 186 if await db.exists(models.User.filter_by_name(data.name).where(models.User.id != user.id)): 187 raise UserAlreadyExistsError 188 189 user.name = data.name 190 191 if data.password is not None: 192 if not data.password and not user.oauth_connections: 193 raise CannotDeleteLastLoginMethodError 194 195 await user.change_password(data.password) 196 197 if data.enabled is not None and data.enabled != user.enabled: 198 if user.id == session.user_id: 199 raise PermissionDeniedError 200 201 user.enabled = data.enabled 202 if not user.enabled: 203 await user.logout() 204 205 if data.admin is not None and data.admin != user.admin: 206 if user.id == session.user_id: 207 raise PermissionDeniedError 208 209 user.admin = data.admin 210 211 return user.serialize 212 213 214 @router.post("/users/{user_id}/mfa", responses=admin_responses(str, UserNotFoundError, MFAAlreadyEnabledError)) 215 async def initialize_mfa(user: models.User = get_user(require_self_or_admin=True)) -> Any: 216 """ 217 Initialize MFA for a user by generating a new TOTP secret. 218 219 The TOTP secret generated by this endpoint should be used to configure the user's MFA app. After that the 220 `PUT /users/{user_id}/mfa` endpoint can be used to enable MFA. 221 222 *Requirements:* **SELF** or **ADMIN** 223 """ 224 225 if user.mfa_enabled: 226 raise MFAAlreadyEnabledError 227 228 user.mfa_secret = random_base32(32) 229 return user.mfa_secret 230 231 232 @router.put( 233 "/users/{user_id}/mfa", 234 responses=admin_responses(str, UserNotFoundError, MFAAlreadyEnabledError, MFANotInitializedError, InvalidCodeError), 235 ) 236 async def enable_mfa( 237 code: str = Body(embed=True, regex=MFA_CODE_REGEX, description="The 6-digit code generated by the user's MFA app"), 238 user: models.User = get_user(require_self_or_admin=True), 239 ) -> Any: 240 """ 241 Enable MFA for a user and generate the recovery code. 242 243 This endpoint should be used after initializing MFA (see `POST /users/{user_id}/mfa`) to actually enable it 244 on the account. 245 246 The recovery code generated by this endpoint can be used to login if the user has lost their MFA app and should 247 therefore be kept in a safe place. 248 249 *Requirements:* **SELF** or **ADMIN** 250 """ 251 252 if user.mfa_enabled: 253 raise MFAAlreadyEnabledError 254 if not user.mfa_secret: 255 raise MFANotInitializedError 256 if not await check_mfa_code(code, user.mfa_secret): 257 raise InvalidCodeError 258 259 recovery_code = "-".join(random_base32()[:6] for _ in range(4)) 260 user.mfa_recovery_code = hashlib.sha256(recovery_code.encode()).hexdigest() 261 user.mfa_enabled = True 262 263 return recovery_code 264 265 266 @router.delete("/users/{user_id}/mfa", responses=admin_responses(bool, UserNotFoundError, MFANotEnabledError)) 267 async def disable_mfa(user: models.User = get_user(require_self_or_admin=True)) -> Any: 268 """ 269 Disable MFA for a user. 270 271 *Requirements:* **SELF** or **ADMIN** 272 """ 273 274 if not user.mfa_secret and not user.mfa_enabled: 275 raise MFANotEnabledError 276 277 user.mfa_enabled = False 278 user.mfa_secret = None 279 user.mfa_recovery_code = None 280 return True 281 282 283 @router.delete("/users/{user_id}", responses=admin_responses(bool, UserNotFoundError)) 284 async def delete_user( 285 user: models.User = get_user(models.User.sessions, require_self_or_admin=True), admin: bool = is_admin 286 ) -> Any: 287 """ 288 Delete a user. 289 290 If only one admin exists, this user cannot be deleted. 291 292 *Requirements:* **SELF** or **ADMIN** 293 """ 294 295 if not (settings.open_registration or settings.open_oauth_registration) and not admin: 296 raise PermissionDeniedError 297 298 if user.admin and not await db.exists(filter_by(models.User, admin=True).filter(models.User.id != user.id)): 299 raise PermissionDeniedError 300 301 await user.logout() 302 await db.delete(user) 303 return True