# auth.py
"""Authentication API - wallet-based auth.

Flow:
    1. ``POST /challenge`` - the client asks for a one-time random challenge
       bound to its wallet address.
    2. ``POST /verify`` - the client returns the challenge together with a
       signature and, on success, receives a JWT.
    3. ``GET /me`` - the client presents the JWT as a bearer token and gets
       back the address/chain encoded in it.
"""

import secrets
import time
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import APIRouter, HTTPException, Depends, Header
from pydantic import BaseModel
from jose import jwt

from config import get_settings

router = APIRouter()
settings = get_settings()

# How long an issued challenge stays valid, in seconds (5 minutes).
CHALLENGE_TTL_SECONDS = 300

# Scheme prefix expected at the start of the Authorization header value.
_BEARER_PREFIX = "Bearer "

# In-memory challenge store (use Redis in production).
# Maps wallet address -> (challenge, expiry as a Unix timestamp).
_challenges: dict[str, tuple[str, float]] = {}


class ChallengeRequest(BaseModel):
    """Request for auth challenge."""

    # Wallet address the challenge is issued for.
    address: str


class ChallengeResponse(BaseModel):
    """Auth challenge response."""

    # Random hex string the wallet is expected to sign.
    challenge: str
    # Unix timestamp (seconds) after which the challenge is rejected.
    expires_at: int


class VerifyRequest(BaseModel):
    """Signature verification request."""

    address: str
    # Must equal the challenge previously issued for ``address``.
    challenge: str
    signature: str


class TokenResponse(BaseModel):
    """JWT token response."""

    access_token: str
    token_type: str = "bearer"
    # Token lifetime in seconds.
    expires_in: int


class UserInfo(BaseModel):
    """Current user info."""

    address: str
    chain: str  # alpha or delta based on prefix


def _purge_expired_challenges(now: float) -> None:
    """Remove every stored challenge whose expiry time lies before *now*.

    Without this sweep an entry is only deleted when the same address later
    calls ``/verify``, so abandoned challenges would accumulate indefinitely.
    """
    # Collect keys first: a dict must not change size while being iterated.
    expired = [
        address
        for address, (_, expires_at) in _challenges.items()
        if now > expires_at
    ]
    for address in expired:
        del _challenges[address]


@router.post("/challenge", response_model=ChallengeResponse)
async def get_challenge(request: ChallengeRequest):
    """Generate authentication challenge for wallet signing.

    A new challenge replaces any challenge already stored for the same
    address. Expired challenges of all addresses are purged on each call.

    Returns:
        ChallengeResponse with the challenge and its expiry timestamp.
    """
    now = time.time()
    _purge_expired_challenges(now)

    # Generate random challenge
    challenge = secrets.token_hex(32)
    expires_at = int(now) + CHALLENGE_TTL_SECONDS

    # Store challenge
    _challenges[request.address] = (challenge, expires_at)

    return ChallengeResponse(challenge=challenge, expires_at=expires_at)


@router.post("/verify", response_model=TokenResponse)
async def verify_signature(request: VerifyRequest):
    """Verify wallet signature and issue JWT.

    Raises:
        HTTPException: 400 if no challenge is stored for the address, the
            stored challenge has expired, or the submitted challenge does
            not match the stored one.

    Returns:
        TokenResponse carrying the signed JWT and its lifetime in seconds.
    """
    # Check challenge exists and not expired
    stored = _challenges.get(request.address)
    if stored is None:
        raise HTTPException(status_code=400, detail="No challenge found for address")

    challenge, expires_at = stored
    if time.time() > expires_at:
        del _challenges[request.address]
        raise HTTPException(status_code=400, detail="Challenge expired")

    if challenge != request.challenge:
        raise HTTPException(status_code=400, detail="Challenge mismatch")

    # SECURITY WARNING: request.signature is NOT verified. Anyone who requests
    # a challenge for an address can obtain a token for that address.
    # TODO: Verify signature using wallet-core or SDK
    # For now, accept any signature (development mode)
    # In production: verify signature matches challenge signed by address

    # Clean up challenge so it cannot be replayed
    del _challenges[request.address]

    # Determine chain from address prefix
    chain = "alpha" if request.address.startswith("ax1") else "delta"

    # Generate JWT. Use a timezone-aware UTC timestamp: datetime.utcnow() is
    # deprecated and returns a naive datetime.
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=settings.jwt_expire_minutes
    )
    token_data = {
        "sub": request.address,
        "chain": chain,
        "exp": expire,
    }

    access_token = jwt.encode(
        token_data, settings.jwt_secret, algorithm=settings.jwt_algorithm
    )

    return TokenResponse(
        access_token=access_token,
        expires_in=settings.jwt_expire_minutes * 60,
    )


def get_current_user(token: str) -> UserInfo:
    """Decode JWT and return current user info.

    Args:
        token: Encoded JWT (without any ``Bearer `` prefix).

    Raises:
        HTTPException: 401 if decoding fails or the token has no ``sub``
            claim.

    Returns:
        UserInfo built from the ``sub`` and ``chain`` claims; ``chain``
        falls back to ``"alpha"`` when the claim is absent.
    """
    # Keep the try body to the one call that can raise JWTError.
    try:
        payload = jwt.decode(
            token, settings.jwt_secret, algorithms=[settings.jwt_algorithm]
        )
    except jwt.JWTError as err:
        raise HTTPException(status_code=401, detail="Invalid token") from err

    address = payload.get("sub")
    if address is None:
        raise HTTPException(status_code=401, detail="Invalid token")

    return UserInfo(address=address, chain=payload.get("chain", "alpha"))


@router.get("/me", response_model=UserInfo)
async def get_me(authorization: Optional[str] = Header(None)):
    """Get current authenticated user info.

    Args:
        authorization: Value of the ``Authorization`` request header,
            expected in the form ``Bearer <token>``. It is declared with
            ``Header`` because a plain ``Optional[str] = None`` parameter
            would be read from the query string instead.

    Raises:
        HTTPException: 401 if the header is missing, does not start with
            ``Bearer ``, or carries an invalid token.
    """
    # The isinstance check also covers direct (non-HTTP) calls that omit the
    # argument, where the default is the Header marker rather than a string.
    if not isinstance(authorization, str) or not authorization.startswith(
        _BEARER_PREFIX
    ):
        raise HTTPException(status_code=401, detail="Not authenticated")

    # Strip only the leading scheme; str.replace would also remove any later
    # occurrence of "Bearer " inside the value.
    token = authorization.removeprefix(_BEARER_PREFIX)
    return get_current_user(token)