Files

227 lines
7.2 KiB
Python
Raw Permalink Normal View History

2025-12-14 02:38:35 +07:00
from typing import Annotated
2025-12-19 02:07:25 +07:00
from datetime import datetime
2025-12-14 02:38:35 +07:00
2025-12-18 17:15:21 +07:00
from fastapi import Depends, HTTPException, status, Header
2025-12-14 02:38:35 +07:00
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
2025-12-18 17:15:21 +07:00
from app.core.config import settings
2025-12-14 02:38:35 +07:00
from app.core.database import get_db
from app.core.security import decode_access_token
2025-12-19 02:07:25 +07:00
from app.models import User, Participant, Marathon, UserRole, ParticipantRole, AdminLog, AdminActionType
2025-12-14 02:38:35 +07:00
security = HTTPBearer()
async def get_current_user(
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
token = credentials.credentials
payload = decode_access_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
headers={"WWW-Authenticate": "Bearer"},
)
user_id = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token payload",
)
result = await db.execute(select(User).where(User.id == int(user_id)))
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
)
2025-12-19 02:07:25 +07:00
# Check if user is banned
if user.is_banned:
# Auto-unban if ban expired
if user.banned_until and datetime.utcnow() > user.banned_until:
# Save ban info for logging before clearing
old_ban_reason = user.ban_reason
old_banned_until = user.banned_until.isoformat() if user.banned_until else None
user.is_banned = False
user.banned_at = None
user.banned_until = None
user.banned_by_id = None
user.ban_reason = None
# Log system auto-unban action
log = AdminLog(
admin_id=None, # System action, no admin
action=AdminActionType.USER_AUTO_UNBAN.value,
target_type="user",
target_id=user.id,
details={
"nickname": user.nickname,
"reason": old_ban_reason,
"banned_until": old_banned_until,
"system": True,
},
ip_address=None,
)
db.add(log)
await db.commit()
await db.refresh(user)
else:
# Still banned - return ban info in error
ban_info = {
"banned_at": user.banned_at.isoformat() if user.banned_at else None,
"banned_until": user.banned_until.isoformat() if user.banned_until else None,
"reason": user.ban_reason,
}
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ban_info,
)
2025-12-14 02:38:35 +07:00
return user
2025-12-14 20:21:56 +07:00
def require_admin(user: User) -> User:
"""Check if user is admin"""
if not user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required",
)
return user
2025-12-19 02:07:25 +07:00
def require_admin_with_2fa(user: User) -> User:
"""Check if user is admin with Telegram linked (2FA enabled)"""
if not user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required",
)
if not user.telegram_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Для доступа к админ-панели необходимо привязать Telegram в профиле",
)
return user
2025-12-14 20:21:56 +07:00
async def get_participant(
db: AsyncSession,
user_id: int,
marathon_id: int,
) -> Participant | None:
"""Get participant record for user in marathon"""
result = await db.execute(
select(Participant).where(
Participant.user_id == user_id,
Participant.marathon_id == marathon_id,
)
)
return result.scalar_one_or_none()
async def require_participant(
db: AsyncSession,
user_id: int,
marathon_id: int,
) -> Participant:
"""Require user to be participant of marathon"""
participant = await get_participant(db, user_id, marathon_id)
if not participant:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not a participant of this marathon",
)
return participant
async def require_organizer(
db: AsyncSession,
user: User,
marathon_id: int,
) -> Participant:
"""Require user to be organizer of marathon (or admin)"""
if user.is_admin:
# Admins can act as organizers
participant = await get_participant(db, user.id, marathon_id)
if participant:
return participant
# Create virtual participant for admin
result = await db.execute(select(Marathon).where(Marathon.id == marathon_id))
marathon = result.scalar_one_or_none()
if not marathon:
raise HTTPException(status_code=404, detail="Marathon not found")
# Return a temporary object for admin
return Participant(
user_id=user.id,
marathon_id=marathon_id,
role=ParticipantRole.ORGANIZER.value
)
participant = await get_participant(db, user.id, marathon_id)
if not participant:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not a participant of this marathon",
)
if not participant.is_organizer:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only organizers can perform this action",
)
return participant
async def require_creator(
db: AsyncSession,
user: User,
marathon_id: int,
) -> Marathon:
"""Require user to be creator of marathon (or admin)"""
result = await db.execute(select(Marathon).where(Marathon.id == marathon_id))
marathon = result.scalar_one_or_none()
if not marathon:
raise HTTPException(status_code=404, detail="Marathon not found")
if not user.is_admin and marathon.creator_id != user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only the creator can perform this action",
)
return marathon
2025-12-14 02:38:35 +07:00
# Type aliases for cleaner dependency injection
CurrentUser = Annotated[User, Depends(get_current_user)]
DbSession = Annotated[AsyncSession, Depends(get_db)]
2025-12-18 17:15:21 +07:00
async def verify_bot_secret(
x_bot_secret: str | None = Header(None, alias="X-Bot-Secret")
) -> None:
"""Verify that request comes from trusted bot using secret key."""
if not settings.BOT_API_SECRET:
# If secret is not configured, skip check (for development)
return
if x_bot_secret != settings.BOT_API_SECRET:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid or missing bot secret"
)
BotSecretDep = Annotated[None, Depends(verify_bot_secret)]