Files
mamonov.ep cc11f0b773 Remove Shikimori API, use AnimeThemes only, switch to WebM format
- Remove ShikimoriService, use AnimeThemes API for search
- Replace shikimori_id with animethemes_slug as primary identifier
- Remove FFmpeg MP3 conversion, download WebM directly
- Add .webm support in storage and upload endpoints
- Update frontend to use animethemes_slug

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-12 11:22:46 +03:00

228 lines
7.3 KiB
Python

import asyncio
from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from typing import Optional
# Shared state for the background download worker.
# _worker_lock is acquired by _run_download_worker around its check of
# _worker_running, so that only one queue-processing worker starts at a time.
_worker_lock = asyncio.Lock()
# Set to True by _run_download_worker while it processes the queue and reset to
# False when it finishes; the queue endpoints pass it to
# DownloadService.get_queue_status as ``worker_running``.
_worker_running = False
from ..database import get_db
from .schemas import (
SearchResponse,
AnimeDetailResponse,
ThemeInfo,
AddToQueueRequest,
AddAllThemesRequest,
QueueStatusResponse,
StorageStatsResponse,
)
from .db_models import Anime, AnimeTheme, DownloadTask, DownloadStatus
from .services.animethemes import AnimeThemesService
from .services.downloader import DownloadService
from .services.storage_tracker import StorageTrackerService
# Every route declared in this module is registered on this router, i.e. under
# the "/downloader" prefix and the "openings-downloader" tag.
router = APIRouter(prefix="/downloader", tags=["openings-downloader"])
# ============== Search ==============
@router.get("/search", response_model=SearchResponse)
async def search_anime(
    query: str = Query(..., min_length=1, description="Search query"),
    limit: int = Query(20, ge=1, le=50, description="Maximum results"),
    db: AsyncSession = Depends(get_db),
):
    """Look up anime through the AnimeThemes service by free-text query.

    Args:
        query: Non-empty search string.
        limit: Maximum number of results to request (1-50, default 20).
        db: Injected database session; not used by this handler itself.

    Returns:
        A ``SearchResponse`` carrying the matches and how many there are.
    """
    matches = await AnimeThemesService().search(query, limit=limit)
    return SearchResponse(total=len(matches), results=matches)
# ============== Anime Detail ==============
@router.get("/anime/{slug:path}", response_model=AnimeDetailResponse)
async def get_anime_detail(
    slug: str,
    db: AsyncSession = Depends(get_db),
):
    """Return one anime together with its themes and their download state.

    Args:
        slug: AnimeThemes slug identifying the anime (captured as a path
            segment, so it may contain slashes).
        db: Injected database session.

    Returns:
        An ``AnimeDetailResponse`` with one ``ThemeInfo`` per theme.

    Raises:
        HTTPException: 404 when the service yields no anime for ``slug``.
    """
    service = AnimeThemesService()

    # Resolve (or create) the anime record for this slug.
    anime = await service.get_or_create_anime(db, slug)
    if not anime:
        raise HTTPException(status_code=404, detail=f"Anime not found: {slug}")

    # Themes come from the AnimeThemes service.
    themes = await service.fetch_themes(db, anime)

    # Map theme id -> download task; the query is skipped when there are no
    # themes to look up.
    task_by_theme = {}
    wanted_ids = [theme.id for theme in themes]
    if wanted_ids:
        rows = await db.execute(
            select(DownloadTask).where(DownloadTask.theme_id.in_(wanted_ids))
        )
        for found in rows.scalars().all():
            task_by_theme[found.theme_id] = found

    def _to_info(theme):
        # A theme without a task reports no download status.
        linked = task_by_theme.get(theme.id)
        return ThemeInfo(
            id=theme.id,
            theme_type=theme.theme_type,
            sequence=theme.sequence,
            full_name=theme.full_name,
            song_title=theme.song_title,
            artist=theme.artist,
            video_url=theme.animethemes_video_url,
            is_downloaded=theme.is_downloaded,
            download_status=linked.status if linked else None,
            file_size_bytes=theme.file_size_bytes,
        )

    return AnimeDetailResponse(
        id=anime.id,
        animethemes_slug=anime.animethemes_slug,
        title_english=anime.title_english,
        year=anime.year,
        poster_url=anime.poster_url,
        themes=[_to_info(theme) for theme in themes],
    )
# ============== Download Queue ==============
@router.post("/queue/add", response_model=QueueStatusResponse)
async def add_to_queue(
    request: AddToQueueRequest,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
):
    """Queue the requested themes for download.

    Args:
        request: Body carrying the ``theme_ids`` to enqueue.
        background_tasks: Used to schedule the download worker after the
            response when something was actually added.
        db: Injected database session.

    Returns:
        The queue status as reported by ``DownloadService.get_queue_status``.

    Raises:
        HTTPException: 400 when the storage stats say downloading is not
            allowed.
    """
    # Refuse new work when storage stats forbid further downloads.
    stats = await StorageTrackerService(db).get_stats()
    if not stats.can_download:
        raise HTTPException(
            status_code=400,
            detail=f"Storage limit exceeded ({stats.used_bytes}/{stats.limit_bytes} bytes)",
        )

    queue = DownloadService(db)
    newly_queued = await queue.add_to_queue(request.theme_ids)

    # Only wake the worker if there is something new for it to do.
    if newly_queued > 0:
        background_tasks.add_task(_run_download_worker)

    return await queue.get_queue_status(worker_running=_worker_running)
