/ src / api / routes / admin.py
admin.py
   1  """Admin API endpoints for platform management."""
   2  import json
   3  import logging
   4  import math
   5  import uuid
   6  from datetime import datetime, timedelta, timezone
   7  from typing import Optional
   8  
   9  import bcrypt
  10  from fastapi import APIRouter, Depends, HTTPException, Query, status
  11  from sqlalchemy import delete, func, select
  12  from sqlalchemy.ext.asyncio import AsyncSession
  13  
  14  from ...db.database import get_db
  15  from ...db.models import (
  16      APIKey, APIKeyAuditLog, Reseller,
  17      Session, User,
  18  )
  19  from ...services.reseller_service import ResellerNotFoundError, reseller_service
  20  from ..deps import AuthContext, get_auth_context
  21  from ..reseller_models import (
  22      AuditLogEntry, AuditLogResponse, ChangePasswordRequest,
  23      CreateResellerRequest, CreateResellerUserRequest,
  24      DeleteResellerResponse, DeleteUserResponse, PaginationInfo,
  25      PasswordChangedResponse, PlatformConfigResponse, PlatformStats,
  26      ResellerLimits, ResellerListResponse, ResellerResponse,
  27      ResellerSpending, ResellerStats, RetentionConfigResponse,
  28      RetentionRunResponse, SpendingCurrent, SpendingLimits,
  29      SuspendResellerResponse, SuspendRequest, SuspendUserResponse,
  30      UnsuspendResellerResponse, UpdatePlatformConfigRequest,
  31      UpdateResellerRequest, UpdateRetentionRequest,
  32      UsagePeriod, UsageResponse, UsageTotals,
  33  )
  34  
  35  logger = logging.getLogger(__name__)
  36  router = APIRouter(prefix="/admin", tags=["admin"])
  37  
  38  from src.api.routes._helpers import _read_version
  39  
  40  
  41  # =============================================================================
  42  # Admin auth dependency
  43  # =============================================================================
  44  
  45  async def _require_admin(
  46      auth: AuthContext = Depends(get_auth_context),
  47  ) -> AuthContext:
  48      """Reject non-admin callers with 403."""
  49      if not auth.is_admin:
  50          raise HTTPException(
  51              status_code=status.HTTP_403_FORBIDDEN,
  52              detail="Admin access required",
  53          )
  54      return auth
  55  
  56  
  57  # =============================================================================
  58  # Helpers
  59  # =============================================================================
  60  
  61  
  62  def _build_reseller_response(reseller: Reseller) -> ResellerResponse:
  63      """Convert a Reseller ORM object to a ResellerResponse."""
  64      quota = reseller.quota
  65      current_users = quota.current_user_count if quota else 0
  66      features = {}
  67      if reseller.features_json:
  68          try:
  69              features = json.loads(reseller.features_json)
  70          except (json.JSONDecodeError, TypeError):
  71              pass
  72  
  73      limits = ResellerLimits(
  74          max_users=reseller.max_users,
  75          current_users=current_users,
  76          max_concurrent_tasks=reseller.max_concurrent_tasks,
  77          max_daily_tasks=reseller.max_daily_tasks,
  78      )
  79  
  80      spending = ResellerSpending(
  81          limits=SpendingLimits(
  82              monthly_usd=reseller.max_monthly_spending_usd,
  83              daily_usd=reseller.max_daily_spending_usd,
  84          ),
  85          current=SpendingCurrent(
  86              monthly_usd=quota.monthly_cost_usd if quota else 0.0,
  87              daily_usd=quota.daily_cost_usd if quota else 0.0,
  88          ),
  89          alert_threshold_pct=reseller.spending_alert_threshold_pct,
  90      )
  91  
  92      owner_username = reseller.owner.username if reseller.owner else None
  93  
  94      return ResellerResponse(
  95          id=reseller.id,
  96          name=reseller.name,
  97          company=reseller.company,
  98          contact_email=reseller.contact_email,
  99          owner_user_id=reseller.owner_user_id,
 100          owner_username=owner_username,
 101          is_active=reseller.is_active,
 102          suspended_at=reseller.suspended_at,
 103          limits=limits,
 104          llm_provider=reseller.llm_provider,
 105          features=features,
 106          spending=spending,
 107          notes=reseller.notes,
 108          created_at=reseller.created_at,
 109          updated_at=reseller.updated_at,
 110      )
 111  
 112  
 113  # =============================================================================
 114  # Reseller management
 115  # =============================================================================
 116  
 117  @router.post("/resellers", status_code=201, response_model=ResellerResponse)
 118  async def create_reseller(
 119      body: CreateResellerRequest,
 120      db: AsyncSession = Depends(get_db),
 121      auth: AuthContext = Depends(_require_admin),
 122  ) -> ResellerResponse:
 123      """Create a new reseller with an owner user account and quota record."""
 124      try:
 125          reseller = await reseller_service.create_reseller(
 126              db=db,
 127              name=body.name,
 128              company=body.company,
 129              contact_email=body.contact_email,
 130              password=body.password,
 131              max_users=body.max_users,
 132              max_concurrent_tasks=body.max_concurrent_tasks,
 133              max_daily_tasks=body.max_daily_tasks,
 134              llm_provider=body.llm_provider,
 135              features=body.features,
 136              notes=body.notes,
 137              max_monthly_spending_usd=body.max_monthly_spending_usd,
 138              max_daily_spending_usd=body.max_daily_spending_usd,
 139              spending_alert_threshold_pct=body.spending_alert_threshold_pct,
 140          )
 141      except Exception as exc:
 142          logger.error("Failed to create reseller: %s", exc)
 143          raise HTTPException(
 144              status_code=status.HTTP_400_BAD_REQUEST,
 145              detail=str(exc),
 146          )
 147  
 148      # Reload with relationships for response building
 149      reseller = await reseller_service.get_reseller(db, reseller.id)
 150      return _build_reseller_response(reseller)
 151  
 152  
 153  @router.get("/resellers", response_model=ResellerListResponse)
 154  async def list_resellers(
 155      page: int = Query(default=1, ge=1),
 156      per_page: int = Query(default=50, ge=1, le=200),
 157      status_filter: str = Query(default="all", alias="status"),
 158      search: Optional[str] = Query(default=None),
 159      db: AsyncSession = Depends(get_db),
 160      auth: AuthContext = Depends(_require_admin),
 161  ) -> ResellerListResponse:
 162      """List resellers with pagination and optional filtering."""
 163      resellers, total = await reseller_service.list_resellers(
 164          db=db,
 165          page=page,
 166          per_page=per_page,
 167          status_filter=status_filter,
 168          search=search,
 169      )
 170  
 171      items = [_build_reseller_response(r) for r in resellers]
 172      total_pages = math.ceil(total / per_page) if total > 0 else 1
 173  
 174      return ResellerListResponse(
 175          resellers=items,
 176          pagination=PaginationInfo(
 177              page=page,
 178              per_page=per_page,
 179              total=total,
 180              total_pages=total_pages,
 181          ),
 182      )
 183  
 184  
 185  @router.get("/resellers/{reseller_id}", response_model=ResellerResponse)
 186  async def get_reseller(
 187      reseller_id: str,
 188      db: AsyncSession = Depends(get_db),
 189      auth: AuthContext = Depends(_require_admin),
 190  ) -> ResellerResponse:
 191      """Get full reseller details including stats and API keys."""
 192      reseller = await reseller_service.get_reseller(db, reseller_id)
 193      if reseller is None:
 194          raise HTTPException(status_code=404, detail="Reseller not found")
 195  
 196      stats_data = await reseller_service.get_reseller_stats(db, reseller_id)
 197      response = _build_reseller_response(reseller)
 198  
 199      response.stats = ResellerStats(
 200          user_count=stats_data.get("user_count", 0),
 201          active_users_30d=0,  # Not tracked separately
 202          total_sessions=stats_data.get("total_sessions", 0),
 203          total_cost_usd=stats_data.get("total_cost_usd", 0.0),
 204          api_keys_active=sum(1 for k in reseller.api_keys if k.is_active),
 205          sessions_this_month=0,
 206          cost_this_month_usd=stats_data.get("quota", {}).get("monthly_cost_usd", 0.0)
 207          if stats_data.get("quota") else 0.0,
 208      )
 209  
 210      return response
 211  
 212  
 213  @router.get("/resellers/{reseller_id}/config")
 214  async def get_reseller_config(
 215      reseller_id: str,
 216      db: AsyncSession = Depends(get_db),
 217      auth: AuthContext = Depends(_require_admin),
 218  ) -> dict:
 219      """Return full config tree: reseller defaults plus all per-user overrides."""
 220      reseller = await reseller_service.get_reseller(db, reseller_id)
 221      if reseller is None:
 222          raise HTTPException(status_code=404, detail="Reseller not found")
 223  
 224      reseller_features = {}
 225      if reseller.features_json:
 226          try:
 227              reseller_features = json.loads(reseller.features_json)
 228          except (json.JSONDecodeError, TypeError):
 229              pass
 230  
 231      user_configs = []
 232      for user in reseller.users:
 233          feature_overrides = {}
 234          security_overrides = {}
 235          if user.features_json:
 236              try:
 237                  feature_overrides = json.loads(user.features_json)
 238              except (json.JSONDecodeError, TypeError):
 239                  pass
 240          if user.security_overrides_json:
 241              try:
 242                  security_overrides = json.loads(user.security_overrides_json)
 243              except (json.JSONDecodeError, TypeError):
 244                  pass
 245  
 246          user_configs.append({
 247              "user_id": user.id,
 248              "username": user.username,
 249              "settings_mode": user.settings_mode,
 250              "feature_overrides": feature_overrides,
 251              "security_overrides": security_overrides,
 252              "spending_limits": {
 253                  "monthly_usd": user.spending_limit_monthly_usd,
 254                  "daily_usd": user.spending_limit_daily_usd,
 255                  "per_session_usd": user.spending_limit_per_session_usd,
 256              },
 257          })
 258  
 259      return {
 260          "reseller_id": reseller_id,
 261          "reseller_name": reseller.name,
 262          "defaults": {
 263              "max_users": reseller.max_users,
 264              "max_concurrent_tasks": reseller.max_concurrent_tasks,
 265              "max_daily_tasks": reseller.max_daily_tasks,
 266              "llm_provider": reseller.llm_provider,
 267              "features": reseller_features,
 268              "spending_caps": {
 269                  "monthly_usd": reseller.max_monthly_spending_usd,
 270                  "daily_usd": reseller.max_daily_spending_usd,
 271                  "alert_threshold_pct": reseller.spending_alert_threshold_pct,
 272              },
 273          },
 274          "users": user_configs,
 275      }
 276  
 277  
 278  @router.put("/resellers/{reseller_id}", response_model=ResellerResponse)
 279  async def update_reseller(
 280      reseller_id: str,
 281      body: UpdateResellerRequest,
 282      db: AsyncSession = Depends(get_db),
 283      auth: AuthContext = Depends(_require_admin),
 284  ) -> ResellerResponse:
 285      """Update reseller fields."""
 286      update_kwargs = body.model_dump(exclude_none=True)
 287      if "features" in update_kwargs:
 288          update_kwargs["features_json"] = json.dumps(update_kwargs.pop("features"))
 289  
 290      try:
 291          reseller = await reseller_service.update_reseller(db, reseller_id, **update_kwargs)
 292      except ResellerNotFoundError:
 293          raise HTTPException(status_code=404, detail="Reseller not found")
 294  
 295      return _build_reseller_response(reseller)
 296  
 297  
 298  @router.post("/resellers/{reseller_id}/suspend", response_model=SuspendResellerResponse)
 299  async def suspend_reseller(
 300      reseller_id: str,
 301      body: Optional[SuspendRequest] = None,
 302      db: AsyncSession = Depends(get_db),
 303      auth: AuthContext = Depends(_require_admin),
 304  ) -> SuspendResellerResponse:
 305      """Suspend a reseller and all their users, API keys, and active sessions."""
 306      try:
 307          result = await reseller_service.suspend_reseller(
 308              db, reseller_id, reason=body.reason if body else None
 309          )
 310      except ResellerNotFoundError:
 311          raise HTTPException(status_code=404, detail="Reseller not found")
 312  
 313      reseller = await reseller_service.get_reseller(db, reseller_id)
 314      return SuspendResellerResponse(
 315          id=reseller_id,
 316          name=reseller.name if reseller else "",
 317          is_active=False,
 318          suspended_at=reseller.suspended_at if reseller else datetime.now(timezone.utc),
 319          users_suspended=result.get("users_suspended", 0),
 320          sessions_cancelled=result.get("sessions_cancelled", 0),
 321          api_keys_deactivated=result.get("keys_deactivated", 0),
 322      )
 323  
 324  
 325  @router.post("/resellers/{reseller_id}/unsuspend", response_model=UnsuspendResellerResponse)
 326  async def unsuspend_reseller(
 327      reseller_id: str,
 328      db: AsyncSession = Depends(get_db),
 329      auth: AuthContext = Depends(_require_admin),
 330  ) -> UnsuspendResellerResponse:
 331      """Restore a suspended reseller."""
 332      try:
 333          result = await reseller_service.unsuspend_reseller(db, reseller_id)
 334      except ResellerNotFoundError:
 335          raise HTTPException(status_code=404, detail="Reseller not found")
 336  
 337      reseller = await reseller_service.get_reseller(db, reseller_id)
 338      return UnsuspendResellerResponse(
 339          id=reseller_id,
 340          name=reseller.name if reseller else "",
 341          is_active=True,
 342          users_restored=result.get("users_restored", 0),
 343          api_keys_reactivated=result.get("keys_reactivated", 0),
 344      )
 345  
 346  
 347  @router.delete("/resellers/{reseller_id}", response_model=DeleteResellerResponse)
 348  async def delete_reseller(
 349      reseller_id: str,
 350      confirm: bool = Query(default=False),
 351      db: AsyncSession = Depends(get_db),
 352      auth: AuthContext = Depends(_require_admin),
 353  ) -> DeleteResellerResponse:
 354      """Delete a reseller and all associated data. Requires ?confirm=true."""
 355      if not confirm:
 356          raise HTTPException(
 357              status_code=status.HTTP_400_BAD_REQUEST,
 358              detail="Deletion requires ?confirm=true query parameter.",
 359          )
 360  
 361      reseller = await reseller_service.get_reseller(db, reseller_id)
 362      if reseller is None:
 363          raise HTTPException(status_code=404, detail="Reseller not found")
 364      reseller_name = reseller.name
 365  
 366      try:
 367          result = await reseller_service.delete_reseller(db, reseller_id)
 368      except ResellerNotFoundError:
 369          raise HTTPException(status_code=404, detail="Reseller not found")
 370  
 371      return DeleteResellerResponse(
 372          status="deleted",
 373          name=reseller_name,
 374          users_deleted=result.get("users_deleted", 0),
 375          sessions_deleted=result.get("sessions_deleted", 0),
 376      )
 377  
 378  
 379  # =============================================================================
 380  # User management (admin override — cross-reseller)
 381  # =============================================================================
 382  
 383  @router.get("/users")
 384  async def list_all_users(
 385      page: int = Query(default=1, ge=1),
 386      per_page: int = Query(default=50, ge=1, le=200),
 387      reseller_id: Optional[str] = Query(default=None),
 388      status_filter: str = Query(default="all", alias="status"),
 389      role: Optional[str] = Query(default=None),
 390      search: Optional[str] = Query(default=None),
 391      db: AsyncSession = Depends(get_db),
 392      auth: AuthContext = Depends(_require_admin),
 393  ) -> dict:
 394      """List all users across all resellers."""
 395      query = select(User)
 396  
 397      if reseller_id is not None:
 398          query = query.where(User.reseller_id == reseller_id)
 399      if status_filter == "active":
 400          query = query.where(User.is_active == True)  # noqa: E712
 401      elif status_filter == "suspended":
 402          query = query.where(User.is_active == False)  # noqa: E712
 403      if role:
 404          query = query.where(User.role == role)
 405      if search:
 406          query = query.where(
 407              User.username.ilike(f"%{search}%") | User.email.ilike(f"%{search}%")
 408          )
 409  
 410      count_result = await db.execute(
 411          select(func.count()).select_from(query.subquery())
 412      )
 413      total = count_result.scalar_one()
 414  
 415      offset = (page - 1) * per_page
 416      paged_result = await db.execute(
 417          query.order_by(User.created_at.desc()).offset(offset).limit(per_page)
 418      )
 419      users = list(paged_result.scalars().all())
 420  
 421      # Batch-load reseller names
 422      reseller_ids = {u.reseller_id for u in users if u.reseller_id}
 423      reseller_names: dict[str, str] = {}
 424      if reseller_ids:
 425          res_result = await db.execute(
 426              select(Reseller.id, Reseller.name).where(Reseller.id.in_(reseller_ids))
 427          )
 428          for row in res_result:
 429              reseller_names[row[0]] = row[1]
 430  
 431      items = [
 432          {
 433              "id": u.id,
 434              "username": u.username,
 435              "email": u.email,
 436              "role": u.role,
 437              "is_active": u.is_active,
 438              "reseller_id": u.reseller_id,
 439              "reseller_name": reseller_names.get(u.reseller_id) if u.reseller_id else None,
 440              "created_at": u.created_at.isoformat(),
 441          }
 442          for u in users
 443      ]
 444  
 445      total_pages = math.ceil(total / per_page) if total > 0 else 1
 446      return {
 447          "users": items,
 448          "pagination": {
 449              "page": page,
 450              "per_page": per_page,
 451              "total": total,
 452              "total_pages": total_pages,
 453          },
 454      }
 455  
 456  
 457  @router.post("/users", status_code=201)
 458  async def create_direct_user(
 459      body: CreateResellerUserRequest,
 460      role: str = Query(default="user"),
 461      db: AsyncSession = Depends(get_db),
 462      auth: AuthContext = Depends(_require_admin),
 463  ) -> dict:
 464      """Create a user without reseller association (admin-managed direct user)."""
 465      if role not in ("user", "admin"):
 466          raise HTTPException(
 467              status_code=status.HTTP_400_BAD_REQUEST,
 468              detail="role must be 'user' or 'admin'",
 469          )
 470  
 471      # Check username/email uniqueness
 472      existing = await db.execute(
 473          select(User).where(
 474              (User.username == body.username) | (User.email == body.email)
 475          )
 476      )
 477      if existing.scalar_one_or_none():
 478          raise HTTPException(
 479              status_code=status.HTTP_400_BAD_REQUEST,
 480              detail="Username or email already in use",
 481          )
 482  
 483      password_hash = bcrypt.hashpw(
 484          body.password.encode(), bcrypt.gensalt()
 485      ).decode()
 486  
 487      user = User(
 488          id=str(uuid.uuid4()),
 489          username=body.username,
 490          email=body.email,
 491          password_hash=password_hash,
 492          role=role,
 493          jwt_secret=uuid.uuid4().hex,
 494          linux_uid=None,
 495          is_active=True,
 496          reseller_id=None,
 497      )
 498      db.add(user)
 499      await db.commit()
 500      await db.refresh(user)
 501  
 502      logger.info("Admin created direct user '%s' (role=%s)", body.username, role)
 503      return {
 504          "id": user.id,
 505          "username": user.username,
 506          "email": user.email,
 507          "role": user.role,
 508          "is_active": user.is_active,
 509          "created_at": user.created_at.isoformat(),
 510      }
 511  
 512  
 513  @router.post("/users/{user_id}/suspend", response_model=SuspendUserResponse)
 514  async def admin_suspend_user(
 515      user_id: str,
 516      body: Optional[SuspendRequest] = None,
 517      db: AsyncSession = Depends(get_db),
 518      auth: AuthContext = Depends(_require_admin),
 519  ) -> SuspendUserResponse:
 520      """Suspend any user regardless of reseller."""
 521      result = await db.execute(select(User).where(User.id == user_id))
 522      user = result.scalar_one_or_none()
 523      if user is None:
 524          raise HTTPException(status_code=404, detail="User not found")
 525  
 526      user.is_active = False
 527      user.updated_at = datetime.now(timezone.utc)
 528  
 529      # Cancel active sessions
 530      sessions_result = await db.execute(
 531          select(Session).where(
 532              Session.user_id == user_id,
 533              Session.status.in_(("pending", "running", "queued")),
 534          )
 535      )
 536      cancelled = 0
 537      for session in sessions_result.scalars().all():
 538          session.status = "cancelled"
 539          cancelled += 1
 540  
 541      await db.commit()
 542      logger.info("Admin suspended user %s (%s sessions cancelled)", user_id, cancelled)
 543  
 544      return SuspendUserResponse(
 545          id=user_id,
 546          username=user.username,
 547          is_active=False,
 548          suspended_at=datetime.now(timezone.utc),
 549          active_sessions_cancelled=cancelled,
 550      )
 551  
 552  
 553  @router.post("/users/{user_id}/unsuspend", response_model=SuspendUserResponse)
 554  async def admin_unsuspend_user(
 555      user_id: str,
 556      db: AsyncSession = Depends(get_db),
 557      auth: AuthContext = Depends(_require_admin),
 558  ) -> SuspendUserResponse:
 559      """Restore any suspended user."""
 560      result = await db.execute(select(User).where(User.id == user_id))
 561      user = result.scalar_one_or_none()
 562      if user is None:
 563          raise HTTPException(status_code=404, detail="User not found")
 564  
 565      user.is_active = True
 566      user.updated_at = datetime.now(timezone.utc)
 567      await db.commit()
 568  
 569      logger.info("Admin unsuspended user %s", user_id)
 570      return SuspendUserResponse(
 571          id=user_id,
 572          username=user.username,
 573          is_active=True,
 574          suspended_at=None,
 575          active_sessions_cancelled=0,
 576      )
 577  
 578  
 579  @router.post("/users/{user_id}/change-password", response_model=PasswordChangedResponse)
 580  async def admin_change_password(
 581      user_id: str,
 582      body: ChangePasswordRequest,
 583      db: AsyncSession = Depends(get_db),
 584      auth: AuthContext = Depends(_require_admin),
 585  ) -> PasswordChangedResponse:
 586      """Change any user's password and revoke all existing tokens."""
 587      result = await db.execute(select(User).where(User.id == user_id))
 588      user = result.scalar_one_or_none()
 589      if user is None:
 590          raise HTTPException(status_code=404, detail="User not found")
 591  
 592      user.password_hash = bcrypt.hashpw(
 593          body.new_password.encode(), bcrypt.gensalt()
 594      ).decode()
 595      # Revoke all tokens by incrementing token_version
 596      user.token_version = (user.token_version or 0) + 1
 597      user.updated_at = datetime.now(timezone.utc)
 598      await db.commit()
 599  
 600      logger.info("Admin changed password for user %s", user_id)
 601      return PasswordChangedResponse(status="password_changed", tokens_revoked=True)
 602  
 603  
 604  @router.delete("/users/{user_id}", response_model=DeleteUserResponse)
 605  async def admin_delete_user(
 606      user_id: str,
 607      confirm: bool = Query(default=False),
 608      db: AsyncSession = Depends(get_db),
 609      auth: AuthContext = Depends(_require_admin),
 610  ) -> DeleteUserResponse:
 611      """Delete any user. Requires ?confirm=true."""
 612      if not confirm:
 613          raise HTTPException(
 614              status_code=status.HTTP_400_BAD_REQUEST,
 615              detail="Deletion requires ?confirm=true query parameter.",
 616          )
 617  
 618      result = await db.execute(select(User).where(User.id == user_id))
 619      user = result.scalar_one_or_none()
 620      if user is None:
 621          raise HTTPException(status_code=404, detail="User not found")
 622  
 623      username = user.username
 624  
 625      # Delete sessions first (bulk)
 626      del_result = await db.execute(
 627          delete(Session).where(Session.user_id == user_id)
 628      )
 629      sessions_deleted = del_result.rowcount
 630  
 631      await db.delete(user)
 632      await db.commit()
 633  
 634      logger.info("Admin deleted user '%s' (%s sessions removed)", username, sessions_deleted)
 635      return DeleteUserResponse(
 636          status="deleted",
 637          id=user_id,
 638          username=username,
 639          sessions_deleted=sessions_deleted,
 640          files_cleaned=False,  # Filesystem cleanup is out-of-band
 641      )
 642  
 643  
 644  # =============================================================================
 645  # User feature flags
 646  # =============================================================================
 647  
 648  # Features that admins can toggle per user
 649  _ADMIN_TOGGLEABLE_FEATURES = frozenset({"ssh_enabled"})
 650  
 651  
 652  def _parse_features_json(raw: str | None) -> dict:
 653      """Parse user.features_json safely, returning {} on any failure."""
 654      if not raw:
 655          return {}
 656      try:
 657          return json.loads(raw)
 658      except (json.JSONDecodeError, TypeError):
 659          return {}
 660  
 661  
 662  async def _resolve_effective_features(
 663      db: AsyncSession, user: User,
 664  ) -> dict:
 665      """Resolve effective features for a user through the 3-tier hierarchy."""
 666      from ...services.feature_flag_service import feature_flag_service
 667      await feature_flag_service.ensure_loaded(db)
 668  
 669      reseller = None
 670      if user.reseller_id:
 671          res_result = await db.execute(
 672              select(Reseller).where(Reseller.id == user.reseller_id)
 673          )
 674          reseller = res_result.scalar_one_or_none()
 675  
 676      return feature_flag_service.get_user_effective_features(user, reseller)
 677  
 678  
 679  def _features_response(
 680      effective: dict, user_overrides: dict,
 681  ) -> dict:
 682      """Build the standard features response dict."""
 683      return {
 684          "effective": effective,
 685          "user_overrides": user_overrides,
 686          "toggleable": list(_ADMIN_TOGGLEABLE_FEATURES),
 687      }
 688  
 689  
 690  @router.get("/users/{user_id}/features")
 691  async def get_user_features(
 692      user_id: str,
 693      db: AsyncSession = Depends(get_db),
 694      auth: AuthContext = Depends(_require_admin),
 695  ):
 696      """Get effective features for a user (resolved through 3-tier hierarchy)."""
 697      result = await db.execute(select(User).where(User.id == user_id))
 698      user = result.scalar_one_or_none()
 699      if not user:
 700          raise HTTPException(status_code=404, detail="User not found")
 701  
 702      effective = await _resolve_effective_features(db, user)
 703      return _features_response(effective, _parse_features_json(user.features_json))
 704  
 705  
 706  @router.put("/users/{user_id}/features")
 707  async def update_user_features(
 708      user_id: str,
 709      body: dict,
 710      db: AsyncSession = Depends(get_db),
 711      auth: AuthContext = Depends(_require_admin),
 712  ):
 713      """Toggle feature flags for a user. Only admin-toggleable features accepted."""
 714      invalid_keys = set(body.keys()) - _ADMIN_TOGGLEABLE_FEATURES
 715      if invalid_keys:
 716          raise HTTPException(
 717              status_code=400,
 718              detail=f"Cannot toggle features: {', '.join(invalid_keys)}. "
 719                     f"Allowed: {', '.join(_ADMIN_TOGGLEABLE_FEATURES)}",
 720          )
 721  
 722      result = await db.execute(select(User).where(User.id == user_id))
 723      user = result.scalar_one_or_none()
 724      if not user:
 725          raise HTTPException(status_code=404, detail="User not found")
 726  
 727      # Merge with existing overrides
 728      current = _parse_features_json(user.features_json)
 729      for key, value in body.items():
 730          if value is None:
 731              current.pop(key, None)  # Reset to inherit
 732          else:
 733              current[key] = value
 734  
 735      user.features_json = json.dumps(current) if current else None
 736      await db.commit()
 737  
 738      # Invalidate Redis cache for SSH feature
 739      if "ssh_enabled" in body:
 740          try:
 741              from ...services.ssh_service_manager import ssh_service_manager
 742              await ssh_service_manager.invalidate_ssh_cache(user_id)
 743          except Exception:
 744              pass  # Non-fatal
 745  
 746      effective = await _resolve_effective_features(db, user)
 747      logger.info(
 748          "Admin %s updated features for user %s: %s",
 749          auth.user_id, user_id, body,
 750      )
 751      return _features_response(effective, current)
 752  
 753  
 754  # =============================================================================
 755  # Platform configuration
 756  # =============================================================================
 757  
 758  @router.get("/config", response_model=PlatformConfigResponse)
 759  async def get_platform_config(
 760      db: AsyncSession = Depends(get_db),
 761      auth: AuthContext = Depends(_require_admin),
 762  ) -> PlatformConfigResponse:
 763      """Return platform-level default configuration.
 764  
 765      Exposes the effective settings (hardcoded + DB overrides) that
 766      resellers and users inherit from.
 767      """
 768      from ...services.feature_flag_service import feature_flag_service
 769  
 770      await feature_flag_service.ensure_loaded(db)
 771  
 772      return PlatformConfigResponse(
 773          default_features=feature_flag_service.get_platform_features(),
 774          default_quotas=feature_flag_service.get_platform_quotas(),
 775          default_spending_limits=feature_flag_service.get_platform_spending(),
 776      )
 777  
 778  
 779  @router.put("/config", response_model=PlatformConfigResponse)
 780  async def update_platform_config(
 781      body: UpdatePlatformConfigRequest,
 782      db: AsyncSession = Depends(get_db),
 783      auth: AuthContext = Depends(_require_admin),
 784  ) -> PlatformConfigResponse:
 785      """Update platform-level default configuration.
 786  
 787      Updates the DB-stored overrides. Null values in the body reset
 788      individual keys back to hardcoded defaults.
 789      """
 790      from ...services.feature_flag_service import feature_flag_service
 791  
 792      await feature_flag_service.ensure_loaded(db)
 793  
 794      if body.features is not None:
 795          await feature_flag_service.update_platform_defaults(
 796              db, "features", body.features, updated_by=auth.user_id,
 797          )
 798      if body.quotas is not None:
 799          await feature_flag_service.update_platform_defaults(
 800              db, "quotas", body.quotas, updated_by=auth.user_id,
 801          )
 802      if body.spending is not None:
 803          await feature_flag_service.update_platform_defaults(
 804              db, "spending", body.spending, updated_by=auth.user_id,
 805          )
 806  
 807      return PlatformConfigResponse(
 808          default_features=feature_flag_service.get_platform_features(),
 809          default_quotas=feature_flag_service.get_platform_quotas(),
 810          default_spending_limits=feature_flag_service.get_platform_spending(),
 811      )
 812  
 813  
 814  # =============================================================================
 815  # Platform stats & reporting
 816  # =============================================================================
 817  
 818  @router.get("/stats", response_model=PlatformStats)
 819  async def platform_stats(
 820      db: AsyncSession = Depends(get_db),
 821      auth: AuthContext = Depends(_require_admin),
 822  ) -> PlatformStats:
 823      """Return platform-wide dashboard statistics."""
 824      version = _read_version()
 825  
 826      # Reseller counts
 827      total_resellers_r = await db.execute(select(func.count(Reseller.id)))
 828      total_resellers = total_resellers_r.scalar_one() or 0
 829  
 830      active_resellers_r = await db.execute(
 831          select(func.count(Reseller.id)).where(Reseller.is_active == True)  # noqa: E712
 832      )
 833      active_resellers = active_resellers_r.scalar_one() or 0
 834      suspended_resellers = total_resellers - active_resellers
 835  
 836      # User counts
 837      total_users_r = await db.execute(select(func.count(User.id)))
 838      total_users = total_users_r.scalar_one() or 0
 839  
 840      active_users_r = await db.execute(
 841          select(func.count(User.id)).where(User.is_active == True)  # noqa: E712
 842      )
 843      active_users = active_users_r.scalar_one() or 0
 844  
 845      suspended_users = total_users - active_users
 846  
 847      # Users by role
 848      roles_r = await db.execute(
 849          select(User.role, func.count(User.id)).group_by(User.role)
 850      )
 851      by_role = {row[0]: row[1] for row in roles_r}
 852  
 853      # Session counts
 854      total_sessions_r = await db.execute(select(func.count(Session.id)))
 855      total_sessions = total_sessions_r.scalar_one() or 0
 856  
 857      today_start = datetime.now(timezone.utc).replace(
 858          hour=0, minute=0, second=0, microsecond=0
 859      )
 860      today_sessions_r = await db.execute(
 861          select(func.count(Session.id)).where(Session.created_at >= today_start)
 862      )
 863      sessions_today = today_sessions_r.scalar_one() or 0
 864  
 865      active_now_r = await db.execute(
 866          select(func.count(Session.id)).where(
 867              Session.status.in_(("running", "pending"))
 868          )
 869      )
 870      active_now = active_now_r.scalar_one() or 0
 871  
 872      queued_r = await db.execute(
 873          select(func.count(Session.id)).where(Session.status == "queued")
 874      )
 875      queued = queued_r.scalar_one() or 0
 876  
 877      # Usage this month
 878      month_start = datetime.now(timezone.utc).replace(
 879          day=1, hour=0, minute=0, second=0, microsecond=0
 880      )
 881      month_sessions_r = await db.execute(
 882          select(func.count(Session.id)).where(Session.created_at >= month_start)
 883      )
 884      month_sessions = month_sessions_r.scalar_one() or 0
 885  
 886      month_cost_r = await db.execute(
 887          select(func.sum(Session.cumulative_cost_usd)).where(
 888              Session.created_at >= month_start
 889          )
 890      )
 891      month_cost = float(month_cost_r.scalar_one() or 0.0)
 892  
 893      month_input_r = await db.execute(
 894          select(func.sum(Session.cumulative_input_tokens)).where(
 895              Session.created_at >= month_start
 896          )
 897      )
 898      month_input_tokens = int(month_input_r.scalar_one() or 0)
 899  
 900      month_output_r = await db.execute(
 901          select(func.sum(Session.cumulative_output_tokens)).where(
 902              Session.created_at >= month_start
 903          )
 904      )
 905      month_output_tokens = int(month_output_r.scalar_one() or 0)
 906  
 907      avg_cost = round(month_cost / month_sessions, 6) if month_sessions > 0 else 0.0
 908  
 909      # Top models this month
 910      models_r = await db.execute(
 911          select(Session.model, func.count(Session.id).label("cnt"))
 912          .where(Session.created_at >= month_start, Session.model.is_not(None))
 913          .group_by(Session.model)
 914          .order_by(func.count(Session.id).desc())
 915          .limit(5)
 916      )
 917      top_models = [{"model": row[0], "sessions": row[1]} for row in models_r]
 918  
 919      return PlatformStats(
 920          platform={
 921              "version": version,
 922              "uptime_seconds": 0,
 923          },
 924          resellers={
 925              "total": total_resellers,
 926              "active": active_resellers,
 927              "suspended": suspended_resellers,
 928          },
 929          users={
 930              "total": total_users,
 931              "active": active_users,
 932              "suspended": suspended_users,
 933              "by_role": by_role,
 934          },
 935          sessions={
 936              "total": total_sessions,
 937              "today": sessions_today,
 938              "active_now": active_now,
 939              "queued": queued,
 940          },
 941          usage_this_month={
 942              "total_sessions": month_sessions,
 943              "cost_usd": round(month_cost, 6),
 944              "input_tokens": month_input_tokens,
 945              "output_tokens": month_output_tokens,
 946              "avg_cost_usd": avg_cost,
 947              "top_models": top_models,
 948          },
 949          capacity={
 950              "global_max_concurrent": 4,
 951              "active": active_now,
 952              "redis_memory_mb": 0,
 953              "disk_usage_gb": 0,
 954          },
 955      )
 956  
 957  
 958  @router.get("/usage", response_model=UsageResponse)
 959  async def platform_usage(
 960      period: str = Query(default="month", description="'day', 'week', 'month', or 'custom'"),
 961      start: Optional[datetime] = Query(default=None),
 962      end: Optional[datetime] = Query(default=None),
 963      group_by: Optional[str] = Query(default=None, description="'reseller' or 'user'"),
 964      reseller_id: Optional[str] = Query(default=None),
 965      db: AsyncSession = Depends(get_db),
 966      auth: AuthContext = Depends(_require_admin),
 967  ) -> UsageResponse:
 968      """Aggregate usage report across all resellers."""
 969      now = datetime.now(timezone.utc)
 970  
 971      if period == "day":
 972          period_start = now - timedelta(days=1)
 973      elif period == "week":
 974          period_start = now - timedelta(weeks=1)
 975      elif period == "month":
 976          period_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
 977      elif period == "custom" and start and end:
 978          period_start = start
 979          now = end
 980      else:
 981          period_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
 982  
 983      query = select(Session).where(Session.created_at >= period_start, Session.created_at <= now)
 984      if reseller_id:
 985          # Filter to sessions belonging to users of this reseller
 986          user_ids_r = await db.execute(
 987              select(User.id).where(User.reseller_id == reseller_id)
 988          )
 989          user_ids = [row[0] for row in user_ids_r]
 990          if user_ids:
 991              query = query.where(Session.user_id.in_(user_ids))
 992          else:
 993              # Reseller has no users — return empty immediately
 994              return UsageResponse(
 995                  period=UsagePeriod(start=period_start, end=now),
 996                  totals=UsageTotals(),
 997              )
 998  
 999      result = await db.execute(query)
