Files
game-marathon/backend/app/api/v1/auth.py
2025-12-19 02:07:25 +07:00

166 lines
5.2 KiB
Python

from datetime import datetime, timedelta
import secrets
from fastapi import APIRouter, HTTPException, status, Request
from sqlalchemy import select
from app.api.deps import DbSession, CurrentUser
from app.core.security import verify_password, get_password_hash, create_access_token
from app.core.rate_limit import limiter
from app.models import User, UserRole, Admin2FASession
from app.schemas import UserRegister, UserLogin, TokenResponse, UserPrivate, LoginResponse
from app.services.telegram_notifier import telegram_notifier
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/register", response_model=TokenResponse)
@limiter.limit("5/minute")
async def register(request: Request, data: UserRegister, db: DbSession):
# Check if login already exists
result = await db.execute(select(User).where(User.login == data.login.lower()))
if result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Login already registered",
)
# Create user
user = User(
login=data.login.lower(),
password_hash=get_password_hash(data.password),
nickname=data.nickname,
)
db.add(user)
await db.commit()
await db.refresh(user)
# Generate token
access_token = create_access_token(subject=user.id)
return TokenResponse(
access_token=access_token,
user=UserPrivate.model_validate(user),
)
@router.post("/login", response_model=LoginResponse)
@limiter.limit("10/minute")
async def login(request: Request, data: UserLogin, db: DbSession):
# Find user
result = await db.execute(select(User).where(User.login == data.login.lower()))
user = result.scalar_one_or_none()
if not user or not verify_password(data.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect login or password",
)
# Check if user is banned
if user.is_banned:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Your account has been banned",
)
# If admin with Telegram linked, require 2FA
if user.role == UserRole.ADMIN.value and user.telegram_id:
# Generate 6-digit code
code = "".join([str(secrets.randbelow(10)) for _ in range(6)])
# Create 2FA session (expires in 5 minutes)
session = Admin2FASession(
user_id=user.id,
code=code,
expires_at=datetime.utcnow() + timedelta(minutes=5),
)
db.add(session)
await db.commit()
await db.refresh(session)
# Send code to Telegram
message = f"🔐 <b>Код подтверждения для входа в админку</b>\n\nВаш код: <code>{code}</code>\n\nКод действителен 5 минут."
sent = await telegram_notifier.send_message(user.telegram_id, message)
if sent:
session.telegram_sent = True
await db.commit()
return LoginResponse(
requires_2fa=True,
two_factor_session_id=session.id,
)
# Regular user or admin without Telegram - generate token immediately
# Admin without Telegram can login but admin panel will check for Telegram
access_token = create_access_token(subject=user.id)
return LoginResponse(
access_token=access_token,
user=UserPrivate.model_validate(user),
)
@router.post("/2fa/verify", response_model=TokenResponse)
@limiter.limit("5/minute")
async def verify_2fa(request: Request, session_id: int, code: str, db: DbSession):
"""Verify 2FA code and return JWT token."""
# Find session
result = await db.execute(
select(Admin2FASession).where(Admin2FASession.id == session_id)
)
session = result.scalar_one_or_none()
if not session:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid session",
)
if session.is_verified:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Session already verified",
)
if datetime.utcnow() > session.expires_at:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Code expired",
)
if session.code != code:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid code",
)
# Mark as verified
session.is_verified = True
await db.commit()
# Get user
result = await db.execute(select(User).where(User.id == session.user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="User not found",
)
# Generate token
access_token = create_access_token(subject=user.id)
return TokenResponse(
access_token=access_token,
user=UserPrivate.model_validate(user),
)
@router.get("/me", response_model=UserPrivate)
async def get_me(current_user: CurrentUser):
"""Get current user's full profile (including private data)"""
return UserPrivate.model_validate(current_user)