/ backend / api / auth.py
auth.py
  1  """Authentication API - wallet-based auth."""
  2  
import secrets
import time
from datetime import datetime, timedelta
from typing import Optional

from fastapi import APIRouter, Depends, Header, HTTPException
from jose import jwt
from pydantic import BaseModel

from config import get_settings
 13  
 14  router = APIRouter()
 15  settings = get_settings()
 16  
 17  # In-memory challenge store (use Redis in production)
 18  _challenges: dict[str, tuple[str, float]] = {}
 19  
 20  
 21  class ChallengeRequest(BaseModel):
 22      """Request for auth challenge."""
 23  
 24      address: str
 25  
 26  
 27  class ChallengeResponse(BaseModel):
 28      """Auth challenge response."""
 29  
 30      challenge: str
 31      expires_at: int
 32  
 33  
 34  class VerifyRequest(BaseModel):
 35      """Signature verification request."""
 36  
 37      address: str
 38      challenge: str
 39      signature: str
 40  
 41  
 42  class TokenResponse(BaseModel):
 43      """JWT token response."""
 44  
 45      access_token: str
 46      token_type: str = "bearer"
 47      expires_in: int
 48  
 49  
 50  class UserInfo(BaseModel):
 51      """Current user info."""
 52  
 53      address: str
 54      chain: str  # alpha or delta based on prefix
 55  
 56  
 57  @router.post("/challenge", response_model=ChallengeResponse)
 58  async def get_challenge(request: ChallengeRequest):
 59      """Generate authentication challenge for wallet signing."""
 60      # Generate random challenge
 61      challenge = secrets.token_hex(32)
 62      expires_at = int(time.time()) + 300  # 5 minutes
 63  
 64      # Store challenge
 65      _challenges[request.address] = (challenge, expires_at)
 66  
 67      return ChallengeResponse(challenge=challenge, expires_at=expires_at)
 68  
 69  
 70  @router.post("/verify", response_model=TokenResponse)
 71  async def verify_signature(request: VerifyRequest):
 72      """Verify wallet signature and issue JWT."""
 73      # Check challenge exists and not expired
 74      if request.address not in _challenges:
 75          raise HTTPException(status_code=400, detail="No challenge found for address")
 76  
 77      challenge, expires_at = _challenges[request.address]
 78      if time.time() > expires_at:
 79          del _challenges[request.address]
 80          raise HTTPException(status_code=400, detail="Challenge expired")
 81  
 82      if challenge != request.challenge:
 83          raise HTTPException(status_code=400, detail="Challenge mismatch")
 84  
 85      # TODO: Verify signature using wallet-core or SDK
 86      # For now, accept any signature (development mode)
 87      # In production: verify signature matches challenge signed by address
 88  
 89      # Clean up challenge
 90      del _challenges[request.address]
 91  
 92      # Determine chain from address prefix
 93      chain = "alpha" if request.address.startswith("ax1") else "delta"
 94  
 95      # Generate JWT
 96      expire = datetime.utcnow() + timedelta(minutes=settings.jwt_expire_minutes)
 97      token_data = {
 98          "sub": request.address,
 99          "chain": chain,
100          "exp": expire,
101      }
102  
103      access_token = jwt.encode(
104          token_data, settings.jwt_secret, algorithm=settings.jwt_algorithm
105      )
106  
107      return TokenResponse(
108          access_token=access_token,
109          expires_in=settings.jwt_expire_minutes * 60,
110      )
111  
112  
def get_current_user(token: str) -> UserInfo:
    """Turn an encoded JWT into the user it identifies.

    Raises:
        HTTPException: 401 when the token cannot be decoded or carries no
            ``sub`` claim.
    """
    try:
        claims = jwt.decode(
            token, settings.jwt_secret, algorithms=[settings.jwt_algorithm]
        )
    except jwt.JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

    subject = claims.get("sub")
    if subject is None:
        raise HTTPException(status_code=401, detail="Invalid token")

    # Tokens without a "chain" claim are treated as "alpha".
    return UserInfo(address=subject, chain=claims.get("chain", "alpha"))
126  
127  
@router.get("/me", response_model=UserInfo)
async def get_me(authorization: Optional[str] = Header(None)):
    """Return the user identified by the request's bearer token.

    The token is read from the ``Authorization`` request header, which must
    have the form ``Bearer <token>``.

    Raises:
        HTTPException: 401 when the header is missing or does not start
            with ``"Bearer "``, or when the token itself is rejected.
    """
    scheme = "Bearer "
    # The isinstance check also covers direct calls that omit the argument,
    # where the default is the Header marker object rather than a string.
    if not isinstance(authorization, str) or not authorization.startswith(scheme):
        raise HTTPException(status_code=401, detail="Not authenticated")

    # Strip only the leading scheme; str.replace would also remove any later
    # "Bearer " occurrence inside the credential.
    token = authorization.removeprefix(scheme)
    return get_current_user(token)