Files
anime-qize/backend/app/openings_downloader/services/downloader.py

305 lines
11 KiB
Python
Raw Normal View History

2026-01-10 11:06:45 +03:00
import asyncio
import logging
import re
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import List

import httpx
from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from ..config import downloader_settings
from ..db_models import Anime, AnimeTheme, DownloadStatus, DownloadTask
from ..schemas import QueueStatusResponse, QueueTaskResponse
from ...db_models import Opening
from ...storage import storage
class DownloadService:
    """Service for queueing, downloading and uploading anime theme audio.

    All database work goes through the ``AsyncSession`` passed to the
    constructor; methods that modify the queue commit before returning.
    """

    # Characters unsafe in file names / S3 keys, plus ASCII control characters.
    _UNSAFE_FILENAME_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1f]')
    # Maximum length of one sanitized file-name component.
    _MAX_FILENAME_COMPONENT = 100
    # Chunk size used when streaming a download to disk.
    _DOWNLOAD_CHUNK_SIZE = 8192
    # Upper bound on the error text stored on a failed task.
    _MAX_ERROR_MESSAGE_LENGTH = 1000

    _logger = logging.getLogger(__name__)

    def __init__(self, db: AsyncSession):
        self.db = db

    async def add_to_queue(self, theme_ids: List[int]) -> int:
        """Add themes to the download queue (idempotent).

        A theme is skipped when it already has a task (in any status), does not
        exist, already has ``audio_s3_key`` set, or has no
        ``animethemes_video_url``. Duplicate ids in ``theme_ids`` are queued once.

        Returns:
            The number of tasks added.
        """
        # De-duplicate while preserving the caller's order.
        unique_ids = list(dict.fromkeys(theme_ids))
        if not unique_ids:
            return 0

        # Two batched queries instead of two queries per theme.
        queued_result = await self.db.execute(
            select(DownloadTask.theme_id).where(DownloadTask.theme_id.in_(unique_ids))
        )
        already_queued = set(queued_result.scalars().all())

        themes_result = await self.db.execute(
            select(AnimeTheme).where(AnimeTheme.id.in_(unique_ids))
        )
        themes_by_id = {theme.id: theme for theme in themes_result.scalars().all()}

        added = 0
        for theme_id in unique_ids:
            if theme_id in already_queued:
                continue
            theme = themes_by_id.get(theme_id)
            # Missing theme, already downloaded, or nothing to download from.
            if theme is None or theme.audio_s3_key or not theme.animethemes_video_url:
                continue
            self.db.add(
                DownloadTask(
                    theme_id=theme_id,
                    status=DownloadStatus.QUEUED,
                    estimated_size_bytes=downloader_settings.default_estimated_size_bytes,
                )
            )
            added += 1

        await self.db.commit()
        return added

    async def add_all_anime_themes(self, anime_id: int) -> int:
        """Queue every not-yet-downloaded theme of ``anime_id`` that has a video URL.

        Returns:
            The number of tasks added.
        """
        result = await self.db.execute(
            select(AnimeTheme)
            .where(AnimeTheme.anime_id == anime_id)
            .where(AnimeTheme.audio_s3_key.is_(None))
            .where(AnimeTheme.animethemes_video_url.isnot(None))
        )
        themes = result.scalars().all()
        return await self.add_to_queue([t.id for t in themes])

    async def get_queue_status(self, worker_running: bool = False) -> QueueStatusResponse:
        """Summarise every task in the queue, newest first.

        Args:
            worker_running: Copied into the response unchanged.

        Returns:
            One entry per task, per-status counts, and the estimated total size
            of queued and in-progress tasks.
        """
        result = await self.db.execute(
            select(DownloadTask)
            .options(selectinload(DownloadTask.theme).selectinload(AnimeTheme.anime))
            .order_by(DownloadTask.created_at.desc())
        )
        tasks = result.scalars().all()

        in_progress = (
            DownloadStatus.DOWNLOADING,
            DownloadStatus.CONVERTING,
            DownloadStatus.UPLOADING,
        )
        task_responses = []
        total_queued = 0
        total_downloading = 0
        total_done = 0
        total_failed = 0
        estimated_queue_size = 0

        for task in tasks:
            theme = task.theme
            anime = theme.anime
            anime_title = (
                (anime.title_russian or anime.title_english) if anime is not None else None
            ) or "Unknown"
            task_responses.append(QueueTaskResponse(
                id=task.id,
                theme_id=theme.id,
                anime_title=anime_title,
                theme_name=theme.full_name,
                song_title=theme.song_title,
                status=task.status,
                progress_percent=task.progress_percent,
                error_message=task.error_message,
                estimated_size_bytes=task.estimated_size_bytes,
                created_at=task.created_at,
                started_at=task.started_at,
                completed_at=task.completed_at,
            ))

            # Guard against a NULL size so one bad row cannot break the summary.
            size = task.estimated_size_bytes or 0
            if task.status == DownloadStatus.QUEUED:
                total_queued += 1
                estimated_queue_size += size
            elif task.status in in_progress:
                total_downloading += 1
                estimated_queue_size += size
            elif task.status == DownloadStatus.DONE:
                total_done += 1
            elif task.status == DownloadStatus.FAILED:
                total_failed += 1

        return QueueStatusResponse(
            tasks=task_responses,
            total_queued=total_queued,
            total_downloading=total_downloading,
            total_done=total_done,
            total_failed=total_failed,
            estimated_queue_size_bytes=estimated_queue_size,
            worker_running=worker_running,
        )

    async def process_queue(self) -> None:
        """Process QUEUED tasks oldest-first until none remain.

        Intended to run as a background task. Each task is handled by
        ``_process_task``, which records failures on the task itself.
        """
        while True:
            result = await self.db.execute(
                select(DownloadTask)
                .where(DownloadTask.status == DownloadStatus.QUEUED)
                .order_by(DownloadTask.created_at)
                .limit(1)
            )
            task = result.scalar_one_or_none()
            if not task:
                break
            await self._process_task(task)

    async def _process_task(self, task: DownloadTask) -> None:
        """Download one theme, upload it to storage and record the result.

        On success:
        - the theme gets ``audio_s3_key`` and ``file_size_bytes``;
        - a new ``Opening`` row is created and linked to it;
        - the task is marked DONE.

        On any ``Exception``, uncommitted changes are rolled back and the task
        is marked FAILED with the error text. Errors raised while recording the
        failure propagate to the caller.
        """
        # Capture ids before anything can fail: rollback() expires every loaded
        # instance, and reading an expired attribute on an AsyncSession would
        # trigger implicit IO.
        task_id = task.id
        theme_id = task.theme_id
        try:
            task.status = DownloadStatus.DOWNLOADING
            task.started_at = datetime.now(timezone.utc)
            task.progress_percent = 10
            await self.db.commit()

            result = await self.db.execute(
                select(AnimeTheme)
                .options(selectinload(AnimeTheme.anime))
                .where(AnimeTheme.id == theme_id)
            )
            theme = result.scalar_one()
            anime = theme.anime
            if not theme.animethemes_video_url:
                raise ValueError("No video URL available")

            with tempfile.TemporaryDirectory() as tmp_dir:
                webm_file = Path(tmp_dir) / "audio.webm"
                # The WebM file is stored as-is; there is no conversion step.
                await self._download_file(theme.animethemes_video_url, webm_file)

                task.progress_percent = 70
                task.status = DownloadStatus.UPLOADING
                await self.db.commit()

                audio_filename = self._build_audio_filename(anime, theme)
                s3_key = f"audio/{audio_filename}"
                file_data = webm_file.read_bytes()
                file_size = len(file_data)
                if not storage.upload_file(s3_key, file_data, "video/webm"):
                    raise RuntimeError("Failed to upload to S3")

            theme.audio_s3_key = s3_key
            theme.file_size_bytes = file_size

            # Mirror the download into the main openings table.
            opening = Opening(
                anime_name=anime.title_english or f"Anime {anime.animethemes_slug}",
                op_number=theme.full_name,
                song_name=theme.song_title,
                audio_file=audio_filename,
            )
            self.db.add(opening)
            await self.db.flush()  # populate opening.id
            theme.opening_id = opening.id

            task.status = DownloadStatus.DONE
            task.progress_percent = 100
            task.completed_at = datetime.now(timezone.utc)
            task.estimated_size_bytes = file_size
            await self.db.commit()
        except Exception as exc:
            self._logger.exception("Download task %s failed", task_id)
            # Drop half-applied changes (e.g. theme.audio_s3_key, a pending
            # Opening) and clear any failed transaction before recording failure.
            await self.db.rollback()
            message = str(exc) or type(exc).__name__
            await self.db.execute(
                update(DownloadTask)
                .where(DownloadTask.id == task_id)
                .values(
                    status=DownloadStatus.FAILED,
                    error_message=message[: self._MAX_ERROR_MESSAGE_LENGTH],
                    progress_percent=0,
                )
                .execution_options(synchronize_session=False)
            )
            await self.db.commit()

    async def _download_file(self, url: str, destination: Path) -> None:
        """Stream ``url`` into the file ``destination``, following redirects.

        Raises:
            httpx.HTTPStatusError: If the server responds with an error status.
        """
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "GET",
                url,
                timeout=downloader_settings.download_timeout_seconds,
                follow_redirects=True,
            ) as response:
                response.raise_for_status()
                with open(destination, "wb") as f:
                    async for chunk in response.aiter_bytes(
                        chunk_size=self._DOWNLOAD_CHUNK_SIZE
                    ):
                        f.write(chunk)

    def _build_audio_filename(self, anime: Anime, theme: AnimeTheme) -> str:
        """Build the storage file name ``<anime>_<theme>[_<song>].webm``.

        Every component is passed through ``_sanitize_filename``.
        """
        anime_name = self._sanitize_filename(
            anime.title_english or f"anime_{anime.animethemes_slug}"
        )
        theme_name = self._sanitize_filename(theme.full_name)
        song_part = f"_{self._sanitize_filename(theme.song_title)}" if theme.song_title else ""
        return f"{anime_name}_{theme_name}{song_part}.webm"

    def _sanitize_filename(self, name: str) -> str:
        """Make ``name`` safe to use as one component of a file name or S3 key.

        - Removes ``<>:"/\\|?*`` and ASCII control characters.
        - Replaces spaces with underscores.
        - Truncates to 100 characters.

        Returns ``"unknown"`` when ``name`` is empty/None or nothing usable
        remains after sanitizing.
        """
        if not name:
            return "unknown"
        sanitized = self._UNSAFE_FILENAME_CHARS.sub("", name).replace(" ", "_")
        return sanitized[: self._MAX_FILENAME_COMPONENT] or "unknown"

    async def cancel_task(self, task_id: int) -> bool:
        """Delete a task that is still QUEUED.

        Returns:
            True if the task existed, was QUEUED, and was deleted.
        """
        result = await self.db.execute(
            select(DownloadTask).where(DownloadTask.id == task_id)
        )
        task = result.scalar_one_or_none()
        if not task or task.status != DownloadStatus.QUEUED:
            return False
        await self.db.delete(task)
        await self.db.commit()
        return True

    async def retry_task(self, task_id: int) -> bool:
        """Reset a FAILED task back to QUEUED, clearing its error, progress and timestamps.

        Returns:
            True if a FAILED task with ``task_id`` was requeued.
        """
        result = await self.db.execute(
            update(DownloadTask)
            .where(DownloadTask.id == task_id)
            .where(DownloadTask.status == DownloadStatus.FAILED)
            .values(
                status=DownloadStatus.QUEUED,
                error_message=None,
                progress_percent=0,
                started_at=None,
                completed_at=None,
            )
            .returning(DownloadTask.id)
        )
        updated = result.scalar_one_or_none()
        await self.db.commit()
        return updated is not None

    async def clear_completed_tasks(self, include_failed: bool = False) -> int:
        """Delete DONE tasks (and FAILED ones when ``include_failed`` is true).

        Returns:
            The number of deleted tasks.
        """
        statuses = [DownloadStatus.DONE]
        if include_failed:
            statuses.append(DownloadStatus.FAILED)
        result = await self.db.execute(
            delete(DownloadTask)
            .where(DownloadTask.status.in_(statuses))
            .returning(DownloadTask.id)
        )
        deleted_ids = result.scalars().all()
        await self.db.commit()
        return len(deleted_ids)