Files
game-marathon/backend/app/api/v1/widgets.py

424 lines
14 KiB
Python
Raw Normal View History

import secrets
from datetime import datetime
from fastapi import APIRouter, HTTPException, status, Query
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from app.api.deps import DbSession, CurrentUser, require_participant
from app.models import (
WidgetToken, Participant, Marathon, Assignment, AssignmentStatus,
BonusAssignment, BonusAssignmentStatus,
)
from app.schemas.widget import (
WidgetTokenResponse,
WidgetTokenListItem,
WidgetLeaderboardEntry,
WidgetLeaderboardResponse,
WidgetCurrentResponse,
WidgetProgressResponse,
)
from app.schemas.common import MessageResponse
from app.core.config import settings
# Router collecting every widget endpoint in this module; routes are served under "/widgets".
router = APIRouter(prefix="/widgets", tags=["widgets"])
def get_avatar_url(user) -> str | None:
"""Get avatar URL - through backend API if user has avatar, else telegram"""
if user.avatar_path:
return f"/api/v1/users/{user.id}/avatar"
return user.telegram_avatar_url
def generate_widget_token() -> str:
    """Return a fresh random widget token: ``wgt_`` followed by a URL-safe secret."""
    random_part = secrets.token_urlsafe(32)
    return "wgt_" + random_part
def build_widget_urls(marathon_id: int, token: str) -> dict[str, str]:
    """Return the frontend URL of each widget page for the given token.

    The result maps the widget names ``leaderboard``, ``current`` and
    ``progress`` to URLs that carry the marathon id and the token as query
    parameters.  ``settings.FRONTEND_URL`` is used as the base; when it is
    falsy, ``http://localhost:5173`` is used instead.
    """
    origin = settings.FRONTEND_URL or "http://localhost:5173"
    query = f"marathon={marathon_id}&token={token}"
    return {
        widget: f"{origin}/widget/{widget}?{query}"
        for widget in ("leaderboard", "current", "progress")
    }
# === Token management (authenticated) ===
@router.post("/marathons/{marathon_id}/token", response_model=WidgetTokenResponse)
async def create_widget_token(
    marathon_id: int,
    current_user: CurrentUser,
    db: DbSession,
):
    """Return the caller's usable widget token for a marathon, creating one if needed.

    The caller must be a participant of the marathon (checked through
    ``require_participant``).  An active token that has not expired is
    returned as-is.  An active token whose ``expires_at`` lies in the past is
    deactivated and replaced: ``validate_widget_token`` rejects expired
    tokens, so handing it back would give the user a token that cannot work.

    Returns:
        ``WidgetTokenResponse`` with the token fields and the widget URLs
        built by ``build_widget_urls``.
    """
    participant = await require_participant(db, current_user.id, marathon_id)

    widget_token = await db.scalar(
        select(WidgetToken).where(
            WidgetToken.participant_id == participant.id,
            WidgetToken.marathon_id == marathon_id,
            WidgetToken.is_active == True,  # noqa: E712 - builds a SQL expression
        )
    )

    # Same naive-UTC expiry comparison that validate_widget_token performs.
    if (
        widget_token is not None
        and widget_token.expires_at
        and widget_token.expires_at < datetime.utcnow()
    ):
        # Retire the dead token; it is committed together with its replacement.
        widget_token.is_active = False
        widget_token = None

    if widget_token is None:
        widget_token = WidgetToken(
            token=generate_widget_token(),
            participant_id=participant.id,
            marathon_id=marathon_id,
        )
        db.add(widget_token)
        await db.commit()
        # Reload so database-populated columns (id, created_at, ...) are available.
        await db.refresh(widget_token)

    return WidgetTokenResponse(
        id=widget_token.id,
        token=widget_token.token,
        created_at=widget_token.created_at,
        expires_at=widget_token.expires_at,
        is_active=widget_token.is_active,
        urls=build_widget_urls(marathon_id, widget_token.token),
    )
