Files
game-marathon/backend/app/services/consumables.py

324 lines
9.6 KiB
Python
Raw Normal View History

2026-01-05 07:15:50 +07:00
"""
Consumables Service - handles consumable items usage (skip, shield, boost, reroll)
"""
from datetime import datetime, timedelta, timezone

from fastapi import HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.models import (
    User, Participant, Marathon, Assignment, AssignmentStatus,
    ShopItem, UserInventory, ConsumableUsage, ConsumableType
)
class ConsumablesService:
    """Service for consumable items (skip, shield, boost, reroll).

    Every ``use_*`` method follows the same sequence: validate the marathon
    rules, take one unit of the item out of the user's inventory, apply the
    effect to the participant/assignment, and queue a ``ConsumableUsage`` row
    on the session. No method here commits or flushes the session; that is
    left to the caller.
    """

    # Boost settings
    BOOST_DURATION_HOURS = 2
    BOOST_MULTIPLIER = 1.5

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Yields the same value ``datetime.utcnow()`` did, without calling it:
        ``utcnow()`` is deprecated since Python 3.12. The tzinfo is dropped so
        stored timestamps keep their previous (naive) form.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _ensure_consumables_allowed(marathon: Marathon) -> None:
        """Raise HTTP 400 unless the marathon has consumables enabled."""
        if not marathon.allow_consumables:
            raise HTTPException(
                status_code=400,
                detail="Consumables are not allowed in this marathon",
            )

    async def use_skip(
        self,
        db: AsyncSession,
        user: User,
        participant: Participant,
        marathon: Marathon,
        assignment: Assignment,
    ) -> dict:
        """
        Use a Skip to bypass the current assignment without penalty.

        - The assignment is marked DROPPED and gets a completion timestamp
        - ``participant.skips_used`` is incremented
        - Drop count and streak are deliberately not touched here

        Returns: dict with result info

        Raises:
            HTTPException: If skips are not allowed, the per-participant
                limit is reached, the assignment is not active, or the user
                has no skip in inventory
        """
        # Check marathon settings
        if not marathon.allow_skips:
            raise HTTPException(status_code=400, detail="Skips are not allowed in this marathon")

        skip_limit = marathon.max_skips_per_participant
        # A limit of None means "unlimited".
        if skip_limit is not None and participant.skips_used >= skip_limit:
            raise HTTPException(
                status_code=400,
                detail=f"Skip limit reached ({marathon.max_skips_per_participant} per participant)"
            )

        # Check assignment is active
        if assignment.status != AssignmentStatus.ACTIVE.value:
            raise HTTPException(status_code=400, detail="Can only skip active assignments")

        # Consume skip from inventory (only after every validation passed,
        # so a rejected request never costs the user an item)
        item = await self._consume_item(db, user, ConsumableType.SKIP.value)

        # Mark assignment as dropped (but without penalty)
        assignment.status = AssignmentStatus.DROPPED.value
        assignment.completed_at = self._utcnow()
        # Note: We do NOT increase drop_count or reset streak

        # Track skip usage
        participant.skips_used += 1

        # Log usage
        db.add(
            ConsumableUsage(
                user_id=user.id,
                item_id=item.id,
                marathon_id=marathon.id,
                assignment_id=assignment.id,
                effect_data={
                    "type": "skip",
                    "skipped_without_penalty": True,
                },
            )
        )

        return {
            "success": True,
            "skipped": True,
            "penalty": 0,
            "streak_preserved": True,
        }

    async def use_shield(
        self,
        db: AsyncSession,
        user: User,
        participant: Participant,
        marathon: Marathon,
    ) -> dict:
        """
        Activate a Shield by setting ``participant.has_shield``.

        The flag is cleared again by ``consume_shield_on_drop``; how the drop
        flow uses it to waive penalties is decided outside this class.

        Returns: dict with result info

        Raises:
            HTTPException: If consumables are not allowed, a shield is
                already active, or the user has no shield in inventory
        """
        self._ensure_consumables_allowed(marathon)

        if participant.has_shield:
            raise HTTPException(status_code=400, detail="Shield is already active")

        # Consume shield from inventory
        item = await self._consume_item(db, user, ConsumableType.SHIELD.value)

        # Activate shield
        participant.has_shield = True

        # Log usage
        db.add(
            ConsumableUsage(
                user_id=user.id,
                item_id=item.id,
                marathon_id=marathon.id,
                effect_data={
                    "type": "shield",
                    "activated": True,
                },
            )
        )

        return {
            "success": True,
            "shield_activated": True,
        }

    async def use_boost(
        self,
        db: AsyncSession,
        user: User,
        participant: Participant,
        marathon: Marathon,
    ) -> dict:
        """
        Activate a Boost on the participant.

        - ``active_boost_multiplier`` is set to BOOST_MULTIPLIER
        - ``active_boost_expires_at`` is set to now + BOOST_DURATION_HOURS

        Returns: dict with result info; ``expires_at`` is a naive UTC datetime

        Raises:
            HTTPException: If consumables are not allowed, a boost is already
                active, or the user has no boost in inventory
        """
        self._ensure_consumables_allowed(marathon)

        if participant.has_active_boost:
            raise HTTPException(
                status_code=400,
                detail=f"Boost already active until {participant.active_boost_expires_at}"
            )

        # Consume boost from inventory
        item = await self._consume_item(db, user, ConsumableType.BOOST.value)

        # Activate boost
        expires_at = self._utcnow() + timedelta(hours=self.BOOST_DURATION_HOURS)
        participant.active_boost_multiplier = self.BOOST_MULTIPLIER
        participant.active_boost_expires_at = expires_at

        # Log usage (effect_data carries the expiry as an ISO string)
        db.add(
            ConsumableUsage(
                user_id=user.id,
                item_id=item.id,
                marathon_id=marathon.id,
                effect_data={
                    "type": "boost",
                    "multiplier": self.BOOST_MULTIPLIER,
                    "duration_hours": self.BOOST_DURATION_HOURS,
                    "expires_at": participant.active_boost_expires_at.isoformat(),
                },
            )
        )

        return {
            "success": True,
            "boost_activated": True,
            "multiplier": self.BOOST_MULTIPLIER,
            "expires_at": participant.active_boost_expires_at,
        }

    async def use_reroll(
        self,
        db: AsyncSession,
        user: User,
        participant: Participant,
        marathon: Marathon,
        assignment: Assignment,
    ) -> dict:
        """
        Use a Reroll - discard the current assignment so the user can spin again.

        - The assignment's status is set to DROPPED with a completion timestamp
        - Drop count is deliberately not increased (a reroll is not a real drop)
        - ``participant`` is accepted for signature parity but not modified

        Returns: dict with result info

        Raises:
            HTTPException: If consumables are not allowed, the assignment is
                not active, or the user has no reroll in inventory
        """
        self._ensure_consumables_allowed(marathon)

        if assignment.status != AssignmentStatus.ACTIVE.value:
            raise HTTPException(status_code=400, detail="Can only reroll active assignments")

        # Consume reroll from inventory
        item = await self._consume_item(db, user, ConsumableType.REROLL.value)

        # Remember what was rerolled away so the usage log can reference it
        old_challenge_id = assignment.challenge_id
        old_game_id = assignment.game_id

        # Cancel current assignment
        assignment.status = AssignmentStatus.DROPPED.value
        assignment.completed_at = self._utcnow()
        # Note: We do NOT increase drop_count (this is a reroll, not a real drop)

        # Log usage
        db.add(
            ConsumableUsage(
                user_id=user.id,
                item_id=item.id,
                marathon_id=marathon.id,
                assignment_id=assignment.id,
                effect_data={
                    "type": "reroll",
                    "rerolled_from_challenge_id": old_challenge_id,
                    "rerolled_from_game_id": old_game_id,
                },
            )
        )

        return {
            "success": True,
            "rerolled": True,
            "can_spin_again": True,
        }

    async def _consume_item(
        self,
        db: AsyncSession,
        user: User,
        item_code: str,
    ) -> ShopItem:
        """
        Consume 1 unit of a consumable from user's inventory.

        Looks up the user's inventory row for the shop item whose ``code``
        equals ``item_code`` and whose quantity is positive, then decrements
        the quantity in memory.

        Returns: The consumed ShopItem

        Raises:
            HTTPException: If user doesn't have the item
        """
        # NOTE(review): no row lock is taken here; confirm whether concurrent
        # requests for the same user are serialized elsewhere.
        query = (
            select(UserInventory)
            # Load the related ShopItem eagerly: it is returned to the caller
            .options(selectinload(UserInventory.item))
            .join(ShopItem)
            .where(
                UserInventory.user_id == user.id,
                ShopItem.code == item_code,
                UserInventory.quantity > 0,
            )
        )
        result = await db.execute(query)
        inv_item = result.scalar_one_or_none()

        if not inv_item:
            raise HTTPException(
                status_code=400,
                detail=f"You don't have any {item_code} in your inventory"
            )

        # Decrease quantity
        inv_item.quantity -= 1
        return inv_item.item

    async def get_consumable_count(
        self,
        db: AsyncSession,
        user_id: int,
        item_code: str,
    ) -> int:
        """Get how many of a consumable user has (0 when no row or a falsy quantity)."""
        result = await db.execute(
            select(UserInventory.quantity)
            .join(ShopItem)
            .where(
                UserInventory.user_id == user_id,
                ShopItem.code == item_code,
            )
        )
        quantity = result.scalar_one_or_none()
        return quantity or 0

    def consume_shield_on_drop(self, participant: Participant) -> bool:
        """
        Consume the participant's shield, if any, when an assignment is dropped.

        The original note says this is called from wheel.py.

        Returns: True if shield was consumed, False otherwise
        """
        if not participant.has_shield:
            return False
        participant.has_shield = False
        return True

    def get_active_boost_multiplier(self, participant: Participant) -> float:
        """
        Get current boost multiplier for participant.

        Thin delegate to ``participant.get_boost_multiplier()``; the original
        docstring states the result is 1.0 when no boost is active, but that
        logic lives on the Participant model, not here.
        """
        return participant.get_boost_multiplier()
# Singleton instance: the service defines no __init__ and stores nothing on self,
# so one shared module-level object can be imported and reused by all callers.
consumables_service = ConsumablesService()