Files
game-marathon/backend/app/services/consumables.py

321 lines
9.4 KiB
Python
Raw Normal View History

2026-01-05 07:15:50 +07:00
"""
Consumables Service - handles consumable items usage (skip, shield, boost, reroll)
"""
2026-01-08 06:51:15 +07:00
from datetime import datetime
2026-01-05 07:15:50 +07:00
from fastapi import HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models import (
User, Participant, Marathon, Assignment, AssignmentStatus,
ShopItem, UserInventory, ConsumableUsage, ConsumableType
)
class ConsumablesService:
    """
    Service for consumable items (skip, shield, boost, reroll).

    The methods mutate the ORM objects passed in and add ConsumableUsage rows
    to the session. Nothing here flushes or commits, so the caller owns the
    transaction.
    """

    # Boost settings
    BOOST_MULTIPLIER = 1.5

    async def use_skip(
        self,
        db: AsyncSession,
        user: User,
        participant: Participant,
        marathon: Marathon,
        assignment: Assignment,
    ) -> dict:
        """
        Use a Skip to bypass the current assignment without penalty.

        - Assignment status becomes DROPPED and completed_at is stamped
        - This method does not touch drop_count or the streak
        - participant.skips_used is incremented

        Returns: dict with result info

        Raises:
            HTTPException: If skips are not allowed, the per-participant limit
                is reached, the assignment is not active, or the user has no
                skip item in inventory
        """
        # Check marathon settings
        if not marathon.allow_skips:
            raise HTTPException(status_code=400, detail="Skips are not allowed in this marathon")

        # A limit of None means "unlimited"
        if marathon.max_skips_per_participant is not None:
            if participant.skips_used >= marathon.max_skips_per_participant:
                raise HTTPException(
                    status_code=400,
                    detail=f"Skip limit reached ({marathon.max_skips_per_participant} per participant)"
                )

        # Check assignment is active
        if assignment.status != AssignmentStatus.ACTIVE.value:
            raise HTTPException(status_code=400, detail="Can only skip active assignments")

        # Consume skip from inventory (only after every validation passed)
        item = await self._consume_item(db, user, ConsumableType.SKIP.value)

        # Mark assignment as dropped (but without penalty)
        assignment.status = AssignmentStatus.DROPPED.value
        assignment.completed_at = self._utcnow()
        # Note: We do NOT increase drop_count or reset streak

        # Track skip usage
        participant.skips_used += 1

        # Log usage
        self._record_usage(
            db,
            user,
            item,
            marathon,
            {
                "type": "skip",
                "skipped_without_penalty": True,
            },
            assignment=assignment,
        )

        return {
            "success": True,
            "skipped": True,
            "penalty": 0,
            "streak_preserved": True,
        }

    async def use_shield(
        self,
        db: AsyncSession,
        user: User,
        participant: Participant,
        marathon: Marathon,
    ) -> dict:
        """
        Activate a Shield - protects from next drop penalty.

        Sets participant.has_shield; the flag is cleared again by
        consume_shield().

        Returns: dict with result info

        Raises:
            HTTPException: If consumables are not allowed, a shield is already
                active, or the user has no shield item in inventory
        """
        self._require_consumables_allowed(marathon)

        if participant.has_shield:
            raise HTTPException(status_code=400, detail="Shield is already active")

        # Consume shield from inventory
        item = await self._consume_item(db, user, ConsumableType.SHIELD.value)

        # Activate shield
        participant.has_shield = True

        # Log usage
        self._record_usage(
            db,
            user,
            item,
            marathon,
            {
                "type": "shield",
                "activated": True,
            },
        )

        return {
            "success": True,
            "shield_activated": True,
        }

    async def use_boost(
        self,
        db: AsyncSession,
        user: User,
        participant: Participant,
        marathon: Marathon,
    ) -> dict:
        """
        Activate a Boost - multiplies points for NEXT complete only.

        - Sets participant.has_active_boost
        - One-time use: the flag is cleared by consume_boost_on_complete(),
          which hands back BOOST_MULTIPLIER

        Returns: dict with result info

        Raises:
            HTTPException: If consumables are not allowed, a boost is already
                active, or the user has no boost item in inventory
        """
        self._require_consumables_allowed(marathon)

        if participant.has_active_boost:
            raise HTTPException(status_code=400, detail="Boost is already activated")

        # Consume boost from inventory
        item = await self._consume_item(db, user, ConsumableType.BOOST.value)

        # Activate boost (one-time use)
        participant.has_active_boost = True

        # Log usage
        self._record_usage(
            db,
            user,
            item,
            marathon,
            {
                "type": "boost",
                "multiplier": self.BOOST_MULTIPLIER,
                "one_time": True,
            },
        )

        return {
            "success": True,
            "boost_activated": True,
            "multiplier": self.BOOST_MULTIPLIER,
        }

    async def use_reroll(
        self,
        db: AsyncSession,
        user: User,
        participant: Participant,
        marathon: Marathon,
        assignment: Assignment,
    ) -> dict:
        """
        Use a Reroll - discard current assignment and spin again.

        - Current assignment is closed with DROPPED status and completed_at
          is stamped, but drop_count is not increased
        - The result reports can_spin_again=True
        - No penalty is applied by this method

        Returns: dict with result info

        Raises:
            HTTPException: If consumables are not allowed, the assignment is
                not active, or the user has no reroll item in inventory
        """
        self._require_consumables_allowed(marathon)

        if assignment.status != AssignmentStatus.ACTIVE.value:
            raise HTTPException(status_code=400, detail="Can only reroll active assignments")

        # Consume reroll from inventory
        item = await self._consume_item(db, user, ConsumableType.REROLL.value)

        # Cancel current assignment, remembering what was rerolled away from
        old_challenge_id = assignment.challenge_id
        old_game_id = assignment.game_id
        assignment.status = AssignmentStatus.DROPPED.value
        assignment.completed_at = self._utcnow()
        # Note: We do NOT increase drop_count (this is a reroll, not a real drop)

        # Log usage
        self._record_usage(
            db,
            user,
            item,
            marathon,
            {
                "type": "reroll",
                "rerolled_from_challenge_id": old_challenge_id,
                "rerolled_from_game_id": old_game_id,
            },
            assignment=assignment,
        )

        return {
            "success": True,
            "rerolled": True,
            "can_spin_again": True,
        }

    @staticmethod
    def _utcnow() -> datetime:
        """
        Return the current UTC time as a naive datetime.

        Produces the same value shape as the deprecated datetime.utcnow()
        (tzinfo is None), so stored timestamps keep their existing format.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _require_consumables_allowed(marathon: Marathon) -> None:
        """
        Guard shared by shield, boost and reroll.

        Raises:
            HTTPException: If marathon.allow_consumables is falsy
        """
        if not marathon.allow_consumables:
            raise HTTPException(status_code=400, detail="Consumables are not allowed in this marathon")

    @staticmethod
    def _record_usage(
        db: AsyncSession,
        user: User,
        item: ShopItem,
        marathon: Marathon,
        effect_data: dict,
        assignment: "Assignment | None" = None,
    ) -> None:
        """
        Add a ConsumableUsage row for a just-consumed item to the session.

        assignment_id is only passed to the model when an assignment is
        given, so assignment-less usages build the row exactly as before.
        """
        fields = {
            "user_id": user.id,
            "item_id": item.id,
            "marathon_id": marathon.id,
        }
        if assignment is not None:
            fields["assignment_id"] = assignment.id
        fields["effect_data"] = effect_data
        db.add(ConsumableUsage(**fields))

    async def _consume_item(
        self,
        db: AsyncSession,
        user: User,
        item_code: str,
    ) -> ShopItem:
        """
        Consume 1 unit of a consumable from user's inventory.

        Returns: The consumed ShopItem

        Raises:
            HTTPException: If user doesn't have the item (no inventory row
                with quantity > 0 for this item code)
        """
        # NOTE(review): the row is read without a lock, so two concurrent
        # requests could both see the same quantity - confirm callers
        # serialize uses or add row locking.
        # NOTE(review): scalar_one_or_none() raises if more than one row
        # matches; presumably (user, item) is unique - verify against schema.
        result = await db.execute(
            select(UserInventory)
            .options(selectinload(UserInventory.item))
            .join(ShopItem)
            .where(
                UserInventory.user_id == user.id,
                ShopItem.code == item_code,
                UserInventory.quantity > 0,
            )
        )
        inv_item = result.scalar_one_or_none()

        if not inv_item:
            raise HTTPException(
                status_code=400,
                detail=f"You don't have any {item_code} in your inventory"
            )

        # Decrease quantity
        inv_item.quantity -= 1

        return inv_item.item

    async def get_consumable_count(
        self,
        db: AsyncSession,
        user_id: int,
        item_code: str,
    ) -> int:
        """
        Get how many of a consumable user has.

        Returns: The stored quantity, or 0 when there is no inventory row
        (or the stored quantity is falsy)
        """
        result = await db.execute(
            select(UserInventory.quantity)
            .join(ShopItem)
            .where(
                UserInventory.user_id == user_id,
                ShopItem.code == item_code,
            )
        )
        quantity = result.scalar_one_or_none()
        return quantity or 0

    def consume_shield(self, participant: Participant) -> bool:
        """
        Consume shield when dropping (called from wheel.py).

        Clears participant.has_shield if it was set.

        Returns: True if shield was consumed, False otherwise
        """
        if participant.has_shield:
            participant.has_shield = False
            return True
        return False

    def consume_boost_on_complete(self, participant: Participant) -> float:
        """
        Consume boost when completing assignment (called from wheel.py).
        One-time use - boost is consumed after single complete.

        Returns: Multiplier value (BOOST_MULTIPLIER if boost was active, 1.0 otherwise)
        """
        if participant.has_active_boost:
            participant.has_active_boost = False
            return self.BOOST_MULTIPLIER
        return 1.0
2026-01-05 07:15:50 +07:00
# Singleton instance: one shared ConsumablesService created at import time
consumables_service = ConsumablesService()