@router.get("/marathons/{marathon_id}/tokens", response_model=list[WidgetTokenListItem])
async def list_widget_tokens(
    marathon_id: int,
    current_user: CurrentUser,
    db: DbSession,
):
    """Return every widget token the caller owns in a marathon, newest first.

    The caller must be a participant of the marathon (checked through
    ``require_participant``).  No ``is_active`` filter is applied, so revoked
    tokens are part of the listing as well.
    """
    participant = await require_participant(db, current_user.id, marathon_id)

    owned_tokens_stmt = (
        select(WidgetToken)
        .where(
            WidgetToken.participant_id == participant.id,
            WidgetToken.marathon_id == marathon_id,
        )
        .order_by(WidgetToken.created_at.desc())
    )
    rows = (await db.execute(owned_tokens_stmt)).scalars().all()

    items = []
    for row in rows:
        items.append(
            WidgetTokenListItem(
                id=row.id,
                token=row.token,
                created_at=row.created_at,
                is_active=row.is_active,
            )
        )
    return items
@router.delete("/tokens/{token_id}", response_model=MessageResponse)
async def revoke_widget_token(
    token_id: int,
    current_user: CurrentUser,
    db: DbSession,
):
    """Deactivate a widget token.

    Only the user behind the token's participant, or an admin, may revoke it.

    Raises:
        HTTPException: 404 when no token has the given id, 403 when the
            caller is neither the owner nor an admin.
    """
    lookup = (
        select(WidgetToken)
        .options(selectinload(WidgetToken.participant))
        .where(WidgetToken.id == token_id)
    )
    widget_token = (await db.execute(lookup)).scalar_one_or_none()
    if widget_token is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Token not found",
        )

    is_owner = widget_token.participant.user_id == current_user.id
    if not (is_owner or current_user.is_admin):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized to revoke this token",
        )

    widget_token.is_active = False
    await db.commit()
    return MessageResponse(message="Token revoked")
@router.post("/tokens/{token_id}/regenerate", response_model=WidgetTokenResponse)
async def regenerate_widget_token(
    token_id: int,
    current_user: CurrentUser,
    db: DbSession,
):
    """Replace a widget token: deactivate the given one and issue a new one.

    The replacement belongs to the same participant and marathon as the old
    token.  Only the user behind the token's participant, or an admin, may
    do this.  Whether the old token is still active is not checked.

    Raises:
        HTTPException: 404 when no token has the given id, 403 when the
            caller is neither the owner nor an admin.
    """
    lookup = (
        select(WidgetToken)
        .options(selectinload(WidgetToken.participant))
        .where(WidgetToken.id == token_id)
    )
    previous = (await db.execute(lookup)).scalar_one_or_none()
    if previous is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Token not found",
        )

    is_owner = previous.participant.user_id == current_user.id
    if not (is_owner or current_user.is_admin):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized",
        )

    # Retire the old token and add its replacement in one commit.
    previous.is_active = False
    replacement = WidgetToken(
        token=generate_widget_token(),
        participant_id=previous.participant_id,
        marathon_id=previous.marathon_id,
    )
    db.add(replacement)
    await db.commit()
    await db.refresh(replacement)

    widget_urls = build_widget_urls(replacement.marathon_id, replacement.token)
    return WidgetTokenResponse(
        id=replacement.id,
        token=replacement.token,
        created_at=replacement.created_at,
        expires_at=replacement.expires_at,
        is_active=replacement.is_active,
        urls=widget_urls,
    )
