admin.py
1 """Admin API endpoints for platform management.""" 2 import json 3 import logging 4 import math 5 import uuid 6 from datetime import datetime, timedelta, timezone 7 from typing import Optional 8 9 import bcrypt 10 from fastapi import APIRouter, Depends, HTTPException, Query, status 11 from sqlalchemy import delete, func, select 12 from sqlalchemy.ext.asyncio import AsyncSession 13 14 from ...db.database import get_db 15 from ...db.models import ( 16 APIKey, APIKeyAuditLog, Reseller, 17 Session, User, 18 ) 19 from ...services.reseller_service import ResellerNotFoundError, reseller_service 20 from ..deps import AuthContext, get_auth_context 21 from ..reseller_models import ( 22 AuditLogEntry, AuditLogResponse, ChangePasswordRequest, 23 CreateResellerRequest, CreateResellerUserRequest, 24 DeleteResellerResponse, DeleteUserResponse, PaginationInfo, 25 PasswordChangedResponse, PlatformConfigResponse, PlatformStats, 26 ResellerLimits, ResellerListResponse, ResellerResponse, 27 ResellerSpending, ResellerStats, RetentionConfigResponse, 28 RetentionRunResponse, SpendingCurrent, SpendingLimits, 29 SuspendResellerResponse, SuspendRequest, SuspendUserResponse, 30 UnsuspendResellerResponse, UpdatePlatformConfigRequest, 31 UpdateResellerRequest, UpdateRetentionRequest, 32 UsagePeriod, UsageResponse, UsageTotals, 33 ) 34 35 logger = logging.getLogger(__name__) 36 router = APIRouter(prefix="/admin", tags=["admin"]) 37 38 from src.api.routes._helpers import _read_version 39 40 41 # ============================================================================= 42 # Admin auth dependency 43 # ============================================================================= 44 45 async def _require_admin( 46 auth: AuthContext = Depends(get_auth_context), 47 ) -> AuthContext: 48 """Reject non-admin callers with 403.""" 49 if not auth.is_admin: 50 raise HTTPException( 51 status_code=status.HTTP_403_FORBIDDEN, 52 detail="Admin access required", 53 ) 54 return auth 55 56 57 # ============================================================================= 58 # Helpers 59 # ============================================================================= 60 61 62 def _build_reseller_response(reseller: Reseller) -> ResellerResponse: 63 """Convert a Reseller ORM object to a ResellerResponse.""" 64 quota = reseller.quota 65 current_users = quota.current_user_count if quota else 0 66 features = {} 67 if reseller.features_json: 68 try: 69 features = json.loads(reseller.features_json) 70 except (json.JSONDecodeError, TypeError): 71 pass 72 73 limits = ResellerLimits( 74 max_users=reseller.max_users, 75 current_users=current_users, 76 max_concurrent_tasks=reseller.max_concurrent_tasks, 77 max_daily_tasks=reseller.max_daily_tasks, 78 ) 79 80 spending = ResellerSpending( 81 limits=SpendingLimits( 82 monthly_usd=reseller.max_monthly_spending_usd, 83 daily_usd=reseller.max_daily_spending_usd, 84 ), 85 current=SpendingCurrent( 86 monthly_usd=quota.monthly_cost_usd if quota else 0.0, 87 daily_usd=quota.daily_cost_usd if quota else 0.0, 88 ), 89 alert_threshold_pct=reseller.spending_alert_threshold_pct, 90 ) 91 92 owner_username = reseller.owner.username if reseller.owner else None 93 94 return ResellerResponse( 95 id=reseller.id, 96 name=reseller.name, 97 company=reseller.company, 98 contact_email=reseller.contact_email, 99 owner_user_id=reseller.owner_user_id, 100 owner_username=owner_username, 101 is_active=reseller.is_active, 102 suspended_at=reseller.suspended_at, 103 limits=limits, 104 llm_provider=reseller.llm_provider, 105 features=features, 106 spending=spending, 107 notes=reseller.notes, 108 created_at=reseller.created_at, 109 updated_at=reseller.updated_at, 110 ) 111 112 113 # ============================================================================= 114 # Reseller management 115 # ============================================================================= 116 117 @router.post("/resellers", status_code=201, response_model=ResellerResponse) 118 async def create_reseller( 119 body: CreateResellerRequest, 120 db: AsyncSession = Depends(get_db), 121 auth: AuthContext = Depends(_require_admin), 122 ) -> ResellerResponse: 123 """Create a new reseller with an owner user account and quota record.""" 124 try: 125 reseller = await reseller_service.create_reseller( 126 db=db, 127 name=body.name, 128 company=body.company, 129 contact_email=body.contact_email, 130 password=body.password, 131 max_users=body.max_users, 132 max_concurrent_tasks=body.max_concurrent_tasks, 133 max_daily_tasks=body.max_daily_tasks, 134 llm_provider=body.llm_provider, 135 features=body.features, 136 notes=body.notes, 137 max_monthly_spending_usd=body.max_monthly_spending_usd, 138 max_daily_spending_usd=body.max_daily_spending_usd, 139 spending_alert_threshold_pct=body.spending_alert_threshold_pct, 140 ) 141 except Exception as exc: 142 logger.error("Failed to create reseller: %s", exc) 143 raise HTTPException( 144 status_code=status.HTTP_400_BAD_REQUEST, 145 detail=str(exc), 146 ) 147 148 # Reload with relationships for response building 149 reseller = await reseller_service.get_reseller(db, reseller.id) 150 return _build_reseller_response(reseller) 151 152 153 @router.get("/resellers", response_model=ResellerListResponse) 154 async def list_resellers( 155 page: int = Query(default=1, ge=1), 156 per_page: int = Query(default=50, ge=1, le=200), 157 status_filter: str = Query(default="all", alias="status"), 158 search: Optional[str] = Query(default=None), 159 db: AsyncSession = Depends(get_db), 160 auth: AuthContext = Depends(_require_admin), 161 ) -> ResellerListResponse: 162 """List resellers with pagination and optional filtering.""" 163 resellers, total = await reseller_service.list_resellers( 164 db=db, 165 page=page, 166 per_page=per_page, 167 status_filter=status_filter, 168 search=search, 169 ) 170 171 items = [_build_reseller_response(r) for r in resellers] 172 total_pages = math.ceil(total / per_page) if total > 0 else 1 173 174 return ResellerListResponse( 175 resellers=items, 176 pagination=PaginationInfo( 177 page=page, 178 per_page=per_page, 179 total=total, 180 total_pages=total_pages, 181 ), 182 ) 183 184 185 @router.get("/resellers/{reseller_id}", response_model=ResellerResponse) 186 async def get_reseller( 187 reseller_id: str, 188 db: AsyncSession = Depends(get_db), 189 auth: AuthContext = Depends(_require_admin), 190 ) -> ResellerResponse: 191 """Get full reseller details including stats and API keys.""" 192 reseller = await reseller_service.get_reseller(db, reseller_id) 193 if reseller is None: 194 raise HTTPException(status_code=404, detail="Reseller not found") 195 196 stats_data = await reseller_service.get_reseller_stats(db, reseller_id) 197 response = _build_reseller_response(reseller) 198 199 response.stats = ResellerStats( 200 user_count=stats_data.get("user_count", 0), 201 active_users_30d=0, # Not tracked separately 202 total_sessions=stats_data.get("total_sessions", 0), 203 total_cost_usd=stats_data.get("total_cost_usd", 0.0), 204 api_keys_active=sum(1 for k in reseller.api_keys if k.is_active), 205 sessions_this_month=0, 206 cost_this_month_usd=stats_data.get("quota", {}).get("monthly_cost_usd", 0.0) 207 if stats_data.get("quota") else 0.0, 208 ) 209 210 return response 211 212 213 @router.get("/resellers/{reseller_id}/config") 214 async def get_reseller_config( 215 reseller_id: str, 216 db: AsyncSession = Depends(get_db), 217 auth: AuthContext = Depends(_require_admin), 218 ) -> dict: 219 """Return full config tree: reseller defaults plus all per-user overrides.""" 220 reseller = await reseller_service.get_reseller(db, reseller_id) 221 if reseller is None: 222 raise HTTPException(status_code=404, detail="Reseller not found") 223 224 reseller_features = {} 225 if reseller.features_json: 226 try: 227 reseller_features = json.loads(reseller.features_json) 228 except (json.JSONDecodeError, TypeError): 229 pass 230 231 user_configs = [] 232 for user in reseller.users: 233 feature_overrides = {} 234 security_overrides = {} 235 if user.features_json: 236 try: 237 feature_overrides = json.loads(user.features_json) 238 except (json.JSONDecodeError, TypeError): 239 pass 240 if user.security_overrides_json: 241 try: 242 security_overrides = json.loads(user.security_overrides_json) 243 except (json.JSONDecodeError, TypeError): 244 pass 245 246 user_configs.append({ 247 "user_id": user.id, 248 "username": user.username, 249 "settings_mode": user.settings_mode, 250 "feature_overrides": feature_overrides, 251 "security_overrides": security_overrides, 252 "spending_limits": { 253 "monthly_usd": user.spending_limit_monthly_usd, 254 "daily_usd": user.spending_limit_daily_usd, 255 "per_session_usd": user.spending_limit_per_session_usd, 256 }, 257 }) 258 259 return { 260 "reseller_id": reseller_id, 261 "reseller_name": reseller.name, 262 "defaults": { 263 "max_users": reseller.max_users, 264 "max_concurrent_tasks": reseller.max_concurrent_tasks, 265 "max_daily_tasks": reseller.max_daily_tasks, 266 "llm_provider": reseller.llm_provider, 267 "features": reseller_features, 268 "spending_caps": { 269 "monthly_usd": reseller.max_monthly_spending_usd, 270 "daily_usd": reseller.max_daily_spending_usd, 271 "alert_threshold_pct": reseller.spending_alert_threshold_pct, 272 }, 273 }, 274 "users": user_configs, 275 } 276 277 278 @router.put("/resellers/{reseller_id}", response_model=ResellerResponse) 279 async def update_reseller( 280 reseller_id: str, 281 body: UpdateResellerRequest, 282 db: AsyncSession = Depends(get_db), 283 auth: AuthContext = Depends(_require_admin), 284 ) -> ResellerResponse: 285 """Update reseller fields.""" 286 update_kwargs = body.model_dump(exclude_none=True) 287 if "features" in update_kwargs: 288 update_kwargs["features_json"] = json.dumps(update_kwargs.pop("features")) 289 290 try: 291 reseller = await reseller_service.update_reseller(db, reseller_id, **update_kwargs) 292 except ResellerNotFoundError: 293 raise HTTPException(status_code=404, detail="Reseller not found") 294 295 return _build_reseller_response(reseller) 296 297 298 @router.post("/resellers/{reseller_id}/suspend", response_model=SuspendResellerResponse) 299 async def suspend_reseller( 300 reseller_id: str, 301 body: Optional[SuspendRequest] = None, 302 db: AsyncSession = Depends(get_db), 303 auth: AuthContext = Depends(_require_admin), 304 ) -> SuspendResellerResponse: 305 """Suspend a reseller and all their users, API keys, and active sessions.""" 306 try: 307 result = await reseller_service.suspend_reseller( 308 db, reseller_id, reason=body.reason if body else None 309 ) 310 except ResellerNotFoundError: 311 raise HTTPException(status_code=404, detail="Reseller not found") 312 313 reseller = await reseller_service.get_reseller(db, reseller_id) 314 return SuspendResellerResponse( 315 id=reseller_id, 316 name=reseller.name if reseller else "", 317 is_active=False, 318 suspended_at=reseller.suspended_at if reseller else datetime.now(timezone.utc), 319 users_suspended=result.get("users_suspended", 0), 320 sessions_cancelled=result.get("sessions_cancelled", 0), 321 api_keys_deactivated=result.get("keys_deactivated", 0), 322 ) 323 324 325 @router.post("/resellers/{reseller_id}/unsuspend", response_model=UnsuspendResellerResponse) 326 async def unsuspend_reseller( 327 reseller_id: str, 328 db: AsyncSession = Depends(get_db), 329 auth: AuthContext = Depends(_require_admin), 330 ) -> UnsuspendResellerResponse: 331 """Restore a suspended reseller.""" 332 try: 333 result = await reseller_service.unsuspend_reseller(db, reseller_id) 334 except ResellerNotFoundError: 335 raise HTTPException(status_code=404, detail="Reseller not found") 336 337 reseller = await reseller_service.get_reseller(db, reseller_id) 338 return UnsuspendResellerResponse( 339 id=reseller_id, 340 name=reseller.name if reseller else "", 341 is_active=True, 342 users_restored=result.get("users_restored", 0), 343 api_keys_reactivated=result.get("keys_reactivated", 0), 344 ) 345 346 347 @router.delete("/resellers/{reseller_id}", response_model=DeleteResellerResponse) 348 async def delete_reseller( 349 reseller_id: str, 350 confirm: bool = Query(default=False), 351 db: AsyncSession = Depends(get_db), 352 auth: AuthContext = Depends(_require_admin), 353 ) -> DeleteResellerResponse: 354 """Delete a reseller and all associated data. Requires ?confirm=true.""" 355 if not confirm: 356 raise HTTPException( 357 status_code=status.HTTP_400_BAD_REQUEST, 358 detail="Deletion requires ?confirm=true query parameter.", 359 ) 360 361 reseller = await reseller_service.get_reseller(db, reseller_id) 362 if reseller is None: 363 raise HTTPException(status_code=404, detail="Reseller not found") 364 reseller_name = reseller.name 365 366 try: 367 result = await reseller_service.delete_reseller(db, reseller_id) 368 except ResellerNotFoundError: 369 raise HTTPException(status_code=404, detail="Reseller not found") 370 371 return DeleteResellerResponse( 372 status="deleted", 373 name=reseller_name, 374 users_deleted=result.get("users_deleted", 0), 375 sessions_deleted=result.get("sessions_deleted", 0), 376 ) 377 378 379 # ============================================================================= 380 # User management (admin override — cross-reseller) 381 # ============================================================================= 382 383 @router.get("/users") 384 async def list_all_users( 385 page: int = Query(default=1, ge=1), 386 per_page: int = Query(default=50, ge=1, le=200), 387 reseller_id: Optional[str] = Query(default=None), 388 status_filter: str = Query(default="all", alias="status"), 389 role: Optional[str] = Query(default=None), 390 search: Optional[str] = Query(default=None), 391 db: AsyncSession = Depends(get_db), 392 auth: AuthContext = Depends(_require_admin), 393 ) -> dict: 394 """List all users across all resellers.""" 395 query = select(User) 396 397 if reseller_id is not None: 398 query = query.where(User.reseller_id == reseller_id) 399 if status_filter == "active": 400 query = query.where(User.is_active == True) # noqa: E712 401 elif status_filter == "suspended": 402 query = query.where(User.is_active == False) # noqa: E712 403 if role: 404 query = query.where(User.role == role) 405 if search: 406 query = query.where( 407 User.username.ilike(f"%{search}%") | User.email.ilike(f"%{search}%") 408 ) 409 410 count_result = await db.execute( 411 select(func.count()).select_from(query.subquery()) 412 ) 413 total = count_result.scalar_one() 414 415 offset = (page - 1) * per_page 416 paged_result = await db.execute( 417 query.order_by(User.created_at.desc()).offset(offset).limit(per_page) 418 ) 419 users = list(paged_result.scalars().all()) 420 421 # Batch-load reseller names 422 reseller_ids = {u.reseller_id for u in users if u.reseller_id} 423 reseller_names: dict[str, str] = {} 424 if reseller_ids: 425 res_result = await db.execute( 426 select(Reseller.id, Reseller.name).where(Reseller.id.in_(reseller_ids)) 427 ) 428 for row in res_result: 429 reseller_names[row[0]] = row[1] 430 431 items = [ 432 { 433 "id": u.id, 434 "username": u.username, 435 "email": u.email, 436 "role": u.role, 437 "is_active": u.is_active, 438 "reseller_id": u.reseller_id, 439 "reseller_name": reseller_names.get(u.reseller_id) if u.reseller_id else None, 440 "created_at": u.created_at.isoformat(), 441 } 442 for u in users 443 ] 444 445 total_pages = math.ceil(total / per_page) if total > 0 else 1 446 return { 447 "users": items, 448 "pagination": { 449 "page": page, 450 "per_page": per_page, 451 "total": total, 452 "total_pages": total_pages, 453 }, 454 } 455 456 457 @router.post("/users", status_code=201) 458 async def create_direct_user( 459 body: CreateResellerUserRequest, 460 role: str = Query(default="user"), 461 db: AsyncSession = Depends(get_db), 462 auth: AuthContext = Depends(_require_admin), 463 ) -> dict: 464 """Create a user without reseller association (admin-managed direct user).""" 465 if role not in ("user", "admin"): 466 raise HTTPException( 467 status_code=status.HTTP_400_BAD_REQUEST, 468 detail="role must be 'user' or 'admin'", 469 ) 470 471 # Check username/email uniqueness 472 existing = await db.execute( 473 select(User).where( 474 (User.username == body.username) | (User.email == body.email) 475 ) 476 ) 477 if existing.scalar_one_or_none(): 478 raise HTTPException( 479 status_code=status.HTTP_400_BAD_REQUEST, 480 detail="Username or email already in use", 481 ) 482 483 password_hash = bcrypt.hashpw( 484 body.password.encode(), bcrypt.gensalt() 485 ).decode() 486 487 user = User( 488 id=str(uuid.uuid4()), 489 username=body.username, 490 email=body.email, 491 password_hash=password_hash, 492 role=role, 493 jwt_secret=uuid.uuid4().hex, 494 linux_uid=None, 495 is_active=True, 496 reseller_id=None, 497 ) 498 db.add(user) 499 await db.commit() 500 await db.refresh(user) 501 502 logger.info("Admin created direct user '%s' (role=%s)", body.username, role) 503 return { 504 "id": user.id, 505 "username": user.username, 506 "email": user.email, 507 "role": user.role, 508 "is_active": user.is_active, 509 "created_at": user.created_at.isoformat(), 510 } 511 512 513 @router.post("/users/{user_id}/suspend", response_model=SuspendUserResponse) 514 async def admin_suspend_user( 515 user_id: str, 516 body: Optional[SuspendRequest] = None, 517 db: AsyncSession = Depends(get_db), 518 auth: AuthContext = Depends(_require_admin), 519 ) -> SuspendUserResponse: 520 """Suspend any user regardless of reseller.""" 521 result = await db.execute(select(User).where(User.id == user_id)) 522 user = result.scalar_one_or_none() 523 if user is None: 524 raise HTTPException(status_code=404, detail="User not found") 525 526 user.is_active = False 527 user.updated_at = datetime.now(timezone.utc) 528 529 # Cancel active sessions 530 sessions_result = await db.execute( 531 select(Session).where( 532 Session.user_id == user_id, 533 Session.status.in_(("pending", "running", "queued")), 534 ) 535 ) 536 cancelled = 0 537 for session in sessions_result.scalars().all(): 538 session.status = "cancelled" 539 cancelled += 1 540 541 await db.commit() 542 logger.info("Admin suspended user %s (%s sessions cancelled)", user_id, cancelled) 543 544 return SuspendUserResponse( 545 id=user_id, 546 username=user.username, 547 is_active=False, 548 suspended_at=datetime.now(timezone.utc), 549 active_sessions_cancelled=cancelled, 550 ) 551 552 553 @router.post("/users/{user_id}/unsuspend", response_model=SuspendUserResponse) 554 async def admin_unsuspend_user( 555 user_id: str, 556 db: AsyncSession = Depends(get_db), 557 auth: AuthContext = Depends(_require_admin), 558 ) -> SuspendUserResponse: 559 """Restore any suspended user.""" 560 result = await db.execute(select(User).where(User.id == user_id)) 561 user = result.scalar_one_or_none() 562 if user is None: 563 raise HTTPException(status_code=404, detail="User not found") 564 565 user.is_active = True 566 user.updated_at = datetime.now(timezone.utc) 567 await db.commit() 568 569 logger.info("Admin unsuspended user %s", user_id) 570 return SuspendUserResponse( 571 id=user_id, 572 username=user.username, 573 is_active=True, 574 suspended_at=None, 575 active_sessions_cancelled=0, 576 ) 577 578 579 @router.post("/users/{user_id}/change-password", response_model=PasswordChangedResponse) 580 async def admin_change_password( 581 user_id: str, 582 body: ChangePasswordRequest, 583 db: AsyncSession = Depends(get_db), 584 auth: AuthContext = Depends(_require_admin), 585 ) -> PasswordChangedResponse: 586 """Change any user's password and revoke all existing tokens.""" 587 result = await db.execute(select(User).where(User.id == user_id)) 588 user = result.scalar_one_or_none() 589 if user is None: 590 raise HTTPException(status_code=404, detail="User not found") 591 592 user.password_hash = bcrypt.hashpw( 593 body.new_password.encode(), bcrypt.gensalt() 594 ).decode() 595 # Revoke all tokens by incrementing token_version 596 user.token_version = (user.token_version or 0) + 1 597 user.updated_at = datetime.now(timezone.utc) 598 await db.commit() 599 600 logger.info("Admin changed password for user %s", user_id) 601 return PasswordChangedResponse(status="password_changed", tokens_revoked=True) 602 603 604 @router.delete("/users/{user_id}", response_model=DeleteUserResponse) 605 async def admin_delete_user( 606 user_id: str, 607 confirm: bool = Query(default=False), 608 db: AsyncSession = Depends(get_db), 609 auth: AuthContext = Depends(_require_admin), 610 ) -> DeleteUserResponse: 611 """Delete any user. Requires ?confirm=true.""" 612 if not confirm: 613 raise HTTPException( 614 status_code=status.HTTP_400_BAD_REQUEST, 615 detail="Deletion requires ?confirm=true query parameter.", 616 ) 617 618 result = await db.execute(select(User).where(User.id == user_id)) 619 user = result.scalar_one_or_none() 620 if user is None: 621 raise HTTPException(status_code=404, detail="User not found") 622 623 username = user.username 624 625 # Delete sessions first (bulk) 626 del_result = await db.execute( 627 delete(Session).where(Session.user_id == user_id) 628 ) 629 sessions_deleted = del_result.rowcount 630 631 await db.delete(user) 632 await db.commit() 633 634 logger.info("Admin deleted user '%s' (%s sessions removed)", username, sessions_deleted) 635 return DeleteUserResponse( 636 status="deleted", 637 id=user_id, 638 username=username, 639 sessions_deleted=sessions_deleted, 640 files_cleaned=False, # Filesystem cleanup is out-of-band 641 ) 642 643 644 # ============================================================================= 645 # User feature flags 646 # ============================================================================= 647 648 # Features that admins can toggle per user 649 _ADMIN_TOGGLEABLE_FEATURES = frozenset({"ssh_enabled"}) 650 651 652 def _parse_features_json(raw: str | None) -> dict: 653 """Parse user.features_json safely, returning {} on any failure.""" 654 if not raw: 655 return {} 656 try: 657 return json.loads(raw) 658 except (json.JSONDecodeError, TypeError): 659 return {} 660 661 662 async def _resolve_effective_features( 663 db: AsyncSession, user: User, 664 ) -> dict: 665 """Resolve effective features for a user through the 3-tier hierarchy.""" 666 from ...services.feature_flag_service import feature_flag_service 667 await feature_flag_service.ensure_loaded(db) 668 669 reseller = None 670 if user.reseller_id: 671 res_result = await db.execute( 672 select(Reseller).where(Reseller.id == user.reseller_id) 673 ) 674 reseller = res_result.scalar_one_or_none() 675 676 return feature_flag_service.get_user_effective_features(user, reseller) 677 678 679 def _features_response( 680 effective: dict, user_overrides: dict, 681 ) -> dict: 682 """Build the standard features response dict.""" 683 return { 684 "effective": effective, 685 "user_overrides": user_overrides, 686 "toggleable": list(_ADMIN_TOGGLEABLE_FEATURES), 687 } 688 689 690 @router.get("/users/{user_id}/features") 691 async def get_user_features( 692 user_id: str, 693 db: AsyncSession = Depends(get_db), 694 auth: AuthContext = Depends(_require_admin), 695 ): 696 """Get effective features for a user (resolved through 3-tier hierarchy).""" 697 result = await db.execute(select(User).where(User.id == user_id)) 698 user = result.scalar_one_or_none() 699 if not user: 700 raise HTTPException(status_code=404, detail="User not found") 701 702 effective = await _resolve_effective_features(db, user) 703 return _features_response(effective, _parse_features_json(user.features_json)) 704 705 706 @router.put("/users/{user_id}/features") 707 async def update_user_features( 708 user_id: str, 709 body: dict, 710 db: AsyncSession = Depends(get_db), 711 auth: AuthContext = Depends(_require_admin), 712 ): 713 """Toggle feature flags for a user. Only admin-toggleable features accepted.""" 714 invalid_keys = set(body.keys()) - _ADMIN_TOGGLEABLE_FEATURES 715 if invalid_keys: 716 raise HTTPException( 717 status_code=400, 718 detail=f"Cannot toggle features: {', '.join(invalid_keys)}. " 719 f"Allowed: {', '.join(_ADMIN_TOGGLEABLE_FEATURES)}", 720 ) 721 722 result = await db.execute(select(User).where(User.id == user_id)) 723 user = result.scalar_one_or_none() 724 if not user: 725 raise HTTPException(status_code=404, detail="User not found") 726 727 # Merge with existing overrides 728 current = _parse_features_json(user.features_json) 729 for key, value in body.items(): 730 if value is None: 731 current.pop(key, None) # Reset to inherit 732 else: 733 current[key] = value 734 735 user.features_json = json.dumps(current) if current else None 736 await db.commit() 737 738 # Invalidate Redis cache for SSH feature 739 if "ssh_enabled" in body: 740 try: 741 from ...services.ssh_service_manager import ssh_service_manager 742 await ssh_service_manager.invalidate_ssh_cache(user_id) 743 except Exception: 744 pass # Non-fatal 745 746 effective = await _resolve_effective_features(db, user) 747 logger.info( 748 "Admin %s updated features for user %s: %s", 749 auth.user_id, user_id, body, 750 ) 751 return _features_response(effective, current) 752 753 754 # ============================================================================= 755 # Platform configuration 756 # ============================================================================= 757 758 @router.get("/config", response_model=PlatformConfigResponse) 759 async def get_platform_config( 760 db: AsyncSession = Depends(get_db), 761 auth: AuthContext = Depends(_require_admin), 762 ) -> PlatformConfigResponse: 763 """Return platform-level default configuration. 764 765 Exposes the effective settings (hardcoded + DB overrides) that 766 resellers and users inherit from. 767 """ 768 from ...services.feature_flag_service import feature_flag_service 769 770 await feature_flag_service.ensure_loaded(db) 771 772 return PlatformConfigResponse( 773 default_features=feature_flag_service.get_platform_features(), 774 default_quotas=feature_flag_service.get_platform_quotas(), 775 default_spending_limits=feature_flag_service.get_platform_spending(), 776 ) 777 778 779 @router.put("/config", response_model=PlatformConfigResponse) 780 async def update_platform_config( 781 body: UpdatePlatformConfigRequest, 782 db: AsyncSession = Depends(get_db), 783 auth: AuthContext = Depends(_require_admin), 784 ) -> PlatformConfigResponse: 785 """Update platform-level default configuration. 786 787 Updates the DB-stored overrides. Null values in the body reset 788 individual keys back to hardcoded defaults. 789 """ 790 from ...services.feature_flag_service import feature_flag_service 791 792 await feature_flag_service.ensure_loaded(db) 793 794 if body.features is not None: 795 await feature_flag_service.update_platform_defaults( 796 db, "features", body.features, updated_by=auth.user_id, 797 ) 798 if body.quotas is not None: 799 await feature_flag_service.update_platform_defaults( 800 db, "quotas", body.quotas, updated_by=auth.user_id, 801 ) 802 if body.spending is not None: 803 await feature_flag_service.update_platform_defaults( 804 db, "spending", body.spending, updated_by=auth.user_id, 805 ) 806 807 return PlatformConfigResponse( 808 default_features=feature_flag_service.get_platform_features(), 809 default_quotas=feature_flag_service.get_platform_quotas(), 810 default_spending_limits=feature_flag_service.get_platform_spending(), 811 ) 812 813 814 # ============================================================================= 815 # Platform stats & reporting 816 # ============================================================================= 817 818 @router.get("/stats", response_model=PlatformStats) 819 async def platform_stats( 820 db: AsyncSession = Depends(get_db), 821 auth: AuthContext = Depends(_require_admin), 822 ) -> PlatformStats: 823 """Return platform-wide dashboard statistics.""" 824 version = _read_version() 825 826 # Reseller counts 827 total_resellers_r = await db.execute(select(func.count(Reseller.id))) 828 total_resellers = total_resellers_r.scalar_one() or 0 829 830 active_resellers_r = await db.execute( 831 select(func.count(Reseller.id)).where(Reseller.is_active == True) # noqa: E712 832 ) 833 active_resellers = active_resellers_r.scalar_one() or 0 834 suspended_resellers = total_resellers - active_resellers 835 836 # User counts 837 total_users_r = await db.execute(select(func.count(User.id))) 838 total_users = total_users_r.scalar_one() or 0 839 840 active_users_r = await db.execute( 841 select(func.count(User.id)).where(User.is_active == True) # noqa: E712 842 ) 843 active_users = active_users_r.scalar_one() or 0 844 845 suspended_users = total_users - active_users 846 847 # Users by role 848 roles_r = await db.execute( 849 select(User.role, func.count(User.id)).group_by(User.role) 850 ) 851 by_role = {row[0]: row[1] for row in roles_r} 852 853 # Session counts 854 total_sessions_r = await db.execute(select(func.count(Session.id))) 855 total_sessions = total_sessions_r.scalar_one() or 0 856 857 today_start = datetime.now(timezone.utc).replace( 858 hour=0, minute=0, second=0, microsecond=0 859 ) 860 today_sessions_r = await db.execute( 861 select(func.count(Session.id)).where(Session.created_at >= today_start) 862 ) 863 sessions_today = today_sessions_r.scalar_one() or 0 864 865 active_now_r = await db.execute( 866 select(func.count(Session.id)).where( 867 Session.status.in_(("running", "pending")) 868 ) 869 ) 870 active_now = active_now_r.scalar_one() or 0 871 872 queued_r = await db.execute( 873 select(func.count(Session.id)).where(Session.status == "queued") 874 ) 875 queued = queued_r.scalar_one() or 0 876 877 # Usage this month 878 month_start = datetime.now(timezone.utc).replace( 879 day=1, hour=0, minute=0, second=0, microsecond=0 880 ) 881 month_sessions_r = await db.execute( 882 select(func.count(Session.id)).where(Session.created_at >= month_start) 883 ) 884 month_sessions = month_sessions_r.scalar_one() or 0 885 886 month_cost_r = await db.execute( 887 select(func.sum(Session.cumulative_cost_usd)).where( 888 Session.created_at >= month_start 889 ) 890 ) 891 month_cost = float(month_cost_r.scalar_one() or 0.0) 892 893 month_input_r = await db.execute( 894 select(func.sum(Session.cumulative_input_tokens)).where( 895 Session.created_at >= month_start 896 ) 897 ) 898 month_input_tokens = int(month_input_r.scalar_one() or 0) 899 900 month_output_r = await db.execute( 901 select(func.sum(Session.cumulative_output_tokens)).where( 902 Session.created_at >= month_start 903 ) 904 ) 905 month_output_tokens = int(month_output_r.scalar_one() or 0) 906 907 avg_cost = round(month_cost / month_sessions, 6) if month_sessions > 0 else 0.0 908 909 # Top models this month 910 models_r = await db.execute( 911 select(Session.model, func.count(Session.id).label("cnt")) 912 .where(Session.created_at >= month_start, Session.model.is_not(None)) 913 .group_by(Session.model) 914 .order_by(func.count(Session.id).desc()) 915 .limit(5) 916 ) 917 top_models = [{"model": row[0], "sessions": row[1]} for row in models_r] 918 919 return PlatformStats( 920 platform={ 921 "version": version, 922 "uptime_seconds": 0, 923 }, 924 resellers={ 925 "total": total_resellers, 926 "active": active_resellers, 927 "suspended": suspended_resellers, 928 }, 929 users={ 930 "total": total_users, 931 "active": active_users, 932 "suspended": suspended_users, 933 "by_role": by_role, 934 }, 935 sessions={ 936 "total": total_sessions, 937 "today": sessions_today, 938 "active_now": active_now, 939 "queued": queued, 940 }, 941 usage_this_month={ 942 "total_sessions": month_sessions, 943 "cost_usd": round(month_cost, 6), 944 "input_tokens": month_input_tokens, 945 "output_tokens": month_output_tokens, 946 "avg_cost_usd": avg_cost, 947 "top_models": top_models, 948 }, 949 capacity={ 950 "global_max_concurrent": 4, 951 "active": active_now, 952 "redis_memory_mb": 0, 953 "disk_usage_gb": 0, 954 }, 955 ) 956 957 958 @router.get("/usage", response_model=UsageResponse) 959 async def platform_usage( 960 period: str = Query(default="month", description="'day', 'week', 'month', or 'custom'"), 961 start: Optional[datetime] = Query(default=None), 962 end: Optional[datetime] = Query(default=None), 963 group_by: Optional[str] = Query(default=None, description="'reseller' or 'user'"), 964 reseller_id: Optional[str] = Query(default=None), 965 db: AsyncSession = Depends(get_db), 966 auth: AuthContext = Depends(_require_admin), 967 ) -> UsageResponse: 968 """Aggregate usage report across all resellers.""" 969 now = datetime.now(timezone.utc) 970 971 if period == "day": 972 period_start = now - timedelta(days=1) 973 elif period == "week": 974 period_start = now - timedelta(weeks=1) 975 elif period == "month": 976 period_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) 977 elif period == "custom" and start and end: 978 period_start = start 979 now = end 980 else: 981 period_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) 982 983 query = select(Session).where(Session.created_at >= period_start, Session.created_at <= now) 984 if reseller_id: 985 # Filter to sessions belonging to users of this reseller 986 user_ids_r = await db.execute( 987 select(User.id).where(User.reseller_id == reseller_id) 988 ) 989 user_ids = [row[0] for row in user_ids_r] 990 if user_ids: 991 query = query.where(Session.user_id.in_(user_ids)) 992 else: 993 # Reseller has no users — return empty immediately 994 return UsageResponse( 995 period=UsagePeriod(start=period_start, end=now), 996 totals=UsageTotals(), 997 ) 998 999 result = await db.execute(query) 1000 sessions = list(result.scalars().all()) 1001 1002 total_sessions = len(sessions) 1003 total_cost = sum(s.cumulative_cost_usd or 0.0 for s in sessions) 1004 total_input = sum(s.cumulative_input_tokens or 0 for s in sessions) 1005 total_output = sum(s.cumulative_output_tokens or 0 for s in sessions) 1006 unique_users = len({s.user_id for s in sessions}) 1007 1008 return UsageResponse( 1009 period=UsagePeriod(start=period_start, end=now), 1010 totals=UsageTotals( 1011 sessions=total_sessions, 1012 input_tokens=total_input, 1013 output_tokens=total_output, 1014 cost_usd=round(total_cost, 6), 1015 active_users=unique_users, 1016 ), 1017 ) 1018 1019 1020 @router.get("/audit", response_model=AuditLogResponse) 1021 async def audit_log( 1022 page: int = Query(default=1, ge=1), 1023 per_page: int = Query(default=50, ge=1, le=200), 1024 reseller_id: Optional[str] = Query(default=None), 1025 action: Optional[str] = Query(default=None), 1026 start: Optional[datetime] = Query(default=None), 1027 end: Optional[datetime] = Query(default=None), 1028 db: AsyncSession = Depends(get_db), 1029 auth: AuthContext = Depends(_require_admin), 1030 ) -> AuditLogResponse: 1031 """List API key audit log entries with optional filtering.""" 1032 query = select(APIKeyAuditLog) 1033 1034 if reseller_id: 1035 query = query.where(APIKeyAuditLog.reseller_id == reseller_id) 1036 if action: 1037 query = query.where(APIKeyAuditLog.action == action) 1038 if start: 1039 query = query.where(APIKeyAuditLog.timestamp >= start) 1040 if end: 1041 query = query.where(APIKeyAuditLog.timestamp <= end) 1042 1043 count_result = await db.execute( 1044 select(func.count()).select_from(query.subquery()) 1045 ) 1046 total = count_result.scalar_one() 1047 1048 offset = (page - 1) * per_page 1049 paged_result = await db.execute( 1050 query.order_by(APIKeyAuditLog.timestamp.desc()).offset(offset).limit(per_page) 1051 ) 1052 entries_raw = list(paged_result.scalars().all()) 1053 1054 # Batch-load API key names and reseller names for display 1055 key_ids = {e.api_key_id for e in entries_raw if e.api_key_id} 1056 key_names: dict[str, str] = {} 1057 if key_ids: 1058 keys_r = await db.execute( 1059 select(APIKey.id, APIKey.name).where(APIKey.id.in_(key_ids)) 1060 ) 1061 for row in keys_r: 1062 key_names[row[0]] = row[1] 1063 1064 res_ids = {e.reseller_id for e in entries_raw if e.reseller_id} 1065 res_names: dict[str, str] = {} 1066 if res_ids: 1067 res_r = await db.execute( 1068 select(Reseller.id, Reseller.name).where(Reseller.id.in_(res_ids)) 1069 ) 1070 for row in res_r: 1071 res_names[row[0]] = row[1] 1072 1073 entries = [ 1074 AuditLogEntry( 1075 id=e.id, 1076 timestamp=e.timestamp, 1077 api_key_name=key_names.get(e.api_key_id) if e.api_key_id else None, 1078 reseller_name=res_names.get(e.reseller_id) if e.reseller_id else None, 1079 action=e.action, 1080 target_user=e.target_user_id, 1081 ip_address=e.ip_address, 1082 status_code=e.status_code, 1083 error=e.error, 1084 ) 1085 for e in entries_raw 1086 ] 1087 1088 total_pages = math.ceil(total / per_page) if total > 0 else 1 1089 return AuditLogResponse( 1090 entries=entries, 1091 pagination=PaginationInfo( 1092 page=page, 1093 per_page=per_page, 1094 total=total, 1095 total_pages=total_pages, 1096 ), 1097 ) 1098 1099 1100 # ============================================================================= 1101 # Data retention 1102 # ============================================================================= 1103 1104 @router.get("/retention", response_model=RetentionConfigResponse) 1105 async def get_retention_config( 1106 db: AsyncSession = Depends(get_db), 1107 auth: AuthContext = Depends(_require_admin), 1108 ) -> RetentionConfigResponse: 1109 """Return current data retention configuration (days per table).""" 1110 from ...services.data_retention_service import data_retention_service 1111 config = await data_retention_service.get_retention_config(db) 1112 return RetentionConfigResponse(**config) 1113 1114 1115 @router.put("/retention", response_model=RetentionConfigResponse) 1116 async def update_retention_config( 1117 body: UpdateRetentionRequest, 1118 db: AsyncSession = Depends(get_db), 1119 auth: AuthContext = Depends(_require_admin), 1120 ) -> RetentionConfigResponse: 1121 """Update data retention periods. Values in days (minimum 1).""" 1122 from ...services.data_retention_service import data_retention_service 1123 updates = body.model_dump(exclude_none=True) 1124 if not updates: 1125 config = await data_retention_service.get_retention_config(db) 1126 else: 1127 config = await data_retention_service.update_retention_config( 1128 db, updates, updated_by=auth.user_id, 1129 ) 1130 return RetentionConfigResponse(**config) 1131 1132 1133 @router.post("/retention/run", response_model=RetentionRunResponse) 1134 async def run_retention( 1135 db: AsyncSession = Depends(get_db), 1136 auth: AuthContext = Depends(_require_admin), 1137 ) -> RetentionRunResponse: 1138 """Manually trigger a data retention purge and return results.""" 1139 from ...services.data_retention_service import data_retention_service 1140 results = await data_retention_service.run_all(db) 1141 total = results.pop("total_purged", 0) 1142 return RetentionRunResponse(total_purged=total, tables=results)