from datetime import datetime from fastapi import APIRouter, HTTPException, Query, Request from sqlalchemy import select, func from sqlalchemy.orm import selectinload from pydantic import BaseModel, Field from app.api.deps import DbSession, CurrentUser, require_admin_with_2fa from app.models import User, UserRole, Marathon, MarathonStatus, Participant, Game, AdminLog, AdminActionType, StaticContent from app.schemas import ( UserPublic, MessageResponse, AdminUserResponse, BanUserRequest, AdminResetPasswordRequest, AdminLogResponse, AdminLogsListResponse, BroadcastRequest, BroadcastResponse, StaticContentResponse, StaticContentUpdate, StaticContentCreate, DashboardStats ) from app.core.security import get_password_hash from app.services.telegram_notifier import telegram_notifier from app.core.rate_limit import limiter router = APIRouter(prefix="/admin", tags=["admin"]) class SetUserRole(BaseModel): role: str = Field(..., pattern="^(user|admin)$") class AdminMarathonResponse(BaseModel): id: int title: str status: str creator: UserPublic participants_count: int games_count: int start_date: str | None end_date: str | None created_at: str class Config: from_attributes = True # ============ Helper Functions ============ async def log_admin_action( db, admin_id: int, action: str, target_type: str, target_id: int, details: dict | None = None, ip_address: str | None = None ): """Log an admin action.""" log = AdminLog( admin_id=admin_id, action=action, target_type=target_type, target_id=target_id, details=details, ip_address=ip_address, ) db.add(log) await db.commit() @router.get("/users", response_model=list[AdminUserResponse]) async def list_users( current_user: CurrentUser, db: DbSession, skip: int = Query(0, ge=0), limit: int = Query(50, ge=1, le=100), search: str | None = None, banned_only: bool = False, ): """List all users. Admin only.""" require_admin_with_2fa(current_user) query = select(User).order_by(User.created_at.desc()) if search: query = query.where( (User.login.ilike(f"%{search}%")) | (User.nickname.ilike(f"%{search}%")) ) if banned_only: query = query.where(User.is_banned == True) query = query.offset(skip).limit(limit) result = await db.execute(query) users = result.scalars().all() response = [] for user in users: # Count marathons user participates in marathons_count = await db.scalar( select(func.count()).select_from(Participant).where(Participant.user_id == user.id) ) response.append(AdminUserResponse( id=user.id, login=user.login, nickname=user.nickname, role=user.role, avatar_url=user.avatar_url, telegram_id=user.telegram_id, telegram_username=user.telegram_username, marathons_count=marathons_count, created_at=user.created_at.isoformat(), is_banned=user.is_banned, banned_at=user.banned_at.isoformat() if user.banned_at else None, banned_until=user.banned_until.isoformat() if user.banned_until else None, ban_reason=user.ban_reason, )) return response @router.get("/users/{user_id}", response_model=AdminUserResponse) async def get_user(user_id: int, current_user: CurrentUser, db: DbSession): """Get user details. Admin only.""" require_admin_with_2fa(current_user) result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="User not found") marathons_count = await db.scalar( select(func.count()).select_from(Participant).where(Participant.user_id == user.id) ) return AdminUserResponse( id=user.id, login=user.login, nickname=user.nickname, role=user.role, avatar_url=user.avatar_url, telegram_id=user.telegram_id, telegram_username=user.telegram_username, marathons_count=marathons_count, created_at=user.created_at.isoformat(), is_banned=user.is_banned, banned_at=user.banned_at.isoformat() if user.banned_at else None, banned_until=user.banned_until.isoformat() if user.banned_until else None, ban_reason=user.ban_reason, ) @router.patch("/users/{user_id}/role", response_model=AdminUserResponse) async def set_user_role( user_id: int, data: SetUserRole, current_user: CurrentUser, db: DbSession, request: Request, ): """Set user's global role. Admin only.""" require_admin_with_2fa(current_user) # Cannot change own role if user_id == current_user.id: raise HTTPException(status_code=400, detail="Cannot change your own role") result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="User not found") old_role = user.role user.role = data.role await db.commit() await db.refresh(user) # Log action await log_admin_action( db, current_user.id, AdminActionType.USER_ROLE_CHANGE.value, "user", user_id, {"old_role": old_role, "new_role": data.role, "nickname": user.nickname}, request.client.host if request.client else None ) marathons_count = await db.scalar( select(func.count()).select_from(Participant).where(Participant.user_id == user.id) ) return AdminUserResponse( id=user.id, login=user.login, nickname=user.nickname, role=user.role, avatar_url=user.avatar_url, telegram_id=user.telegram_id, telegram_username=user.telegram_username, marathons_count=marathons_count, created_at=user.created_at.isoformat(), is_banned=user.is_banned, banned_at=user.banned_at.isoformat() if user.banned_at else None, banned_until=user.banned_until.isoformat() if user.banned_until else None, ban_reason=user.ban_reason, ) @router.delete("/users/{user_id}", response_model=MessageResponse) async def delete_user(user_id: int, current_user: CurrentUser, db: DbSession): """Delete a user. Admin only.""" require_admin_with_2fa(current_user) # Cannot delete yourself if user_id == current_user.id: raise HTTPException(status_code=400, detail="Cannot delete yourself") result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="User not found") # Cannot delete another admin if user.role == UserRole.ADMIN.value: raise HTTPException(status_code=400, detail="Cannot delete another admin") await db.delete(user) await db.commit() return MessageResponse(message="User deleted") @router.get("/marathons", response_model=list[AdminMarathonResponse]) async def list_marathons( current_user: CurrentUser, db: DbSession, skip: int = Query(0, ge=0), limit: int = Query(50, ge=1, le=100), search: str | None = None, ): """List all marathons. Admin only.""" require_admin_with_2fa(current_user) query = ( select(Marathon) .options(selectinload(Marathon.creator)) .order_by(Marathon.created_at.desc()) ) if search: query = query.where(Marathon.title.ilike(f"%{search}%")) query = query.offset(skip).limit(limit) result = await db.execute(query) marathons = result.scalars().all() response = [] for marathon in marathons: participants_count = await db.scalar( select(func.count()).select_from(Participant).where(Participant.marathon_id == marathon.id) ) games_count = await db.scalar( select(func.count()).select_from(Game).where(Game.marathon_id == marathon.id) ) response.append(AdminMarathonResponse( id=marathon.id, title=marathon.title, status=marathon.status, creator=UserPublic.model_validate(marathon.creator), participants_count=participants_count, games_count=games_count, start_date=marathon.start_date.isoformat() if marathon.start_date else None, end_date=marathon.end_date.isoformat() if marathon.end_date else None, created_at=marathon.created_at.isoformat(), )) return response @router.delete("/marathons/{marathon_id}", response_model=MessageResponse) async def delete_marathon(marathon_id: int, current_user: CurrentUser, db: DbSession, request: Request): """Delete a marathon. Admin only.""" require_admin_with_2fa(current_user) result = await db.execute(select(Marathon).where(Marathon.id == marathon_id)) marathon = result.scalar_one_or_none() if not marathon: raise HTTPException(status_code=404, detail="Marathon not found") marathon_title = marathon.title await db.delete(marathon) await db.commit() # Log action await log_admin_action( db, current_user.id, AdminActionType.MARATHON_DELETE.value, "marathon", marathon_id, {"title": marathon_title}, request.client.host if request.client else None ) return MessageResponse(message="Marathon deleted") @router.get("/stats") async def get_stats(current_user: CurrentUser, db: DbSession): """Get platform statistics. Admin only.""" require_admin_with_2fa(current_user) users_count = await db.scalar(select(func.count()).select_from(User)) marathons_count = await db.scalar(select(func.count()).select_from(Marathon)) games_count = await db.scalar(select(func.count()).select_from(Game)) participants_count = await db.scalar(select(func.count()).select_from(Participant)) return { "users_count": users_count, "marathons_count": marathons_count, "games_count": games_count, "total_participations": participants_count, } # ============ Ban/Unban Users ============ @router.post("/users/{user_id}/ban", response_model=AdminUserResponse) @limiter.limit("10/minute") async def ban_user( request: Request, user_id: int, data: BanUserRequest, current_user: CurrentUser, db: DbSession, ): """Ban a user. Admin only.""" require_admin_with_2fa(current_user) if user_id == current_user.id: raise HTTPException(status_code=400, detail="Cannot ban yourself") result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="User not found") if user.role == UserRole.ADMIN.value: raise HTTPException(status_code=400, detail="Cannot ban another admin") if user.is_banned: raise HTTPException(status_code=400, detail="User is already banned") user.is_banned = True user.banned_at = datetime.utcnow() # Normalize to naive datetime (remove tzinfo) to match banned_at user.banned_until = data.banned_until.replace(tzinfo=None) if data.banned_until else None user.banned_by_id = current_user.id user.ban_reason = data.reason await db.commit() await db.refresh(user) # Log action await log_admin_action( db, current_user.id, AdminActionType.USER_BAN.value, "user", user_id, {"nickname": user.nickname, "reason": data.reason}, request.client.host if request.client else None ) marathons_count = await db.scalar( select(func.count()).select_from(Participant).where(Participant.user_id == user.id) ) return AdminUserResponse( id=user.id, login=user.login, nickname=user.nickname, role=user.role, avatar_url=user.avatar_url, telegram_id=user.telegram_id, telegram_username=user.telegram_username, marathons_count=marathons_count, created_at=user.created_at.isoformat(), is_banned=user.is_banned, banned_at=user.banned_at.isoformat() if user.banned_at else None, banned_until=user.banned_until.isoformat() if user.banned_until else None, ban_reason=user.ban_reason, ) @router.post("/users/{user_id}/unban", response_model=AdminUserResponse) async def unban_user( request: Request, user_id: int, current_user: CurrentUser, db: DbSession, ): """Unban a user. Admin only.""" require_admin_with_2fa(current_user) result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="User not found") if not user.is_banned: raise HTTPException(status_code=400, detail="User is not banned") user.is_banned = False user.banned_at = None user.banned_until = None user.banned_by_id = None user.ban_reason = None await db.commit() await db.refresh(user) # Log action await log_admin_action( db, current_user.id, AdminActionType.USER_UNBAN.value, "user", user_id, {"nickname": user.nickname}, request.client.host if request.client else None ) marathons_count = await db.scalar( select(func.count()).select_from(Participant).where(Participant.user_id == user.id) ) return AdminUserResponse( id=user.id, login=user.login, nickname=user.nickname, role=user.role, avatar_url=user.avatar_url, telegram_id=user.telegram_id, telegram_username=user.telegram_username, marathons_count=marathons_count, created_at=user.created_at.isoformat(), is_banned=user.is_banned, banned_at=None, banned_until=None, ban_reason=None, ) # ============ Reset Password ============ @router.post("/users/{user_id}/reset-password", response_model=AdminUserResponse) async def reset_user_password( request: Request, user_id: int, data: AdminResetPasswordRequest, current_user: CurrentUser, db: DbSession, ): """Reset user password. Admin only.""" require_admin_with_2fa(current_user) result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="User not found") # Hash and save new password user.password_hash = get_password_hash(data.new_password) await db.commit() await db.refresh(user) # Log action await log_admin_action( db, current_user.id, AdminActionType.USER_PASSWORD_RESET.value, "user", user_id, {"nickname": user.nickname}, request.client.host if request.client else None ) # Notify user via Telegram if linked if user.telegram_id: await telegram_notifier.send_message( user.telegram_id, "🔐 Ваш пароль был сброшен\n\n" "Администратор установил вам новый пароль. " "Если это были не вы, свяжитесь с поддержкой." ) marathons_count = await db.scalar( select(func.count()).select_from(Participant).where(Participant.user_id == user.id) ) return AdminUserResponse( id=user.id, login=user.login, nickname=user.nickname, role=user.role, avatar_url=user.avatar_url, telegram_id=user.telegram_id, telegram_username=user.telegram_username, marathons_count=marathons_count, created_at=user.created_at.isoformat(), is_banned=user.is_banned, banned_at=user.banned_at.isoformat() if user.banned_at else None, banned_until=user.banned_until.isoformat() if user.banned_until else None, ban_reason=user.ban_reason, ) # ============ Force Finish Marathon ============ @router.post("/marathons/{marathon_id}/force-finish", response_model=MessageResponse) async def force_finish_marathon( request: Request, marathon_id: int, current_user: CurrentUser, db: DbSession, ): """Force finish a marathon. Admin only.""" require_admin_with_2fa(current_user) result = await db.execute(select(Marathon).where(Marathon.id == marathon_id)) marathon = result.scalar_one_or_none() if not marathon: raise HTTPException(status_code=404, detail="Marathon not found") if marathon.status == MarathonStatus.FINISHED.value: raise HTTPException(status_code=400, detail="Marathon is already finished") old_status = marathon.status marathon.status = MarathonStatus.FINISHED.value marathon.end_date = datetime.utcnow() await db.commit() # Log action await log_admin_action( db, current_user.id, AdminActionType.MARATHON_FORCE_FINISH.value, "marathon", marathon_id, {"title": marathon.title, "old_status": old_status}, request.client.host if request.client else None ) # Notify participants await telegram_notifier.notify_marathon_finish(db, marathon_id, marathon.title) return MessageResponse(message="Marathon finished") # ============ Admin Logs ============ @router.get("/logs", response_model=AdminLogsListResponse) async def get_logs( current_user: CurrentUser, db: DbSession, skip: int = Query(0, ge=0), limit: int = Query(50, ge=1, le=100), action: str | None = None, admin_id: int | None = None, ): """Get admin action logs. Admin only.""" require_admin_with_2fa(current_user) query = ( select(AdminLog) .options(selectinload(AdminLog.admin)) .order_by(AdminLog.created_at.desc()) ) if action: query = query.where(AdminLog.action == action) if admin_id: query = query.where(AdminLog.admin_id == admin_id) # Get total count count_query = select(func.count()).select_from(AdminLog) if action: count_query = count_query.where(AdminLog.action == action) if admin_id: count_query = count_query.where(AdminLog.admin_id == admin_id) total = await db.scalar(count_query) query = query.offset(skip).limit(limit) result = await db.execute(query) logs = result.scalars().all() return AdminLogsListResponse( logs=[ AdminLogResponse( id=log.id, admin_id=log.admin_id, admin_nickname=log.admin.nickname if log.admin else None, action=log.action, target_type=log.target_type, target_id=log.target_id, details=log.details, ip_address=log.ip_address, created_at=log.created_at, ) for log in logs ], total=total or 0, ) # ============ Broadcast ============ @router.post("/broadcast/all", response_model=BroadcastResponse) @limiter.limit("1/minute") async def broadcast_to_all( request: Request, data: BroadcastRequest, current_user: CurrentUser, db: DbSession, ): """Send broadcast message to all users with Telegram linked. Admin only.""" require_admin_with_2fa(current_user) # Get all users with telegram_id result = await db.execute( select(User).where(User.telegram_id.isnot(None)) ) users = result.scalars().all() total_count = len(users) sent_count = 0 for user in users: if await telegram_notifier.send_message(user.telegram_id, data.message): sent_count += 1 # Log action await log_admin_action( db, current_user.id, AdminActionType.BROADCAST_ALL.value, "broadcast", 0, {"message": data.message[:100], "sent": sent_count, "total": total_count}, request.client.host if request.client else None ) return BroadcastResponse(sent_count=sent_count, total_count=total_count) @router.post("/broadcast/marathon/{marathon_id}", response_model=BroadcastResponse) @limiter.limit("3/minute") async def broadcast_to_marathon( request: Request, marathon_id: int, data: BroadcastRequest, current_user: CurrentUser, db: DbSession, ): """Send broadcast message to marathon participants. Admin only.""" require_admin_with_2fa(current_user) # Check marathon exists result = await db.execute(select(Marathon).where(Marathon.id == marathon_id)) marathon = result.scalar_one_or_none() if not marathon: raise HTTPException(status_code=404, detail="Marathon not found") # Get participants count total_result = await db.execute( select(User) .join(Participant, Participant.user_id == User.id) .where( Participant.marathon_id == marathon_id, User.telegram_id.isnot(None) ) ) users = total_result.scalars().all() total_count = len(users) sent_count = await telegram_notifier.notify_marathon_participants( db, marathon_id, data.message ) # Log action await log_admin_action( db, current_user.id, AdminActionType.BROADCAST_MARATHON.value, "marathon", marathon_id, {"title": marathon.title, "message": data.message[:100], "sent": sent_count, "total": total_count}, request.client.host if request.client else None ) return BroadcastResponse(sent_count=sent_count, total_count=total_count) # ============ Static Content ============ @router.get("/content", response_model=list[StaticContentResponse]) async def list_content(current_user: CurrentUser, db: DbSession): """List all static content. Admin only.""" require_admin_with_2fa(current_user) result = await db.execute( select(StaticContent).order_by(StaticContent.key) ) return result.scalars().all() @router.get("/content/{key}", response_model=StaticContentResponse) async def get_content(key: str, current_user: CurrentUser, db: DbSession): """Get static content by key. Admin only.""" require_admin_with_2fa(current_user) result = await db.execute( select(StaticContent).where(StaticContent.key == key) ) content = result.scalar_one_or_none() if not content: raise HTTPException(status_code=404, detail="Content not found") return content @router.put("/content/{key}", response_model=StaticContentResponse) async def update_content( request: Request, key: str, data: StaticContentUpdate, current_user: CurrentUser, db: DbSession, ): """Update static content. Admin only.""" require_admin_with_2fa(current_user) result = await db.execute( select(StaticContent).where(StaticContent.key == key) ) content = result.scalar_one_or_none() if not content: raise HTTPException(status_code=404, detail="Content not found") content.title = data.title content.content = data.content content.updated_by_id = current_user.id content.updated_at = datetime.utcnow() await db.commit() await db.refresh(content) # Log action await log_admin_action( db, current_user.id, AdminActionType.CONTENT_UPDATE.value, "content", content.id, {"key": key, "title": data.title}, request.client.host if request.client else None ) return content @router.post("/content", response_model=StaticContentResponse) async def create_content( request: Request, data: StaticContentCreate, current_user: CurrentUser, db: DbSession, ): """Create static content. Admin only.""" require_admin_with_2fa(current_user) # Check if key exists result = await db.execute( select(StaticContent).where(StaticContent.key == data.key) ) if result.scalar_one_or_none(): raise HTTPException(status_code=400, detail="Content with this key already exists") content = StaticContent( key=data.key, title=data.title, content=data.content, updated_by_id=current_user.id, ) db.add(content) await db.commit() await db.refresh(content) return content @router.delete("/content/{key}", response_model=MessageResponse) async def delete_content( key: str, request: Request, current_user: CurrentUser, db: DbSession, ): """Delete static content. Admin only.""" require_admin_with_2fa(current_user) result = await db.execute( select(StaticContent).where(StaticContent.key == key) ) content = result.scalar_one_or_none() if not content: raise HTTPException(status_code=404, detail="Content not found") await db.delete(content) await db.commit() # Log action await log_admin_action( db, current_user.id, AdminActionType.CONTENT_UPDATE.value, "static_content", content.id, {"action": "delete", "key": key}, request.client.host if request.client else None ) return {"message": f"Content '{key}' deleted successfully"} # ============ Dashboard ============ @router.get("/dashboard", response_model=DashboardStats) async def get_dashboard(current_user: CurrentUser, db: DbSession): """Get dashboard statistics. Admin only.""" require_admin_with_2fa(current_user) users_count = await db.scalar(select(func.count()).select_from(User)) banned_users_count = await db.scalar( select(func.count()).select_from(User).where(User.is_banned == True) ) marathons_count = await db.scalar(select(func.count()).select_from(Marathon)) active_marathons_count = await db.scalar( select(func.count()).select_from(Marathon).where(Marathon.status == MarathonStatus.ACTIVE.value) ) games_count = await db.scalar(select(func.count()).select_from(Game)) total_participations = await db.scalar(select(func.count()).select_from(Participant)) # Get recent logs result = await db.execute( select(AdminLog) .options(selectinload(AdminLog.admin)) .order_by(AdminLog.created_at.desc()) .limit(10) ) recent_logs = result.scalars().all() return DashboardStats( users_count=users_count or 0, banned_users_count=banned_users_count or 0, marathons_count=marathons_count or 0, active_marathons_count=active_marathons_count or 0, games_count=games_count or 0, total_participations=total_participations or 0, recent_logs=[ AdminLogResponse( id=log.id, admin_id=log.admin_id, admin_nickname=log.admin.nickname if log.admin else None, action=log.action, target_type=log.target_type, target_id=log.target_id, details=log.details, ip_address=log.ip_address, created_at=log.created_at, ) for log in recent_logs ], )