from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from ..db_models import AnimeTheme from ..schemas import StorageStatsResponse from ..config import downloader_settings from ...db_models import Opening class StorageTrackerService: """Service for tracking S3 storage usage from DB (without scanning S3).""" def __init__(self, db: AsyncSession): self.db = db async def get_stats(self) -> StorageStatsResponse: """Calculate storage stats from database.""" # Sum file sizes from downloaded themes result = await self.db.execute( select(func.coalesce(func.sum(AnimeTheme.file_size_bytes), 0)) .where(AnimeTheme.file_size_bytes.isnot(None)) ) used_bytes = result.scalar() or 0 # Count openings in the main Opening table result = await self.db.execute( select(func.count(Opening.id)) ) openings_count = result.scalar() or 0 limit_bytes = downloader_settings.s3_storage_limit_bytes available_bytes = max(0, limit_bytes - used_bytes) used_percent = (used_bytes / limit_bytes * 100) if limit_bytes > 0 else 0 return StorageStatsResponse( used_bytes=used_bytes, limit_bytes=limit_bytes, used_percent=round(used_percent, 2), available_bytes=available_bytes, can_download=used_bytes < limit_bytes, openings_count=openings_count, ) async def get_estimated_queue_size(self) -> int: """Get estimated size of pending downloads in queue.""" from ..db_models import DownloadTask, DownloadStatus result = await self.db.execute( select(func.coalesce(func.sum(DownloadTask.estimated_size_bytes), 0)) .where(DownloadTask.status.in_([ DownloadStatus.QUEUED, DownloadStatus.DOWNLOADING, DownloadStatus.CONVERTING, DownloadStatus.UPLOADING, ])) ) return result.scalar() or 0