# anime-qize/backend/app/video_generator.py
import uuid
import subprocess
import tempfile
from pathlib import Path
from typing import Optional
from .config import settings
from .storage import storage
from .models import VideoMode, QuizItem, GenerateRequest
class VideoGenerator:
"""FFmpeg-based video generator for anime quiz videos."""
FONT_PATH = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
def __init__(self, request: GenerateRequest):
self.request = request
self.mode = request.mode
self.questions = request.questions
self.audio_duration = request.audio_duration
self.continue_audio = request.continue_audio
if self.mode == VideoMode.SHORTS:
self.width = settings.shorts_width
self.height = settings.shorts_height
else:
self.width = settings.full_width
self.height = settings.full_height
self.fps = 30
self.temp_dir = Path(tempfile.mkdtemp(prefix="quiz_"))
self.temp_files: list[Path] = []
def _run_ffmpeg(self, args: list[str], check: bool = True) -> subprocess.CompletedProcess:
"""Run FFmpeg command with given arguments."""
cmd = ["ffmpeg", "-y", "-hide_banner", "-loglevel", "error"] + args
return subprocess.run(cmd, capture_output=True, text=True, check=check)
def _get_temp_path(self, suffix: str = ".mp4") -> Path:
"""Generate a temporary file path."""
path = self.temp_dir / f"temp_{uuid.uuid4().hex[:8]}{suffix}"
self.temp_files.append(path)
return path
def _escape_text(self, text: str) -> str:
"""Escape special characters for FFmpeg drawtext filter."""
text = text.replace("\\", "\\\\")
text = text.replace("'", "'\\''")
text = text.replace(":", "\\:")
text = text.replace("%", "\\%")
return text
def _get_background_path(self) -> Path:
"""Get background video path from S3 or generate solid color fallback."""
if self.request.background_video:
bg_path = storage.get_background_file(self.request.background_video)
if bg_path and bg_path.exists():
return bg_path
# Get first available background
backgrounds = storage.list_background_videos()
if backgrounds:
bg_path = storage.get_background_file(backgrounds[0])
if bg_path and bg_path.exists():
return bg_path
# Create a solid color fallback background
return self._create_solid_background()
def _create_solid_background(self) -> Path:
"""Create a solid color background video as fallback."""
output_path = self._get_temp_path(suffix="_bg.mp4")
        # 10-second solid dark-navy clip generated via the lavfi color source;
        # callers loop it with -stream_loop as needed.
args = [
"-f", "lavfi",
"-i", f"color=c=0x1a1a2e:s={self.width}x{self.height}:d=10:r={self.fps}",
"-c:v", "libx264",
"-preset", "ultrafast",
"-crf", "23",
str(output_path)
]
result = self._run_ffmpeg(args, check=False)
if result.returncode != 0:
raise RuntimeError(f"FFmpeg error creating solid background: {result.stderr}")
return output_path
def _get_difficulty_color(self, difficulty: str) -> str:
"""Get color for difficulty badge."""
colors = {
"easy": "green",
"medium": "orange",
"hard": "red"
}
return colors.get(difficulty.lower(), "white")
def _create_question_scene(self, question: QuizItem, question_num: int) -> Path:
"""Create the question scene with audio and countdown."""
output_path = self._get_temp_path()
scene_duration = self.audio_duration + settings.audio_buffer
bg_path = self._get_background_path()
audio_path = storage.get_audio_file(question.opening_file)
if not audio_path:
raise RuntimeError(f"Audio file not found: {question.opening_file}")
# Font sizes based on mode
title_fontsize = 72 if self.mode == VideoMode.SHORTS else 56
diff_fontsize = 56 if self.mode == VideoMode.SHORTS else 42
countdown_fontsize = 120 if self.mode == VideoMode.SHORTS else 80
# Escape texts
question_text = self._escape_text(f"#{question_num}")
subtitle_text = self._escape_text("Guess the Anime Opening")
difficulty_text = self._escape_text(question.difficulty.upper())
diff_color = self._get_difficulty_color(question.difficulty)
# Calculate positions
title_y = int(self.height * 0.12)
subtitle_y = int(self.height * 0.20)
diff_y = int(self.height * 0.35)
countdown_y = int(self.height * 0.70)
# Build video filter
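        # The countdown drawtext prints %{eif:N-floor(t):d} (remaining whole seconds)
        # and is hidden via enable='lt(t,N)' once the guess window ends.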
video_filter = f"""
[0:v]scale={self.width}:{self.height}:force_original_aspect_ratio=increase,
crop={self.width}:{self.height},
setsar=1,
fps={self.fps}[bg];
[bg]drawtext=fontfile={self.FONT_PATH}:text='{question_text}':fontsize={title_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={title_y},
drawtext=fontfile={self.FONT_PATH}:text='{subtitle_text}':fontsize={title_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={subtitle_y},
drawtext=fontfile={self.FONT_PATH}:text='{difficulty_text}':fontsize={diff_fontsize}:fontcolor={diff_color}:borderw=2:bordercolor=black:x=(w-tw)/2:y={diff_y},
drawtext=fontfile={self.FONT_PATH}:text='%{{eif\\:{int(self.audio_duration)}-floor(t)\\:d}}':fontsize={countdown_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={countdown_y}:enable='lt(t,{int(self.audio_duration)})'[v]
""".replace("\n", "").strip()
# Build audio filter with fade in and optional fade out
audio_fade_out_start = scene_duration - settings.audio_fade_duration
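        # No fade-out when the answer scene continues the same track, so the hand-off
        # between the two scenes stays seamless.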
if self.continue_audio:
audio_filter = f"[1:a]afade=t=in:d={settings.audio_fade_duration}[a]"
else:
audio_filter = f"[1:a]afade=t=in:d={settings.audio_fade_duration},afade=t=out:st={audio_fade_out_start}:d={settings.audio_fade_duration}[a]"
# Build FFmpeg command
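        # Input 0 is the background, looped indefinitely; input 1 is the opening
        # audio, seeked to question.start_time (-ss before -i = input seeking) and
        # trimmed to the scene length.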
args = [
"-stream_loop", "-1",
"-i", str(bg_path),
"-ss", str(question.start_time),
"-t", str(scene_duration),
"-i", str(audio_path),
"-filter_complex", f"{video_filter};{audio_filter}",
"-map", "[v]",
"-map", "[a]",
"-t", str(scene_duration),
"-c:v", "libx264",
"-preset", "medium",
"-crf", "23",
"-c:a", "aac",
"-b:a", "192k",
str(output_path)
]
result = self._run_ffmpeg(args, check=False)
if result.returncode != 0:
raise RuntimeError(f"FFmpeg error in question scene: {result.stderr}")
return output_path
def _get_transition_sound_path(self) -> Optional[Path]:
"""Get transition sound path from S3."""
if self.request.transition_sound:
return storage.get_transition_file(self.request.transition_sound)
return None
def _create_answer_scene(self, question: QuizItem) -> Path:
"""Create the answer reveal scene with continuing audio."""
output_path = self._get_temp_path()
duration = settings.answer_duration
bg_path = self._get_background_path()
audio_path = storage.get_audio_file(question.opening_file)
transition_path = self._get_transition_sound_path()
if not audio_path:
raise RuntimeError(f"Audio file not found: {question.opening_file}")
# Calculate audio start position based on continue_audio setting
if self.continue_audio:
question_scene_duration = self.audio_duration + settings.audio_buffer
audio_start = question.start_time + question_scene_duration
else:
audio_start = question.start_time
audio_fade_out_start = duration - settings.audio_fade_duration
# Font sizes based on mode
answer_fontsize = 64 if self.mode == VideoMode.SHORTS else 48
label_fontsize = 48 if self.mode == VideoMode.SHORTS else 36
# Escape texts
label_text = self._escape_text("Anime:")
anime_text = self._escape_text(question.anime)
# Calculate positions
label_y = int(self.height * 0.25)
anime_y = int(self.height * 0.32)
# Check for poster from S3
poster_path = None
if question.poster:
poster_path = storage.get_poster_file(question.poster)
# Build audio filter - no fade in if continuing from question scene
if self.continue_audio:
base_audio_filter = f"[1:a]afade=t=out:st={audio_fade_out_start}:d={settings.audio_fade_duration}"
else:
base_audio_filter = f"[1:a]afade=t=in:d={settings.audio_fade_duration},afade=t=out:st={audio_fade_out_start}:d={settings.audio_fade_duration}"
# Build inputs and audio filter based on whether we have transition sound
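        # A configured transition (reveal) sound is mixed over the music with amix,
        # which by default scales each input's volume down.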
        if poster_path and poster_path.exists():
if transition_path:
audio_filter = f"{base_audio_filter}[music];[2:a]anull[sfx];[music][sfx]amix=inputs=2:duration=longest[a]"
inputs = [
"-loop", "1", "-i", str(poster_path),
"-ss", str(audio_start), "-t", str(duration), "-i", str(audio_path),
"-i", str(transition_path),
]
else:
audio_filter = f"{base_audio_filter}[a]"
inputs = [
"-loop", "1", "-i", str(poster_path),
"-ss", str(audio_start), "-t", str(duration), "-i", str(audio_path),
]
video_filter = f"""
[0:v]scale={self.width}:{self.height}:force_original_aspect_ratio=increase,
crop={self.width}:{self.height},
setsar=1,
fps={self.fps},
drawtext=fontfile={self.FONT_PATH}:text='{label_text}':fontsize={label_fontsize}:fontcolor=cyan:borderw=2:bordercolor=black:x=(w-tw)/2:y={label_y},
drawtext=fontfile={self.FONT_PATH}:text='{anime_text}':fontsize={answer_fontsize}:fontcolor=cyan:borderw=3:bordercolor=black:x=(w-tw)/2:y={anime_y},
fade=t=in:d=0.3[v];
{audio_filter}
""".replace("\n", "").strip()
else:
if transition_path:
audio_filter = f"{base_audio_filter}[music];[2:a]anull[sfx];[music][sfx]amix=inputs=2:duration=longest[a]"
inputs = [
"-stream_loop", "-1", "-i", str(bg_path),
"-ss", str(audio_start), "-t", str(duration), "-i", str(audio_path),
"-i", str(transition_path),
]
else:
audio_filter = f"{base_audio_filter}[a]"
inputs = [
"-stream_loop", "-1", "-i", str(bg_path),
"-ss", str(audio_start), "-t", str(duration), "-i", str(audio_path),
]
video_filter = f"""
[0:v]scale={self.width}:{self.height}:force_original_aspect_ratio=increase,
crop={self.width}:{self.height},
setsar=1,
fps={self.fps},
drawtext=fontfile={self.FONT_PATH}:text='{label_text}':fontsize={label_fontsize}:fontcolor=cyan:borderw=2:bordercolor=black:x=(w-tw)/2:y={label_y},
drawtext=fontfile={self.FONT_PATH}:text='{anime_text}':fontsize={answer_fontsize}:fontcolor=cyan:borderw=3:bordercolor=black:x=(w-tw)/2:y={anime_y},
fade=t=in:d=0.3[v];
{audio_filter}
""".replace("\n", "").strip()
args = inputs + [
"-filter_complex", video_filter,
"-map", "[v]",
"-map", "[a]",
"-t", str(duration),
"-c:v", "libx264",
"-preset", "medium",
"-crf", "23",
"-c:a", "aac",
"-b:a", "192k",
str(output_path)
]
result = self._run_ffmpeg(args, check=False)
if result.returncode != 0:
raise RuntimeError(f"FFmpeg error in answer scene: {result.stderr}")
return output_path
def _create_combined_scene(self, question: QuizItem, question_num: int) -> Path:
"""Create combined question + answer scene with continuous audio."""
output_path = self._get_temp_path()
question_duration = self.audio_duration + settings.audio_buffer
answer_duration = settings.answer_duration
total_duration = question_duration + answer_duration
bg_path = self._get_background_path()
audio_path = storage.get_audio_file(question.opening_file)
if not audio_path:
raise RuntimeError(f"Audio file not found: {question.opening_file}")
# Font sizes based on mode
title_fontsize = 72 if self.mode == VideoMode.SHORTS else 56
diff_fontsize = 56 if self.mode == VideoMode.SHORTS else 42
countdown_fontsize = 120 if self.mode == VideoMode.SHORTS else 80
answer_fontsize = 64 if self.mode == VideoMode.SHORTS else 48
label_fontsize = 48 if self.mode == VideoMode.SHORTS else 36
# Escape texts
question_text = self._escape_text(f"#{question_num}")
subtitle_text = self._escape_text("Guess the Anime Opening")
difficulty_text = self._escape_text(question.difficulty.upper())
diff_color = self._get_difficulty_color(question.difficulty)
label_text = self._escape_text("Anime:")
anime_text = self._escape_text(question.anime)
# Calculate positions
title_y = int(self.height * 0.12)
subtitle_y = int(self.height * 0.20)
diff_y = int(self.height * 0.35)
countdown_y = int(self.height * 0.70)
label_y = int(self.height * 0.25)
anime_y = int(self.height * 0.32)
# Check for poster
poster_path = None
if question.poster:
poster_path = storage.get_poster_file(question.poster)
# Audio filter - fade in at start, fade out at end
audio_fade_out_start = total_duration - settings.audio_fade_duration
audio_filter = f"[a_in]afade=t=in:d={settings.audio_fade_duration},afade=t=out:st={audio_fade_out_start}:d={settings.audio_fade_duration}[a]"
if poster_path and poster_path.exists():
# Build filter with poster for answer phase
# Question phase: show background with countdown (0 to question_duration)
# Answer phase: show poster with anime title (question_duration to total_duration)
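            # Input order below: 0 = looped background, 1 = opening audio, 2 = poster,
            # so [2:v] in the graph is the poster image.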
video_filter = f"""
[0:v]scale={self.width}:{self.height}:force_original_aspect_ratio=increase,crop={self.width}:{self.height},setsar=1,fps={self.fps},
drawtext=fontfile={self.FONT_PATH}:text='{question_text}':fontsize={title_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={title_y}:enable='lt(t,{question_duration})',
drawtext=fontfile={self.FONT_PATH}:text='{subtitle_text}':fontsize={title_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={subtitle_y}:enable='lt(t,{question_duration})',
drawtext=fontfile={self.FONT_PATH}:text='{difficulty_text}':fontsize={diff_fontsize}:fontcolor={diff_color}:borderw=2:bordercolor=black:x=(w-tw)/2:y={diff_y}:enable='lt(t,{question_duration})',
drawtext=fontfile={self.FONT_PATH}:text='%{{eif\\:{int(self.audio_duration)}-floor(t)\\:d}}':fontsize={countdown_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={countdown_y}:enable='lt(t,{int(self.audio_duration)})'[bg_out];
[2:v]scale={self.width}:{self.height}:force_original_aspect_ratio=increase,crop={self.width}:{self.height},setsar=1,fps={self.fps},
drawtext=fontfile={self.FONT_PATH}:text='{label_text}':fontsize={label_fontsize}:fontcolor=cyan:borderw=2:bordercolor=black:x=(w-tw)/2:y={label_y},
drawtext=fontfile={self.FONT_PATH}:text='{anime_text}':fontsize={answer_fontsize}:fontcolor=cyan:borderw=3:bordercolor=black:x=(w-tw)/2:y={anime_y}[poster_out];
[bg_out][poster_out]overlay=enable='gte(t,{question_duration})':shortest=1[v];
[1:a]anull[a_in];
{audio_filter}
""".replace("\n", "").strip()
args = [
"-stream_loop", "-1",
"-i", str(bg_path),
"-ss", str(question.start_time),
"-t", str(total_duration),
"-i", str(audio_path),
"-loop", "1",
"-i", str(poster_path),
"-filter_complex", video_filter,
"-map", "[v]",
"-map", "[a]",
"-t", str(total_duration),
"-c:v", "libx264",
"-preset", "medium",
"-crf", "23",
"-c:a", "aac",
"-b:a", "192k",
str(output_path)
]
else:
# No poster - just use background for both phases
video_filter = f"""
[0:v]scale={self.width}:{self.height}:force_original_aspect_ratio=increase,crop={self.width}:{self.height},setsar=1,fps={self.fps},
drawtext=fontfile={self.FONT_PATH}:text='{question_text}':fontsize={title_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={title_y}:enable='lt(t,{question_duration})',
drawtext=fontfile={self.FONT_PATH}:text='{subtitle_text}':fontsize={title_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={subtitle_y}:enable='lt(t,{question_duration})',
drawtext=fontfile={self.FONT_PATH}:text='{difficulty_text}':fontsize={diff_fontsize}:fontcolor={diff_color}:borderw=2:bordercolor=black:x=(w-tw)/2:y={diff_y}:enable='lt(t,{question_duration})',
drawtext=fontfile={self.FONT_PATH}:text='%{{eif\\:{int(self.audio_duration)}-floor(t)\\:d}}':fontsize={countdown_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={countdown_y}:enable='lt(t,{int(self.audio_duration)})',
drawtext=fontfile={self.FONT_PATH}:text='{label_text}':fontsize={label_fontsize}:fontcolor=cyan:borderw=2:bordercolor=black:x=(w-tw)/2:y={label_y}:enable='gte(t,{question_duration})',
drawtext=fontfile={self.FONT_PATH}:text='{anime_text}':fontsize={answer_fontsize}:fontcolor=cyan:borderw=3:bordercolor=black:x=(w-tw)/2:y={anime_y}:enable='gte(t,{question_duration})'[v];
[1:a]anull[a_in];
{audio_filter}
""".replace("\n", "").strip()
args = [
"-stream_loop", "-1",
"-i", str(bg_path),
"-ss", str(question.start_time),
"-t", str(total_duration),
"-i", str(audio_path),
"-filter_complex", video_filter,
"-map", "[v]",
"-map", "[a]",
"-t", str(total_duration),
"-c:v", "libx264",
"-preset", "medium",
"-crf", "23",
"-c:a", "aac",
"-b:a", "192k",
str(output_path)
]
result = self._run_ffmpeg(args, check=False)
if result.returncode != 0:
raise RuntimeError(f"FFmpeg error in combined scene: {result.stderr}")
return output_path
def _create_final_screen(self) -> Path:
"""Create final CTA screen for full video mode."""
output_path = self._get_temp_path()
duration = settings.final_screen_duration
bg_path = self._get_background_path()
# Escape texts
title_text = self._escape_text("How many did you guess?")
cta_text = self._escape_text("Subscribe for more anime quizzes!")
# Calculate positions
title_y = int(self.height * 0.35)
cta_y = int(self.height * 0.55)
video_filter = f"""
[0:v]scale={self.width}:{self.height}:force_original_aspect_ratio=increase,
crop={self.width}:{self.height},
setsar=1,
fps={self.fps},
drawtext=fontfile={self.FONT_PATH}:text='{title_text}':fontsize=56:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={title_y},
drawtext=fontfile={self.FONT_PATH}:text='{cta_text}':fontsize=40:fontcolor=white:borderw=2:bordercolor=black:x=(w-tw)/2:y={cta_y},
fade=t=in:d=0.3,
fade=t=out:st={duration - 0.5}:d=0.5[v]
""".replace("\n", "").strip()
        # The concat demuxer expects every segment to expose the same streams, so the
        # CTA screen carries a silent AAC track instead of -an (the 44.1 kHz rate is
        # an assumption; ideally it matches the other scenes' audio).
        args = [
            "-stream_loop", "-1",
            "-i", str(bg_path),
            "-f", "lavfi",
            "-i", "anullsrc=channel_layout=stereo:sample_rate=44100",
            "-filter_complex", video_filter,
            "-map", "[v]",
            "-map", "1:a",
            "-t", str(duration),
            "-c:v", "libx264",
            "-preset", "medium",
            "-crf", "23",
            "-c:a", "aac",
            "-b:a", "192k",
            str(output_path)
        ]
result = self._run_ffmpeg(args, check=False)
if result.returncode != 0:
raise RuntimeError(f"FFmpeg error in final screen: {result.stderr}")
return output_path
def _concatenate_scenes(self, scene_files: list[Path]) -> Path:
"""Concatenate all scenes into final video."""
output_filename = f"quiz_{self.mode.value}_{uuid.uuid4().hex[:8]}.mp4"
output_path = settings.output_path / output_filename
# Create concat list file
concat_file = self._get_temp_path(suffix=".txt")
with open(concat_file, "w") as f:
for scene in scene_files:
f.write(f"file '{scene}'\n")
# Re-encode for consistent output
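        # -safe 0 is needed because the list uses absolute paths; +faststart moves
        # the moov atom to the front so playback can begin before the download ends.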
args = [
"-f", "concat",
"-safe", "0",
"-i", str(concat_file),
"-c:v", "libx264",
"-preset", "medium",
"-crf", "23",
"-c:a", "aac",
"-b:a", "192k",
"-movflags", "+faststart",
str(output_path)
]
result = self._run_ffmpeg(args, check=False)
if result.returncode != 0:
raise RuntimeError(f"FFmpeg error in concatenation: {result.stderr}")
return output_path
def _cleanup(self):
"""Remove temporary files and directory."""
for path in self.temp_files:
try:
if path.exists():
path.unlink()
except Exception:
pass
try:
if self.temp_dir.exists():
self.temp_dir.rmdir()
except Exception:
pass
def generate(self) -> Path:
"""Generate the complete quiz video."""
try:
scene_files = []
for i, question in enumerate(self.questions, 1):
if self.continue_audio:
# Create combined scene with continuous audio
combined_scene = self._create_combined_scene(question, i)
scene_files.append(combined_scene)
else:
# Question scene
q_scene = self._create_question_scene(question, i)
scene_files.append(q_scene)
# Answer scene
a_scene = self._create_answer_scene(question)
scene_files.append(a_scene)
# Final screen for full video mode
if self.mode == VideoMode.FULL:
final = self._create_final_screen()
scene_files.append(final)
# Concatenate all scenes
output_path = self._concatenate_scenes(scene_files)
return output_path
finally:
self._cleanup()
def check_ffmpeg() -> bool:
"""Check if FFmpeg is available."""
try:
result = subprocess.run(
["ffmpeg", "-version"],
capture_output=True,
text=True,
timeout=5,
)
return result.returncode == 0
except Exception:
return False