# Extracted from a repository file viewer: 243 lines, 8.3 KiB, Python.
# Last change: 2026-01-10 11:06:45 +03:00
import httpx
import logging
import re
from typing import List, Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from ..db_models import Anime, AnimeTheme, ThemeType
from ..schemas import AnimeSearchResult
# (file-viewer timestamp: 2026-01-10 11:06:45 +03:00)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Shared HTTP client for AnimeThemes API.
# Starts out as None; _get_animethemes_client() creates the client on first
# use and replaces it if it has been closed.
_animethemes_client: Optional[httpx.AsyncClient] = None
def _get_animethemes_client() -> httpx.AsyncClient:
    """Return the module-wide AnimeThemes HTTP client, building it on demand.

    The client is cached in the module global ``_animethemes_client``. A new
    one is created when none exists yet or when the cached one reports
    ``is_closed``.
    """
    global _animethemes_client

    cached = _animethemes_client
    if cached is not None and not cached.is_closed:
        return cached

    # No usable client: build a fresh one and remember it for later callers.
    _animethemes_client = httpx.AsyncClient(
        base_url="https://api.animethemes.moe",
        timeout=30.0,
    )
    return _animethemes_client
class AnimeThemesService:
    """Service for AnimeThemes API (api.animethemes.moe).

    Provides anime search, get-or-create of ``Anime`` rows by AnimeThemes
    slug, and synchronisation of an anime's opening/ending themes into the
    database.
    """

    # Theme slugs look like "OP1" / "ED2"; the numeric part may be absent.
    _THEME_SLUG_RE = re.compile(r"(OP|ED)(\d*)")

    def __init__(self):
        """Bind the service to the shared AnimeThemes HTTP client."""
        self.client = _get_animethemes_client()

    @staticmethod
    def _pick_poster_url(images: Optional[List[dict]]) -> Optional[str]:
        """Return the link of the first "Large Cover" image, else the first image's link.

        ``images`` may be ``None`` or empty (the API can omit the key or send
        ``null``); in that case ``None`` is returned.
        """
        if not images:
            return None
        large_cover_link = next(
            (img.get("link") for img in images if img.get("facet") == "Large Cover"),
            None,
        )
        return large_cover_link or images[0].get("link")

    @staticmethod
    def _pick_media_url(theme_data: dict) -> Optional[str]:
        """Return the link of the first video of the first entry of a theme.

        Falls back to that video's audio link when the video itself has no
        link. Missing or ``null`` ``animethemeentries`` / ``videos`` yield
        ``None``.
        """
        entries = theme_data.get("animethemeentries") or []
        if not entries:
            return None
        videos = entries[0].get("videos") or []
        if not videos:
            return None

        first_video = videos[0]
        video_link = first_video.get("link")
        if video_link:
            return video_link

        # Fallback to audio link if no video link is present.
        audio = first_video.get("audio")
        return audio.get("link") if audio else video_link

    @classmethod
    def _parse_theme_slug(cls, slug: Optional[str]) -> Optional[tuple]:
        """Split a theme slug such as "OP1" into ``("OP", 1)``.

        Returns ``None`` when the slug is missing or does not start with
        ``OP``/``ED``. A slug without a number gets sequence ``1``.
        """
        match = cls._THEME_SLUG_RE.match(slug or "")
        if not match:
            return None
        kind, number = match.groups()
        return kind, int(number) if number else 1

    @staticmethod
    async def _load_anime_with_themes(db: AsyncSession, anime_id) -> Anime:
        """Select the anime by primary key with ``themes`` eagerly loaded."""
        result = await db.execute(
            select(Anime)
            .where(Anime.id == anime_id)
            .options(selectinload(Anime.themes))
        )
        return result.scalar_one()

    async def search(self, query: str, limit: int = 20) -> List[AnimeSearchResult]:
        """Search anime by query using AnimeThemes API.

        Args:
            query: Search text, sent as the ``q`` parameter.
            limit: Page size requested from the API.

        Returns:
            One ``AnimeSearchResult`` per anime in the response, or an empty
            list when the request or response handling fails (the failure is
            logged).
        """
        try:
            response = await self.client.get(
                "/anime",
                params={
                    "q": query,
                    "include": "images",
                    "page[size]": limit,
                },
            )
            response.raise_for_status()
            data = response.json()
            return [
                AnimeSearchResult(
                    animethemes_slug=anime.get("slug"),
                    title_english=anime.get("name"),
                    year=anime.get("year"),
                    poster_url=self._pick_poster_url(anime.get("images")),
                )
                for anime in data.get("anime") or []
            ]
        except Exception as e:
            # Deliberately best-effort: search failures degrade to "no results".
            logger.error(f"Failed to search AnimeThemes: {e}")
            return []

    async def get_or_create_anime(self, db: AsyncSession, slug: str) -> Optional[Anime]:
        """Get anime from DB or fetch from AnimeThemes and create.

        Args:
            db: Session used for the lookup and the insert.
            slug: AnimeThemes slug identifying the anime.

        Returns:
            The ``Anime`` row with its ``themes`` eagerly loaded, or ``None``
            when the API request fails or returns no anime data.
        """
        # Check if exists (with themes eagerly loaded)
        result = await db.execute(
            select(Anime)
            .where(Anime.animethemes_slug == slug)
            .options(selectinload(Anime.themes))
        )
        anime = result.scalar_one_or_none()
        if anime:
            return anime

        # Fetch from AnimeThemes
        try:
            response = await self.client.get(
                f"/anime/{slug}",
                params={"include": "images"},
            )
            response.raise_for_status()
            data = response.json()
        except Exception as e:
            logger.error(f"Failed to fetch anime from AnimeThemes: {e}")
            return None

        # `or {}` also covers an explicit JSON null.
        anime_data = data.get("anime") or {}
        if not anime_data:
            return None

        anime = Anime(
            animethemes_slug=slug,
            title_english=anime_data.get("name"),
            year=anime_data.get("year"),
            poster_url=self._pick_poster_url(anime_data.get("images")),
        )
        db.add(anime)
        await db.commit()
        await db.refresh(anime)

        # Return the new row in the same state as the "already exists" path:
        # with themes eagerly loaded, so callers never trigger a lazy load.
        return await self._load_anime_with_themes(db, anime.id)

    async def fetch_themes(self, db: AsyncSession, anime: Anime) -> List[AnimeTheme]:
        """Fetch themes from AnimeThemes API and sync to DB.

        Existing themes (keyed by type and sequence) only have their empty
        ``song_title`` / ``artist`` / ``animethemes_video_url`` filled in;
        themes not yet stored are created.

        Args:
            db: Session used for loading and persisting.
            anime: The anime whose themes should be synchronised.

        Returns:
            The anime's themes after the sync. When the anime has no slug or
            the API call fails, the themes currently stored are returned
            unchanged.
        """
        # Capture the id up front; it is reused after the commit below.
        anime_id = anime.id

        # Always reload anime with themes to avoid lazy loading issues
        anime = await self._load_anime_with_themes(db, anime_id)
        current_themes = anime.themes or []

        if not anime.animethemes_slug:
            logger.warning(f"No AnimeThemes slug for anime {anime.id}")
            return current_themes

        # Fetch themes from AnimeThemes API
        try:
            response = await self.client.get(
                f"/anime/{anime.animethemes_slug}",
                params={
                    "include": "animethemes.animethemeentries.videos.audio,animethemes.song.artists",
                },
            )
            if response.status_code != 200:
                logger.warning(f"AnimeThemes API returned {response.status_code} for {anime.animethemes_slug}")
                return current_themes
            data = response.json()
        except Exception as e:
            logger.error(f"Failed to fetch themes from AnimeThemes for {anime.animethemes_slug}: {e}")
            return current_themes

        anime_data = data.get("anime") or {}
        themes_data = anime_data.get("animethemes") or []
        logger.info(f"AnimeThemes API returned {len(themes_data)} themes for {anime.animethemes_slug}")

        # Build dict of existing themes
        existing_themes = {
            (t.theme_type, t.sequence): t
            for t in current_themes
        }

        for theme_data in themes_data:
            # Parse theme type and sequence: "OP1", "ED1", etc.
            parsed = self._parse_theme_slug(theme_data.get("slug"))
            if parsed is None:
                continue
            kind, sequence = parsed
            theme_type = ThemeType.OP if kind == "OP" else ThemeType.ED

            # Video link, or the audio link when the video has none.
            video_url = self._pick_media_url(theme_data)

            # Song info; `song` and `artists` may be null in the payload.
            song_data = theme_data.get("song") or {}
            song_title = song_data.get("title")
            artists = song_data.get("artists") or []
            artist = artists[0].get("name") if artists else None

            key = (theme_type, sequence)
            theme = existing_themes.get(key)
            if theme is not None:
                # Update existing theme: only fill in fields that are empty.
                if not theme.song_title:
                    theme.song_title = song_title
                if not theme.artist:
                    theme.artist = artist
                if video_url and not theme.animethemes_video_url:
                    theme.animethemes_video_url = video_url
            else:
                # Create new theme
                theme = AnimeTheme(
                    anime_id=anime.id,
                    theme_type=theme_type,
                    sequence=sequence,
                    song_title=song_title,
                    artist=artist,
                    animethemes_video_url=video_url,
                )
                db.add(theme)
                if anime.themes is None:
                    anime.themes = []
                anime.themes.append(theme)
                # Add to existing_themes to avoid duplicates from API
                existing_themes[key] = theme

        await db.commit()

        # Reload anime with themes to get fresh data
        refreshed_anime = await self._load_anime_with_themes(db, anime_id)
        return refreshed_anime.themes