1000      sessions = list(result.scalars().all())
1001  
1002      total_sessions = len(sessions)
1003      total_cost = sum(s.cumulative_cost_usd or 0.0 for s in sessions)
1004      total_input = sum(s.cumulative_input_tokens or 0 for s in sessions)
1005      total_output = sum(s.cumulative_output_tokens or 0 for s in sessions)
1006      unique_users = len({s.user_id for s in sessions})
1007  
1008      return UsageResponse(
1009          period=UsagePeriod(start=period_start, end=now),
1010          totals=UsageTotals(
1011              sessions=total_sessions,
1012              input_tokens=total_input,
1013              output_tokens=total_output,
1014              cost_usd=round(total_cost, 6),
1015              active_users=unique_users,
1016          ),
1017      )
1018  
1019  
1020  @router.get("/audit", response_model=AuditLogResponse)
1021  async def audit_log(
1022      page: int = Query(default=1, ge=1),
1023      per_page: int = Query(default=50, ge=1, le=200),
1024      reseller_id: Optional[str] = Query(default=None),
1025      action: Optional[str] = Query(default=None),
1026      start: Optional[datetime] = Query(default=None),
1027      end: Optional[datetime] = Query(default=None),
1028      db: AsyncSession = Depends(get_db),
1029      auth: AuthContext = Depends(_require_admin),
1030  ) -> AuditLogResponse:
1031      """List API key audit log entries with optional filtering."""
1032      query = select(APIKeyAuditLog)
1033  
1034      if reseller_id:
1035          query = query.where(APIKeyAuditLog.reseller_id == reseller_id)
1036      if action:
1037          query = query.where(APIKeyAuditLog.action == action)
1038      if start:
1039          query = query.where(APIKeyAuditLog.timestamp >= start)
1040      if end:
1041          query = query.where(APIKeyAuditLog.timestamp <= end)
1042  
1043      count_result = await db.execute(
1044          select(func.count()).select_from(query.subquery())
1045      )
1046      total = count_result.scalar_one()
1047  
1048      offset = (page - 1) * per_page
1049      paged_result = await db.execute(
1050          query.order_by(APIKeyAuditLog.timestamp.desc()).offset(offset).limit(per_page)
1051      )
1052      entries_raw = list(paged_result.scalars().all())
1053  
1054      # Batch-load API key names and reseller names for display
1055      key_ids = {e.api_key_id for e in entries_raw if e.api_key_id}
1056      key_names: dict[str, str] = {}
1057      if key_ids:
1058          keys_r = await db.execute(
1059              select(APIKey.id, APIKey.name).where(APIKey.id.in_(key_ids))
1060          )
1061          for row in keys_r:
1062              key_names[row[0]] = row[1]
1063  
1064      res_ids = {e.reseller_id for e in entries_raw if e.reseller_id}
1065      res_names: dict[str, str] = {}
1066      if res_ids:
1067          res_r = await db.execute(
1068              select(Reseller.id, Reseller.name).where(Reseller.id.in_(res_ids))
1069          )
1070          for row in res_r:
1071              res_names[row[0]] = row[1]
1072  
1073      entries = [
1074          AuditLogEntry(
1075              id=e.id,
1076              timestamp=e.timestamp,
1077              api_key_name=key_names.get(e.api_key_id) if e.api_key_id else None,
1078              reseller_name=res_names.get(e.reseller_id) if e.reseller_id else None,
1079              action=e.action,
1080              target_user=e.target_user_id,
1081              ip_address=e.ip_address,
1082              status_code=e.status_code,
1083              error=e.error,
1084          )
1085          for e in entries_raw
1086      ]
1087  
1088      total_pages = math.ceil(total / per_page) if total > 0 else 1
1089      return AuditLogResponse(
1090          entries=entries,
1091          pagination=PaginationInfo(
1092              page=page,
1093              per_page=per_page,
1094              total=total,
1095              total_pages=total_pages,
1096          ),
1097      )
1098  
1099  
1100  # =============================================================================
1101  # Data retention
1102  # =============================================================================
1103  
1104  @router.get("/retention", response_model=RetentionConfigResponse)
1105  async def get_retention_config(
1106      db: AsyncSession = Depends(get_db),
1107      auth: AuthContext = Depends(_require_admin),
1108  ) -> RetentionConfigResponse:
1109      """Return current data retention configuration (days per table)."""
1110      from ...services.data_retention_service import data_retention_service
1111      config = await data_retention_service.get_retention_config(db)
1112      return RetentionConfigResponse(**config)
1113  
1114  
1115  @router.put("/retention", response_model=RetentionConfigResponse)
1116  async def update_retention_config(
1117      body: UpdateRetentionRequest,
1118      db: AsyncSession = Depends(get_db),
1119      auth: AuthContext = Depends(_require_admin),
1120  ) -> RetentionConfigResponse:
1121      """Update data retention periods. Values in days (minimum 1)."""
1122      from ...services.data_retention_service import data_retention_service
1123      updates = body.model_dump(exclude_none=True)
1124      if not updates:
1125          config = await data_retention_service.get_retention_config(db)
1126      else:
1127          config = await data_retention_service.update_retention_config(
1128              db, updates, updated_by=auth.user_id,
1129          )
1130      return RetentionConfigResponse(**config)
1131  
1132  
1133  @router.post("/retention/run", response_model=RetentionRunResponse)
1134  async def run_retention(
1135      db: AsyncSession = Depends(get_db),
1136      auth: AuthContext = Depends(_require_admin),
1137  ) -> RetentionRunResponse:
1138      """Manually trigger a data retention purge and return results."""
1139      from ...services.data_retention_service import data_retention_service
1140      results = await data_retention_service.run_all(db)
1141      total = results.pop("total_purged", 0)
1142      return RetentionRunResponse(total_purged=total, tables=results)