@router.post("/queue/add-all", response_model=QueueStatusResponse)
async def add_all_anime_themes(
    request: AddAllThemesRequest,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
):
    """Queue every theme of one anime for download.

    Args:
        request: Body carrying the ``anime_id`` whose themes are enqueued.
        background_tasks: Used to schedule the download worker after the
            response when something was actually added.
        db: Injected database session.

    Returns:
        The queue status as reported by ``DownloadService.get_queue_status``.

    Raises:
        HTTPException: 400 when the storage stats say downloading is not
            allowed.
    """
    stats = await StorageTrackerService(db).get_stats()
    if not stats.can_download:
        raise HTTPException(status_code=400, detail="Storage limit exceeded")

    queue = DownloadService(db)
    newly_queued = await queue.add_all_anime_themes(request.anime_id)

    # Only wake the worker if there is something new for it to do.
    if newly_queued > 0:
        background_tasks.add_task(_run_download_worker)

    return await queue.get_queue_status(worker_running=_worker_running)
@router.get("/queue", response_model=QueueStatusResponse)
async def get_queue_status(db: AsyncSession = Depends(get_db)):
    """Report the download queue state, including the worker-running flag.

    Args:
        db: Injected database session.

    Returns:
        Whatever ``DownloadService.get_queue_status`` produces for the current
        value of the module-level ``_worker_running`` flag.
    """
    status = await DownloadService(db).get_queue_status(
        worker_running=_worker_running
    )
    return status
# NOTE: the static "/queue/clear" route must be registered BEFORE the
# parameterized "/queue/{task_id}" route. Routes are matched in registration
# order, so with the parameterized DELETE first, "DELETE /queue/clear" is
# captured by it with task_id="clear" and rejected during int validation,
# leaving this endpoint unreachable.
@router.delete("/queue/clear")
async def clear_completed_tasks(
    include_failed: bool = Query(False, description="Also clear failed tasks"),
    db: AsyncSession = Depends(get_db),
):
    """Clear completed (and optionally failed) tasks from the queue.

    Args:
        include_failed: When true, failed tasks are cleared as well.
        db: Injected database session.

    Returns:
        A dict with a human-readable ``message`` and the ``deleted_count``.
    """
    download_service = DownloadService(db)
    deleted_count = await download_service.clear_completed_tasks(include_failed=include_failed)
    return {"message": f"Cleared {deleted_count} tasks", "deleted_count": deleted_count}


@router.delete("/queue/{task_id}")
async def cancel_task(task_id: int, db: AsyncSession = Depends(get_db)):
    """Cancel a queued task (not downloading).

    Args:
        task_id: Identifier of the download task to cancel.
        db: Injected database session.

    Returns:
        A confirmation message dict.

    Raises:
        HTTPException: 400 when the service reports the task could not be
            cancelled.
    """
    download_service = DownloadService(db)
    success = await download_service.cancel_task(task_id)
    if not success:
        raise HTTPException(status_code=400, detail="Cannot cancel task (not queued or not found)")
    return {"message": "Task cancelled"}


@router.post("/queue/{task_id}/retry")
async def retry_task(
    task_id: int,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
):
    """Retry a failed task.

    Args:
        task_id: Identifier of the download task to retry.
        background_tasks: Used to schedule the download worker after the
            response is sent.
        db: Injected database session.

    Returns:
        A confirmation message dict.

    Raises:
        HTTPException: 400 when the service reports the task could not be
            retried.
    """
    download_service = DownloadService(db)
    success = await download_service.retry_task(task_id)
    if not success:
        raise HTTPException(status_code=400, detail="Cannot retry task (not failed or not found)")
    # The task is queued again, so make sure a worker picks it up.
    background_tasks.add_task(_run_download_worker)
    return {"message": "Task queued for retry"}
# ============== Storage ==============
@router.get("/storage", response_model=StorageStatsResponse)
async def get_storage_stats(db: AsyncSession = Depends(get_db)):
"""Get S3 storage usage stats (calculated from DB, not scanning S3)."""
storage_service = StorageTrackerService(db)
return await storage_service.get_stats()
# ============== Background Worker ==============
async def _run_download_worker():
    """Process the download queue unless a worker is already doing so.

    The module-level ``_worker_running`` flag is checked once up front and
    again while holding ``_worker_lock``; the call that sets the flag then
    processes the queue on its own database session and always clears the
    flag afterwards, even if processing raises.
    """
    global _worker_running

    # Cheap early exit: someone else is already processing the queue.
    if _worker_running:
        return

    # Re-check under the lock before claiming the worker role.
    async with _worker_lock:
        if _worker_running:
            return
        _worker_running = True

    try:
        # Imported at call time rather than at module import.
        from ..database import async_session_maker

        async with async_session_maker() as session:
            await DownloadService(session).process_queue()
    finally:
        _worker_running = False