import asyncio
import logging
import re
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import List

import httpx
from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from ..db_models import AnimeTheme, DownloadTask, DownloadStatus, Anime
from ..schemas import QueueStatusResponse, QueueTaskResponse
from ..config import downloader_settings
from ...storage import storage
from ...db_models import Opening

logger = logging.getLogger(__name__)


class DownloadService:
    """Service for queueing, downloading and storing anime theme audio.

    All database work goes through the ``AsyncSession`` passed to the
    constructor; the service commits its own changes.
    """

    def __init__(self, db: AsyncSession):
        self.db = db

    async def add_to_queue(self, theme_ids: List[int]) -> int:
        """Add themes to the download queue (idempotent).

        A theme is skipped when it already has a task, does not exist,
        already has an ``audio_s3_key``, or has no video URL.

        Args:
            theme_ids: Ids of the themes to enqueue. Duplicates are ignored.

        Returns:
            Number of tasks actually added.
        """
        added = 0
        # De-duplicate up front (order preserved) so the result does not
        # depend on session autoflush noticing a task added a moment ago.
        for theme_id in dict.fromkeys(theme_ids):
            # limit(1)/first() tolerates several pre-existing tasks for the
            # same theme instead of raising MultipleResultsFound.
            existing = await self.db.execute(
                select(DownloadTask.id)
                .where(DownloadTask.theme_id == theme_id)
                .limit(1)
            )
            if existing.first() is not None:
                continue

            result = await self.db.execute(
                select(AnimeTheme).where(AnimeTheme.id == theme_id)
            )
            theme = result.scalar_one_or_none()
            if not theme:
                continue
            # Already downloaded.
            if theme.audio_s3_key:
                continue
            # Nothing to download from.
            if not theme.animethemes_video_url:
                continue

            self.db.add(
                DownloadTask(
                    theme_id=theme_id,
                    status=DownloadStatus.QUEUED,
                    estimated_size_bytes=downloader_settings.default_estimated_size_bytes,
                )
            )
            added += 1

        await self.db.commit()
        return added

    async def add_all_anime_themes(self, anime_id: int) -> int:
        """Queue every not-yet-downloaded theme of an anime.

        Args:
            anime_id: Id of the anime whose themes should be queued.

        Returns:
            Number of tasks added.
        """
        # Only the ids are needed, so avoid loading full ORM rows.
        result = await self.db.execute(
            select(AnimeTheme.id)
            .where(AnimeTheme.anime_id == anime_id)
            .where(AnimeTheme.audio_s3_key.is_(None))
            .where(AnimeTheme.animethemes_video_url.isnot(None))
        )
        return await self.add_to_queue(list(result.scalars().all()))

    async def get_queue_status(self, worker_running: bool = False) -> QueueStatusResponse:
        """Build a snapshot of the queue, newest task first.

        Args:
            worker_running: Passed through unchanged into the response.

        Returns:
            Per-task details plus per-status counters and the summed
            estimated size of queued and in-progress tasks.
        """
        result = await self.db.execute(
            select(DownloadTask)
            .options(selectinload(DownloadTask.theme).selectinload(AnimeTheme.anime))
            .order_by(DownloadTask.created_at.desc())
        )
        tasks = result.scalars().all()

        in_progress = (
            DownloadStatus.DOWNLOADING,
            DownloadStatus.CONVERTING,
            DownloadStatus.UPLOADING,
        )
        task_responses = []
        total_queued = 0
        total_downloading = 0
        total_done = 0
        total_failed = 0
        estimated_queue_size = 0

        for task in tasks:
            theme = task.theme
            anime = theme.anime
            anime_title = anime.title_russian or anime.title_english or "Unknown"
            task_responses.append(QueueTaskResponse(
                id=task.id,
                theme_id=theme.id,
                anime_title=anime_title,
                theme_name=theme.full_name,
                song_title=theme.song_title,
                status=task.status,
                progress_percent=task.progress_percent,
                error_message=task.error_message,
                estimated_size_bytes=task.estimated_size_bytes,
                created_at=task.created_at,
                started_at=task.started_at,
                completed_at=task.completed_at,
            ))

            # A missing estimate counts as 0 rather than raising TypeError.
            size = task.estimated_size_bytes or 0
            if task.status == DownloadStatus.QUEUED:
                total_queued += 1
                estimated_queue_size += size
            elif task.status in in_progress:
                total_downloading += 1
                estimated_queue_size += size
            elif task.status == DownloadStatus.DONE:
                total_done += 1
            elif task.status == DownloadStatus.FAILED:
                total_failed += 1

        return QueueStatusResponse(
            tasks=task_responses,
            total_queued=total_queued,
            total_downloading=total_downloading,
            total_done=total_done,
            total_failed=total_failed,
            estimated_queue_size_bytes=estimated_queue_size,
            worker_running=worker_running,
        )

    async def process_queue(self) -> None:
        """Process queued tasks oldest-first until none are left.

        Intended to run as a background task. Each task leaves the QUEUED
        state (DONE or FAILED) before the next one is fetched.
        """
        while True:
            result = await self.db.execute(
                select(DownloadTask)
                .where(DownloadTask.status == DownloadStatus.QUEUED)
                .order_by(DownloadTask.created_at)
                .limit(1)
            )
            task = result.scalar_one_or_none()
            if not task:
                break
            await self._process_task(task)

    async def _process_task(self, task: DownloadTask) -> None:
        """Download one theme, upload it to storage and record the result.

        On success the theme gets its storage key and size, an ``Opening``
        row is created and the task becomes DONE. On any error the pending
        changes are rolled back and the task becomes FAILED with the error
        text stored (truncated to 1000 characters).

        Args:
            task: The task to process.
        """
        # Captured up front: after a rollback the ORM object is expired and
        # must not be touched outside the session's own I/O.
        task_id = task.id
        try:
            task.status = DownloadStatus.DOWNLOADING
            task.started_at = datetime.now(timezone.utc)
            task.progress_percent = 10
            await self.db.commit()

            result = await self.db.execute(
                select(AnimeTheme)
                .options(selectinload(AnimeTheme.anime))
                .where(AnimeTheme.id == task.theme_id)
            )
            theme = result.scalar_one()
            anime = theme.anime

            if not theme.animethemes_video_url:
                raise ValueError("No video URL available")

            # The WebM file is stored as-is (no conversion step).
            with tempfile.TemporaryDirectory() as tmp_dir:
                webm_file = Path(tmp_dir) / "audio.webm"
                await self._download_to_file(theme.animethemes_video_url, webm_file)

                task.progress_percent = 70
                task.status = DownloadStatus.UPLOADING
                await self.db.commit()

                file_name = self._build_audio_filename(anime, theme)
                s3_key = f"audio/{file_name}"

                file_data = webm_file.read_bytes()
                file_size = len(file_data)
                if not storage.upload_file(s3_key, file_data, "video/webm"):
                    raise RuntimeError("Failed to upload to S3")

            theme.audio_s3_key = s3_key
            theme.file_size_bytes = file_size

            # Mirror the theme into the main Opening table.
            opening = Opening(
                anime_name=anime.title_english or f"Anime {anime.animethemes_slug}",
                op_number=theme.full_name,
                song_name=theme.song_title,
                # Key without its "audio/" prefix.
                audio_file=file_name,
            )
            self.db.add(opening)
            await self.db.flush()  # assigns opening.id
            theme.opening_id = opening.id

            task.status = DownloadStatus.DONE
            task.progress_percent = 100
            task.completed_at = datetime.now(timezone.utc)
            task.estimated_size_bytes = file_size
            await self.db.commit()
        except Exception as exc:
            logger.exception("Download task %s failed", task_id)
            await self._mark_task_failed(task_id, exc)

    async def _download_to_file(self, url: str, destination: Path) -> None:
        """Stream ``url`` into ``destination``; raises on HTTP error status."""
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "GET",
                url,
                timeout=downloader_settings.download_timeout_seconds,
                follow_redirects=True,
            ) as response:
                response.raise_for_status()
                with open(destination, "wb") as f:
                    async for chunk in response.aiter_bytes(chunk_size=8192):
                        f.write(chunk)

    def _build_audio_filename(self, anime, theme) -> str:
        """Return the ``.webm`` file name (no folder) used for storage."""
        anime_name = self._sanitize_filename(
            anime.title_english or f"anime_{anime.animethemes_slug}"
        )
        # Sanitized as well so the theme name cannot add "/" to the key.
        theme_name = self._sanitize_filename(theme.full_name)
        song_part = (
            f"_{self._sanitize_filename(theme.song_title)}" if theme.song_title else ""
        )
        return f"{anime_name}_{theme_name}{song_part}.webm"

    async def _mark_task_failed(self, task_id: int, exc: Exception) -> None:
        """Discard pending changes and persist the FAILED state of a task."""
        # A failed flush/commit leaves the session unusable until rollback;
        # rolling back also drops half-applied theme/opening changes.
        await self.db.rollback()
        # Some exceptions (e.g. timeouts) have an empty str(); use repr then.
        message = (str(exc) or repr(exc))[:1000]
        await self.db.execute(
            update(DownloadTask)
            .where(DownloadTask.id == task_id)
            .values(
                status=DownloadStatus.FAILED,
                error_message=message,
                progress_percent=0,
            )
        )
        await self.db.commit()

    def _sanitize_filename(self, name: str) -> str:
        """Sanitize a string for use in a file name.

        Removes ``< > : " / \\ | ? *``, replaces spaces with underscores and
        truncates to 100 characters.

        Returns:
            The sanitized name, or ``"unknown"`` when the input is empty or
            nothing is left after sanitizing.
        """
        if not name:
            return "unknown"
        sanitized = re.sub(r'[<>:"/\\|?*]', '', name).replace(' ', '_')
        return sanitized[:100] or "unknown"

    async def cancel_task(self, task_id: int) -> bool:
        """Cancel (delete) a task that is still queued.

        Args:
            task_id: Id of the task to cancel.

        Returns:
            True if the task was deleted, False if it does not exist or is
            no longer in the QUEUED state.
        """
        # Single conditional DELETE: a task that a worker picked up after a
        # separate status check can no longer be deleted by accident.
        result = await self.db.execute(
            delete(DownloadTask)
            .where(DownloadTask.id == task_id)
            .where(DownloadTask.status == DownloadStatus.QUEUED)
            .returning(DownloadTask.id)
        )
        deleted = result.scalar_one_or_none()
        await self.db.commit()
        return deleted is not None

    async def retry_task(self, task_id: int) -> bool:
        """Requeue a failed task, resetting its progress and timestamps.

        Args:
            task_id: Id of the task to retry.

        Returns:
            True if the task was FAILED and has been requeued.
        """
        result = await self.db.execute(
            update(DownloadTask)
            .where(DownloadTask.id == task_id)
            .where(DownloadTask.status == DownloadStatus.FAILED)
            .values(
                status=DownloadStatus.QUEUED,
                error_message=None,
                progress_percent=0,
                started_at=None,
                completed_at=None,
            )
            .returning(DownloadTask.id)
        )
        updated = result.scalar_one_or_none()
        await self.db.commit()
        return updated is not None

    async def clear_completed_tasks(self, include_failed: bool = False) -> int:
        """Delete finished tasks from the queue.

        Args:
            include_failed: Also delete FAILED tasks, not only DONE ones.

        Returns:
            Number of deleted tasks.
        """
        statuses = [DownloadStatus.DONE]
        if include_failed:
            statuses.append(DownloadStatus.FAILED)

        result = await self.db.execute(
            delete(DownloadTask)
            .where(DownloadTask.status.in_(statuses))
            .returning(DownloadTask.id)
        )
        deleted_ids = result.scalars().all()
        await self.db.commit()
        return len(deleted_ids)