from datetime import datetime, timedelta, timezone
import secrets

from fastapi import APIRouter, HTTPException, status, Request
from sqlalchemy import select
from sqlalchemy.orm import selectinload

from app.api.deps import DbSession, CurrentUser
from app.core.security import verify_password, get_password_hash, create_access_token
from app.core.rate_limit import limiter
from app.models import User, UserRole, Admin2FASession
from app.schemas import UserRegister, UserLogin, TokenResponse, UserPrivate, LoginResponse
from app.services.telegram_notifier import telegram_notifier

router = APIRouter(prefix="/auth", tags=["auth"])


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Drop-in replacement for the deprecated ``datetime.utcnow()``: the value is
    identical (UTC wall clock, ``tzinfo=None``), so it stays comparable with
    the naive timestamps this module already writes to ``expires_at``.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


@router.post("/register", response_model=TokenResponse)
@limiter.limit("5/minute")
async def register(request: Request, data: UserRegister, db: DbSession):
    """Create a new user account and return an access token for it.

    The login is lower-cased both for the uniqueness lookup and for storage.

    Raises:
        HTTPException: 400 if a user with the same (lower-cased) login exists.
    """
    login_name = data.login.lower()

    # Check if login already exists
    result = await db.execute(select(User).where(User.login == login_name))
    if result.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Login already registered",
        )

    # Create user
    user = User(
        login=login_name,
        password_hash=get_password_hash(data.password),
        nickname=data.nickname,
    )
    db.add(user)
    await db.commit()
    await db.refresh(user)

    # Generate token
    access_token = create_access_token(subject=user.id)

    return TokenResponse(
        access_token=access_token,
        user=UserPrivate.model_validate(user),
    )


@router.post("/login", response_model=LoginResponse)
@limiter.limit("10/minute")
async def login(request: Request, data: UserLogin, db: DbSession):
    """Authenticate by login/password.

    Returns a ``LoginResponse`` carrying either an access token, or — for an
    admin whose ``telegram_id`` is set — ``requires_2fa=True`` plus the id of
    a freshly created ``Admin2FASession`` whose code was sent via Telegram.

    Raises:
        HTTPException: 401 on unknown login or wrong password; 403 with a
            ban-info dict as ``detail`` when the user is banned.
    """
    # Find user
    result = await db.execute(
        select(User)
        .where(User.login == data.login.lower())
        .options(
            selectinload(User.equipped_frame),
            selectinload(User.equipped_title),
            selectinload(User.equipped_name_color),
            selectinload(User.equipped_background),
        )
    )
    user = result.scalar_one_or_none()

    if not user or not verify_password(data.password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect login or password",
        )

    # Check if user is banned
    if user.is_banned:
        # Return full ban info like in deps.py
        ban_info = {
            "banned_at": user.banned_at.isoformat() if user.banned_at else None,
            "banned_until": user.banned_until.isoformat() if user.banned_until else None,
            "reason": user.ban_reason,
        }
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=ban_info,
        )

    # If admin with Telegram linked, require 2FA
    if user.role == UserRole.ADMIN.value and user.telegram_id:
        # Read everything needed from ``user`` before committing: a commit may
        # expire loaded attributes, and reloading them implicitly is not
        # possible from async code.
        telegram_id = user.telegram_id

        # Generate 6-digit code (zero-padded, uniformly random)
        code = f"{secrets.randbelow(10 ** 6):06d}"

        # Create 2FA session (expires in 5 minutes)
        session = Admin2FASession(
            user_id=user.id,
            code=code,
            expires_at=_utcnow() + timedelta(minutes=5),
        )
        db.add(session)
        await db.commit()
        await db.refresh(session)

        # Capture the id now; the second commit below may expire it again.
        session_id = session.id

        # Send code to Telegram
        message = (
            "🔐 Код подтверждения для входа в админку\n\n"
            f"Ваш код: {code}\n\n"
            "Код действителен 5 минут."
        )
        sent = await telegram_notifier.send_message(telegram_id, message)

        if sent:
            session.telegram_sent = True
            await db.commit()

        return LoginResponse(
            requires_2fa=True,
            two_factor_session_id=session_id,
        )

    # Regular user or admin without Telegram - generate token immediately
    # Admin without Telegram can login but admin panel will check for Telegram
    access_token = create_access_token(subject=user.id)

    return LoginResponse(
        access_token=access_token,
        user=UserPrivate.model_validate(user),
    )


@router.post("/2fa/verify", response_model=TokenResponse)
@limiter.limit("5/minute")
async def verify_2fa(request: Request, session_id: int, code: str, db: DbSession):
    """Verify 2FA code and return JWT token.

    Raises:
        HTTPException: 400 when the session does not exist, was already
            verified, has expired, the code does not match, or the session's
            user can no longer be found.
    """
    # Find session
    result = await db.execute(
        select(Admin2FASession).where(Admin2FASession.id == session_id)
    )
    session = result.scalar_one_or_none()

    if not session:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid session",
        )

    if session.is_verified:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Session already verified",
        )

    if _utcnow() > session.expires_at:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Code expired",
        )

    # Constant-time comparison so response timing does not leak how much of
    # the code matched. Compare as bytes: compare_digest() rejects non-ASCII
    # str arguments, and ``code`` is caller-supplied.
    codes_match = secrets.compare_digest(
        session.code.encode("utf-8"),
        code.encode("utf-8"),
    )
    if not codes_match:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid code",
        )

    # Read the owner id before the commit below can expire the attribute.
    user_id = session.user_id

    # Mark as verified
    session.is_verified = True
    await db.commit()

    # Get user
    result = await db.execute(
        select(User)
        .where(User.id == user_id)
        .options(
            selectinload(User.equipped_frame),
            selectinload(User.equipped_title),
            selectinload(User.equipped_name_color),
            selectinload(User.equipped_background),
        )
    )
    user = result.scalar_one_or_none()

    if not user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User not found",
        )

    # Generate token
    access_token = create_access_token(subject=user.id)

    return TokenResponse(
        access_token=access_token,
        user=UserPrivate.model_validate(user),
    )


@router.get("/me", response_model=UserPrivate)
async def get_me(current_user: CurrentUser):
    """Get current user's full profile (including private data)"""
    return UserPrivate.model_validate(current_user)