"""
Shop Service - handles shop items, purchases, and inventory management
"""
|
||
|
|
from datetime import datetime
|
||
|
|
from fastapi import HTTPException
|
||
|
|
from sqlalchemy import select, update
|
||
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
|
from sqlalchemy.orm import selectinload
|
||
|
|
|
||
|
|
from app.models import User, ShopItem, UserInventory, ShopItemType
|
||
|
|
from app.services.coins import coins_service
|
||
|
|
|
||
|
|
|
||
|
|
class ShopService:
    """Service for shop operations.

    Covers listing shop items, purchasing them with coins, reading a user's
    inventory, and equipping/unequipping cosmetic items. Every method works
    on the ``AsyncSession`` supplied by the caller; no method in this class
    calls ``commit`` itself (``purchase_item`` only flushes).
    """

    @staticmethod
    def _equipped_slot_attr(item_type: str) -> str | None:
        """Return the ``User`` attribute holding the equipped item id for a type.

        Returns ``None`` for item types that have no dedicated slot on the
        user, in which case callers leave the user object untouched.
        """
        return {
            ShopItemType.FRAME.value: "equipped_frame_id",
            ShopItemType.TITLE.value: "equipped_title_id",
            ShopItemType.NAME_COLOR.value: "equipped_name_color_id",
            ShopItemType.BACKGROUND.value: "equipped_background_id",
        }.get(item_type)

    async def _clear_equipped(
        self,
        db: AsyncSession,
        user_id: int,
        item_type: str,
    ) -> None:
        """Mark every equipped inventory row of ``item_type`` for the user as unequipped."""
        same_type_item_ids = select(ShopItem.id).where(ShopItem.item_type == item_type)
        await db.execute(
            update(UserInventory)
            .where(
                UserInventory.user_id == user_id,
                UserInventory.equipped == True,  # noqa: E712 - builds a SQL expression
                UserInventory.item_id.in_(same_type_item_ids),
            )
            .values(equipped=False)
        )

    async def get_available_items(
        self,
        db: AsyncSession,
        item_type: str | None = None,
        include_unavailable: bool = False,
    ) -> list[ShopItem]:
        """
        Get list of shop items, cheapest first.

        Args:
            db: Session used to run the query
            item_type: Filter by item type (frame, title, etc.)
            include_unavailable: Include inactive/out of stock items

        Returns:
            Matching shop items ordered by ascending price
        """
        query = select(ShopItem)

        if item_type:
            query = query.where(ShopItem.item_type == item_type)

        if not include_unavailable:
            # NOTE(review): naive UTC timestamp - assumes available_from /
            # available_until are stored as naive UTC; confirm against the model.
            now = datetime.utcnow()
            query = query.where(
                ShopItem.is_active == True,  # noqa: E712 - builds a SQL expression
                # A NULL bound means "no limit" on that side of the window.
                (ShopItem.available_from.is_(None)) | (ShopItem.available_from <= now),
                (ShopItem.available_until.is_(None)) | (ShopItem.available_until >= now),
                # NULL stock means unlimited stock.
                (ShopItem.stock_remaining.is_(None)) | (ShopItem.stock_remaining > 0),
            )

        query = query.order_by(ShopItem.price.asc())
        result = await db.execute(query)
        return list(result.scalars().all())

    async def get_item_by_id(self, db: AsyncSession, item_id: int) -> ShopItem | None:
        """Get shop item by ID, or ``None`` if no row matches"""
        result = await db.execute(select(ShopItem).where(ShopItem.id == item_id))
        return result.scalar_one_or_none()

    async def get_item_by_code(self, db: AsyncSession, code: str) -> ShopItem | None:
        """Get shop item by code, or ``None`` if no row matches"""
        result = await db.execute(select(ShopItem).where(ShopItem.code == code))
        return result.scalar_one_or_none()

    async def purchase_item(
        self,
        db: AsyncSession,
        user: User,
        item_id: int,
        quantity: int = 1,
    ) -> tuple[UserInventory, int]:
        """
        Purchase an item from the shop.

        Non-consumable items can be owned once; consumables stack, so buying
        one that is already in the inventory increases its quantity.

        Args:
            db: Session used for all reads and writes (flushed, not committed)
            user: The purchasing user
            item_id: ID of item to purchase
            quantity: Number to purchase (only for consumables; ignored and
                treated as 1 for every other item type)

        Returns:
            Tuple of (inventory item, total cost)

        Raises:
            HTTPException: 404 if the item does not exist; 400 if it is not
                available, the quantity is below 1, a non-consumable is
                already owned, stock or coins are insufficient, or the
                payment fails
        """
        # Get item
        item = await self.get_item_by_id(db, item_id)
        if not item:
            raise HTTPException(status_code=404, detail="Item not found")

        # Check availability
        if not item.is_available:
            raise HTTPException(status_code=400, detail="Item is not available")

        is_consumable = item.item_type == ShopItemType.CONSUMABLE.value
        if not is_consumable:
            # For non-consumables, quantity is always 1
            quantity = 1
        elif quantity < 1:
            # A zero/negative quantity would yield a non-positive cost and
            # *increase* stock_remaining, so reject it up front.
            raise HTTPException(status_code=400, detail="Quantity must be at least 1")

        # Look up the user's existing inventory row for this item (if any)
        existing = await db.execute(
            select(UserInventory).where(
                UserInventory.user_id == user.id,
                UserInventory.item_id == item.id,
            )
        )
        inv_item = existing.scalar_one_or_none()

        # Only non-consumables are limited to one per user; consumables stack.
        if inv_item is not None and not is_consumable:
            raise HTTPException(status_code=400, detail="You already own this item")

        # Check stock (None means unlimited)
        if item.stock_remaining is not None and item.stock_remaining < quantity:
            raise HTTPException(status_code=400, detail="Not enough stock available")

        # Calculate total cost
        total_cost = item.price * quantity

        # Check balance
        if user.coins_balance < total_cost:
            raise HTTPException(status_code=400, detail="Not enough coins")

        # Deduct coins
        success = await coins_service.spend_coins(
            db, user, total_cost,
            f"Purchase: {item.name} x{quantity}",
            "shop_item", item.id,
        )
        if not success:
            raise HTTPException(status_code=400, detail="Payment failed")

        # Add to inventory
        if inv_item is not None:
            # Only reachable for consumables: top up the existing stack
            inv_item.quantity += quantity
        else:
            # First purchase of this item (quantity is already 1 for cosmetics)
            inv_item = UserInventory(
                user_id=user.id,
                item_id=item.id,
                quantity=quantity,
            )
            db.add(inv_item)

        # Decrease stock if limited
        if item.stock_remaining is not None:
            item.stock_remaining -= quantity

        await db.flush()
        return inv_item, total_cost

    async def get_user_inventory(
        self,
        db: AsyncSession,
        user_id: int,
        item_type: str | None = None,
    ) -> list[UserInventory]:
        """
        Get user's inventory with each row's shop item eagerly loaded.

        Args:
            db: Session used to run the query
            user_id: Owner of the inventory
            item_type: Optional filter on the shop item's type

        Returns:
            Inventory rows with a quantity above zero
        """
        query = (
            select(UserInventory)
            .options(selectinload(UserInventory.item))
            .where(UserInventory.user_id == user_id)
        )

        if item_type:
            query = query.join(ShopItem).where(ShopItem.item_type == item_type)

        # Exclude empty consumables
        query = query.where(UserInventory.quantity > 0)

        result = await db.execute(query)
        return list(result.scalars().all())

    async def get_inventory_item(
        self,
        db: AsyncSession,
        user_id: int,
        inventory_id: int,
    ) -> UserInventory | None:
        """Get specific inventory item owned by ``user_id`` (shop item eagerly loaded), or ``None``"""
        result = await db.execute(
            select(UserInventory)
            .options(selectinload(UserInventory.item))
            .where(
                UserInventory.id == inventory_id,
                # Scoping by user prevents reading another user's row by id.
                UserInventory.user_id == user_id,
            )
        )
        return result.scalar_one_or_none()

    async def equip_item(
        self,
        db: AsyncSession,
        user: User,
        inventory_id: int,
    ) -> ShopItem:
        """
        Equip a cosmetic item from inventory.

        Any other equipped item of the same type is unequipped first.

        Args:
            db: Session used for all reads and writes
            user: Owner of the inventory row
            inventory_id: ID of the inventory row to equip

        Returns: The equipped item

        Raises:
            HTTPException: If item not found or is a consumable
        """
        # Get inventory item
        inv_item = await self.get_inventory_item(db, user.id, inventory_id)
        if not inv_item:
            raise HTTPException(status_code=404, detail="Item not found in inventory")

        item = inv_item.item

        if item.item_type == ShopItemType.CONSUMABLE.value:
            raise HTTPException(status_code=400, detail="Cannot equip consumables")

        # Unequip current item of same type
        await self._clear_equipped(db, user.id, item.item_type)

        # Equip new item
        inv_item.equipped = True

        # Update user's equipped_*_id (types without a slot leave the user as-is)
        slot_attr = self._equipped_slot_attr(item.item_type)
        if slot_attr is not None:
            setattr(user, slot_attr, item.id)

        return item

    async def unequip_item(
        self,
        db: AsyncSession,
        user: User,
        item_type: str,
    ) -> None:
        """Unequip item of specified type and clear the matching slot on the user"""
        # Unequip from inventory
        await self._clear_equipped(db, user.id, item_type)

        # Clear user's equipped_*_id (types without a slot leave the user as-is)
        slot_attr = self._equipped_slot_attr(item_type)
        if slot_attr is not None:
            setattr(user, slot_attr, None)

    async def check_user_owns_item(
        self,
        db: AsyncSession,
        user_id: int,
        item_id: int,
    ) -> bool:
        """Check if user owns an item (an inventory row with quantity above zero)"""
        result = await db.execute(
            select(UserInventory).where(
                UserInventory.user_id == user_id,
                UserInventory.item_id == item_id,
                UserInventory.quantity > 0,
            )
        )
        return result.scalar_one_or_none() is not None


# Singleton instance
shop_service = ShopService()
|