# === Public widget endpoints (authenticated via widget token) ===
async def validate_widget_token(token: str, marathon_id: int, db) -> WidgetToken:
    """Look up an active widget token for a marathon and return it.

    The token's participant (with its user) and its marathon are eager-loaded
    so callers can read them without further queries.

    Raises:
        HTTPException: 401 when no active token matches the token string and
            marathon id, or when ``expires_at`` is set and earlier than
            ``datetime.utcnow()``.
    """
    lookup = (
        select(WidgetToken)
        .options(
            selectinload(WidgetToken.participant).selectinload(Participant.user),
            selectinload(WidgetToken.marathon),
        )
        .where(
            WidgetToken.token == token,
            WidgetToken.marathon_id == marathon_id,
            WidgetToken.is_active == True,
        )
    )
    found = (await db.execute(lookup)).scalar_one_or_none()
    if found is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid widget token",
        )

    expiry = found.expires_at
    if expiry and expiry < datetime.utcnow():
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Widget token expired",
        )
    return found
@router.get("/data/leaderboard", response_model=WidgetLeaderboardResponse)
async def widget_leaderboard(
    marathon: int = Query(..., description="Marathon ID"),
    token: str = Query(..., description="Widget token"),
    count: int = Query(5, ge=1, le=50, description="Number of participants"),
    db: DbSession = None,
):
    """Return leaderboard data for the widget.

    Authenticated by the widget token (``validate_widget_token`` raises 401
    on an invalid or expired token).  The response contains the first
    ``count`` participants by ``total_points``, the token owner's rank in the
    full ordering (also when the owner is outside the first ``count``; it
    stays ``None`` if the owner is not found in the list), the total number
    of participants and the marathon title.
    """
    widget_token = await validate_widget_token(token, marathon, db)
    current_participant = widget_token.participant

    # Participant.id breaks ties: without a secondary key the database may
    # return equal-point rows in any order, so ranks and the top-N cut could
    # change between two polls of the widget.
    result = await db.execute(
        select(Participant)
        .options(selectinload(Participant.user))
        .where(Participant.marathon_id == marathon)
        .order_by(Participant.total_points.desc(), Participant.id)
    )
    all_participants = result.scalars().all()

    current_user_rank = None
    entries = []
    for rank, p in enumerate(all_participants, 1):
        is_current = p.id == current_participant.id
        if is_current:
            current_user_rank = rank
        # Keep iterating past ``count`` so the owner's rank is still found.
        if rank <= count:
            user = p.user
            entries.append(
                WidgetLeaderboardEntry(
                    rank=rank,
                    nickname=user.nickname,
                    avatar_url=get_avatar_url(user),
                    total_points=p.total_points,
                    current_streak=p.current_streak,
                    is_current_user=is_current,
                )
            )

    return WidgetLeaderboardResponse(
        entries=entries,
        current_user_rank=current_user_rank,
        total_participants=len(all_participants),
        marathon_title=widget_token.marathon.title,
    )
