user.py
  1  """Endpoints for user management"""
  2  
  3  import hashlib
  4  from typing import Any
  5  
  6  from fastapi import APIRouter, Body, Query, Request
  7  from pyotp import random_base32
  8  from sqlalchemy import asc, func
  9  
 10  from .. import models
 11  from ..auth import admin_auth, get_user, is_admin, user_auth
 12  from ..database import db, filter_by, select
 13  from ..exceptions.auth import PermissionDeniedError, admin_responses, user_responses
 14  from ..exceptions.oauth import InvalidOAuthTokenError, RemoteAlreadyLinkedError
 15  from ..exceptions.user import (
 16      CannotDeleteLastLoginMethodError,
 17      InvalidCodeError,
 18      MFAAlreadyEnabledError,
 19      MFANotEnabledError,
 20      MFANotInitializedError,
 21      NoLoginMethodError,
 22      OAuthRegistrationDisabledError,
 23      RecaptchaError,
 24      RegistrationDisabledError,
 25      UserAlreadyExistsError,
 26      UserNotFoundError,
 27  )
 28  from ..redis import redis
 29  from ..schemas.session import LoginResponse
 30  from ..schemas.user import MFA_CODE_REGEX, CreateUser, UpdateUser, User, UsersResponse
 31  from ..settings import settings
 32  from ..utils.mfa import check_mfa_code
 33  from ..utils.recaptcha import check_recaptcha, recaptcha_enabled
 34  
 35  
 36  router = APIRouter()
 37  
 38  
 39  @router.get("/users", dependencies=[admin_auth], responses=admin_responses(UsersResponse))
 40  async def get_users(
 41      limit: int = Query(100, ge=1, le=100, description="The maximum number of users to return"),
 42      offset: int = Query(0, ge=0, description="The number of users to skip for pagination"),
 43      name: str | None = Query(None, max_length=256, description="A search term to match against the user's name"),
 44      enabled: bool | None = Query(None, description="Return only users with the given enabled status"),
 45      admin: bool | None = Query(None, description="Return only users with the given admin status"),
 46      mfa_enabled: bool | None = Query(None, description="Return only users with the given MFA status"),
 47  ) -> Any:
 48      """
 49      Return a list of all users matching the given criteria.
 50  
 51      *Requirements:* **ADMIN**
 52      """
 53  
 54      query = select(models.User)
 55      order = []
 56      if name:
 57          query = query.where(func.lower(models.User.name).contains(name.lower(), autoescape=True))
 58          order.append(asc(func.length(models.User.name)))
 59      if enabled is not None:
 60          query = query.where(models.User.enabled == enabled)
 61      if admin is not None:
 62          query = query.where(models.User.admin == admin)
 63      if mfa_enabled is not None:
 64          query = query.where(models.User.mfa_enabled == mfa_enabled)
 65  
 66      return {
 67          "total": await db.count(query),
 68          "users": [
 69              user.serialize
 70              async for user in await db.stream(
 71                  query.order_by(*order, asc(models.User.registration)).limit(limit).offset(offset)
 72              )
 73          ],
 74      }
 75  
 76  
 77  @router.get("/users/{user_id}", responses=admin_responses(User, UserNotFoundError))
 78  async def get_user_by_id(user: models.User = get_user(require_self_or_admin=True)) -> Any:
 79      """
 80      Return a user by ID.
 81  
 82      *Requirements:* **SELF** or **ADMIN**
 83      """
 84  
 85      return user.serialize
 86  
 87  
 88  @router.post(
 89      "/users",
 90      responses=user_responses(
 91          LoginResponse,
 92          UserAlreadyExistsError,
 93          RemoteAlreadyLinkedError,
 94          NoLoginMethodError,
 95          RegistrationDisabledError,
 96          OAuthRegistrationDisabledError,
 97          RecaptchaError,
 98          InvalidOAuthTokenError,
 99      ),
100  )
101  async def create_user(data: CreateUser, request: Request, admin: bool = is_admin) -> Any:
102      """
103      Create a new user and a new session for them.
104  
105      If the **ADMIN** requirement is *not* met:
106      - The user is always created as a regular user (`"enabled": true, "admin": false`).
107      - A recaptcha response is required if recaptcha is enabled (see `GET /recaptcha`).
108  
109      The value of the `User-agent` header is used as the device name of the created session.
110      """
111  
112      if not data.oauth_register_token and not data.password:
113          raise NoLoginMethodError
114      if not admin:
115          if data.password and not settings.open_registration:
116              raise RegistrationDisabledError
117          if data.oauth_register_token and not settings.open_oauth_registration:
118              raise OAuthRegistrationDisabledError
119  
120          if recaptcha_enabled() and not (data.recaptcha_response and await check_recaptcha(data.recaptcha_response)):
121              raise RecaptchaError
122  
123      if await db.exists(models.User.filter_by_name(data.name)):
124          raise UserAlreadyExistsError
125  
126      if data.oauth_register_token:
127          async with redis.pipeline() as pipe:
128              await pipe.get(key1 := f"oauth_register_token:{data.oauth_register_token}:provider")
129              await pipe.get(key2 := f"oauth_register_token:{data.oauth_register_token}:user_id")
130              await pipe.get(key3 := f"oauth_register_token:{data.oauth_register_token}:display_name")
131              provider_id, remote_user_id, display_name = await pipe.execute()
132  
133          if not provider_id or not remote_user_id:
134              raise InvalidOAuthTokenError
135  
136          await redis.delete(key1, key2, key3)
137  
138          if await db.exists(
139              filter_by(models.OAuthUserConnection, provider_id=provider_id, remote_user_id=remote_user_id)
140          ):
141              raise RemoteAlreadyLinkedError
142  
143      user = await models.User.create(data.name, data.password, data.enabled or not admin, data.admin and admin)
144  
145      if data.oauth_register_token:
146          await models.OAuthUserConnection.create(user.id, provider_id, remote_user_id, display_name)
147  
148      session, access_token, refresh_token = await user.create_session(request.headers.get("User-agent", "")[:256])
149      return {
150          "user": user.serialize,
151          "session": session.serialize,
152          "access_token": access_token,
153          "refresh_token": refresh_token,
154      }
155  
156  
157  @router.patch(
158      "/users/{user_id}",
159      responses=admin_responses(User, UserNotFoundError, UserAlreadyExistsError, CannotDeleteLastLoginMethodError),
160  )
161  async def update_user(
162      data: UpdateUser,
163      user: models.User = get_user(models.User.sessions, models.User.oauth_connections, require_self_or_admin=True),
164      admin: bool = is_admin,
165      session: models.Session = user_auth,
166  ) -> Any:
167      """
168      Update an existing user.
169  
170      - Setting `password` to `null` or omitting it will not change the user's password while setting it to
171        the empty string will remove the user's password.
172      - Disabling a user will also log them out.
173      - A user can never change their own admin status.
174  
175      *Requirements:* **SELF** or **ADMIN**
176  
177      If the **ADMIN** requirement is *not* met:
178      - The username cannot be changed.
179      - The user cannot be enabled or disabled.
180      - The admin status cannot be changed.
181      """
182  
183      if data.name is not None and data.name != user.name:
184          if not admin:
185              raise PermissionDeniedError
186          if await db.exists(models.User.filter_by_name(data.name).where(models.User.id != user.id)):
187              raise UserAlreadyExistsError
188  
189          user.name = data.name
190  
191      if data.password is not None:
192          if not data.password and not user.oauth_connections:
193              raise CannotDeleteLastLoginMethodError
194  
195          await user.change_password(data.password)
196  
197      if data.enabled is not None and data.enabled != user.enabled:
198          if user.id == session.user_id:
199              raise PermissionDeniedError
200  
201          user.enabled = data.enabled
202          if not user.enabled:
203              await user.logout()
204  
205      if data.admin is not None and data.admin != user.admin:
206          if user.id == session.user_id:
207              raise PermissionDeniedError
208  
209          user.admin = data.admin
210  
211      return user.serialize
212  
213  
214  @router.post("/users/{user_id}/mfa", responses=admin_responses(str, UserNotFoundError, MFAAlreadyEnabledError))
215  async def initialize_mfa(user: models.User = get_user(require_self_or_admin=True)) -> Any:
216      """
217      Initialize MFA for a user by generating a new TOTP secret.
218  
219      The TOTP secret generated by this endpoint should be used to configure the user's MFA app. After that the
220      `PUT /users/{user_id}/mfa` endpoint can be used to enable MFA.
221  
222      *Requirements:* **SELF** or **ADMIN**
223      """
224  
225      if user.mfa_enabled:
226          raise MFAAlreadyEnabledError
227  
228      user.mfa_secret = random_base32(32)
229      return user.mfa_secret
230  
231  
232  @router.put(
233      "/users/{user_id}/mfa",
234      responses=admin_responses(str, UserNotFoundError, MFAAlreadyEnabledError, MFANotInitializedError, InvalidCodeError),
235  )
236  async def enable_mfa(
237      code: str = Body(embed=True, regex=MFA_CODE_REGEX, description="The 6-digit code generated by the user's MFA app"),
238      user: models.User = get_user(require_self_or_admin=True),
239  ) -> Any:
240      """
241      Enable MFA for a user and generate the recovery code.
242  
243      This endpoint should be used after initializing MFA (see `POST /users/{user_id}/mfa`) to actually enable it
244      on the account.
245  
246      The recovery code generated by this endpoint can be used to login if the user has lost their MFA app and should
247      therefore be kept in a safe place.
248  
249      *Requirements:* **SELF** or **ADMIN**
250      """
251  
252      if user.mfa_enabled:
253          raise MFAAlreadyEnabledError
254      if not user.mfa_secret:
255          raise MFANotInitializedError
256      if not await check_mfa_code(code, user.mfa_secret):
257          raise InvalidCodeError
258  
259      recovery_code = "-".join(random_base32()[:6] for _ in range(4))
260      user.mfa_recovery_code = hashlib.sha256(recovery_code.encode()).hexdigest()
261      user.mfa_enabled = True
262  
263      return recovery_code
264  
265  
266  @router.delete("/users/{user_id}/mfa", responses=admin_responses(bool, UserNotFoundError, MFANotEnabledError))
267  async def disable_mfa(user: models.User = get_user(require_self_or_admin=True)) -> Any:
268      """
269      Disable MFA for a user.
270  
271      *Requirements:* **SELF** or **ADMIN**
272      """
273  
274      if not user.mfa_secret and not user.mfa_enabled:
275          raise MFANotEnabledError
276  
277      user.mfa_enabled = False
278      user.mfa_secret = None
279      user.mfa_recovery_code = None
280      return True
281  
282  
283  @router.delete("/users/{user_id}", responses=admin_responses(bool, UserNotFoundError))
284  async def delete_user(
285      user: models.User = get_user(models.User.sessions, require_self_or_admin=True), admin: bool = is_admin
286  ) -> Any:
287      """
288      Delete a user.
289  
290      If only one admin exists, this user cannot be deleted.
291  
292      *Requirements:* **SELF** or **ADMIN**
293      """
294  
295      if not (settings.open_registration or settings.open_oauth_registration) and not admin:
296          raise PermissionDeniedError
297  
298      if user.admin and not await db.exists(filter_by(models.User, admin=True).filter(models.User.id != user.id)):
299          raise PermissionDeniedError
300  
301      await user.logout()
302      await db.delete(user)
303      return True