@router.get("/data/current", response_model=WidgetCurrentResponse)
async def widget_current_assignment(
    marathon: int = Query(..., description="Marathon ID"),
    token: str = Query(..., description="Widget token"),
    db: DbSession = None,
):
    """Return the token owner's current assignment for the widget.

    Authenticated by the widget token (``validate_widget_token`` raises 401
    on an invalid or expired token).  The most recently started assignment
    with status ACTIVE or RETURNED is used; when there is none the response
    only carries ``has_assignment=False``.

    For a playthrough assignment the title/description/points come from the
    game and the bonus-assignment counters are filled in.  Otherwise they
    come from the challenge and the bonus counters are ``None``.
    """
    widget_token = await validate_widget_token(token, marathon, db)
    participant = widget_token.participant

    result = await db.execute(
        select(Assignment)
        .options(
            selectinload(Assignment.challenge),
            selectinload(Assignment.game),
        )
        .where(
            Assignment.participant_id == participant.id,
            Assignment.status.in_([
                AssignmentStatus.ACTIVE.value,
                AssignmentStatus.RETURNED.value,
            ]),
        )
        .order_by(Assignment.started_at.desc())
        .limit(1)
    )
    assignment = result.scalar_one_or_none()
    if not assignment:
        return WidgetCurrentResponse(has_assignment=False)

    if assignment.is_playthrough:
        game = assignment.game
        assignment_type = "playthrough"
        challenge_title = "Прохождение"
        challenge_description = game.playthrough_description
        points = game.playthrough_points
        difficulty = None

        # Bonus challenges attached to this playthrough: total and completed.
        bonus_total = (
            await db.scalar(
                select(func.count())
                .select_from(BonusAssignment)
                .where(BonusAssignment.main_assignment_id == assignment.id)
            )
        ) or 0
        bonus_completed = (
            await db.scalar(
                select(func.count())
                .select_from(BonusAssignment)
                .where(
                    BonusAssignment.main_assignment_id == assignment.id,
                    BonusAssignment.status == BonusAssignmentStatus.COMPLETED.value,
                )
            )
        ) or 0

        game_title = game.title
        game_cover_url = f"/api/v1/games/{game.id}/cover" if game.cover_path else None
    else:
        challenge = assignment.challenge
        assignment_type = "challenge"
        challenge_title = challenge.title
        challenge_description = challenge.description
        points = challenge.points
        difficulty = challenge.difficulty
        bonus_completed = None
        bonus_total = None

        # Load the game with an explicit query keyed on challenge.game_id.
        # The assignment query above eager-loads only Assignment.challenge and
        # Assignment.game, so reading ``challenge.game`` here could attempt an
        # implicit lazy load, which an async session cannot perform.
        from app.models import Game

        game_result = await db.execute(
            select(Game).where(Game.id == challenge.game_id)
        )
        game = game_result.scalar_one_or_none()

        game_title = game.title if game else None
        game_cover_url = (
            f"/api/v1/games/{game.id}/cover" if game and game.cover_path else None
        )

    return WidgetCurrentResponse(
        has_assignment=True,
        game_title=game_title,
        game_cover_url=game_cover_url,
        assignment_type=assignment_type,
        challenge_title=challenge_title,
        challenge_description=challenge_description,
        points=points,
        difficulty=difficulty,
        bonus_completed=bonus_completed,
        bonus_total=bonus_total,
    )
@router.get("/data/progress", response_model=WidgetProgressResponse)
async def widget_progress(
    marathon: int = Query(..., description="Marathon ID"),
    token: str = Query(..., description="Widget token"),
    db: DbSession = None,
):
    """Return the token owner's progress summary for the widget.

    Authenticated by the widget token (``validate_widget_token`` raises 401
    on an invalid or expired token).  The rank is one plus the number of
    participants in the marathon with strictly more ``total_points``.  The
    response also carries the owner's counts of COMPLETED and DROPPED
    assignments.
    """
    widget_token = await validate_widget_token(token, marathon, db)
    participant = widget_token.participant
    user = participant.user

    async def count_assignments(wanted_status) -> int:
        """Count this participant's assignments that have the given status value."""
        outcome = await db.execute(
            select(func.count())
            .select_from(Assignment)
            .where(
                Assignment.participant_id == participant.id,
                Assignment.status == wanted_status,
            )
        )
        return outcome.scalar() or 0

    ahead_stmt = (
        select(func.count())
        .select_from(Participant)
        .where(
            Participant.marathon_id == marathon,
            Participant.total_points > participant.total_points,
        )
    )
    participants_ahead = (await db.execute(ahead_stmt)).scalar() or 0

    completed_count = await count_assignments(AssignmentStatus.COMPLETED.value)
    dropped_count = await count_assignments(AssignmentStatus.DROPPED.value)

    return WidgetProgressResponse(
        nickname=user.nickname,
        avatar_url=get_avatar_url(user),
        rank=participants_ahead + 1,
        total_points=participant.total_points,
        current_streak=participant.current_streak,
        completed_count=completed_count,
        dropped_count=dropped_count,
        marathon_title=widget_token.marathon.title,
    )