# anime-qize/backend/app/video_generator.py
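"""FFmpeg-based generation of anime quiz videos: question, answer reveal, and CTA scenes."""
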
import uuid
import subprocess
import tempfile
from pathlib import Path
from typing import Optional
from .config import settings
from .storage import storage
from .models import VideoMode, QuizItem, GenerateRequest


class VideoGenerator:
    """FFmpeg-based video generator for anime quiz videos."""

    FONT_PATH = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"

    def __init__(self, request: GenerateRequest):
        self.request = request
        self.mode = request.mode
        self.questions = request.questions
        self.audio_duration = request.audio_duration
        self.continue_audio = request.continue_audio

        if self.mode == VideoMode.SHORTS:
            self.width = settings.shorts_width
            self.height = settings.shorts_height
        else:
            self.width = settings.full_width
            self.height = settings.full_height

        self.fps = 30
        self.temp_dir = Path(tempfile.mkdtemp(prefix="quiz_"))
        self.temp_files: list[Path] = []

    def _run_ffmpeg(self, args: list[str], check: bool = True) -> subprocess.CompletedProcess:
        """Run FFmpeg command with given arguments."""
        cmd = ["ffmpeg", "-y", "-hide_banner", "-loglevel", "error"] + args
        return subprocess.run(cmd, capture_output=True, text=True, check=check)
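    # Note: the wrapper always prepends "-y -hide_banner -loglevel error"; call sites below
    # pass check=False and inspect returncode themselves so FFmpeg's stderr can be wrapped
    # in a RuntimeError instead of surfacing as CalledProcessError.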

    def _get_temp_path(self, suffix: str = ".mp4") -> Path:
        """Generate a temporary file path."""
        path = self.temp_dir / f"temp_{uuid.uuid4().hex[:8]}{suffix}"
        self.temp_files.append(path)
        return path

    def _escape_text(self, text: str) -> str:
        """Escape special characters for FFmpeg drawtext filter."""
        text = text.replace("\\", "\\\\")
        text = text.replace("'", "'\\''")
        text = text.replace(":", "\\:")
        text = text.replace("%", "\\%")
        return text
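    # Why these characters: in a drawtext option string "\" is the escape character,
    # ":" separates filter options, "%" introduces text expansion (e.g. %{eif:...}),
    # and a bare "'" would terminate the quoted text value, so each is escaped above.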

    def _get_background_path(self) -> Path:
        """Get background video path from S3 or generate solid color fallback."""
        if self.request.background_video:
            bg_path = storage.get_background_file(self.request.background_video)
            if bg_path and bg_path.exists():
                return bg_path

        # Get first available background
        backgrounds = storage.list_background_videos()
        if backgrounds:
            bg_path = storage.get_background_file(backgrounds[0])
            if bg_path and bg_path.exists():
                return bg_path

        # Create a solid color fallback background
        return self._create_solid_background()

    def _create_solid_background(self) -> Path:
        """Create a solid color background video as fallback."""
        output_path = self._get_temp_path(suffix="_bg.mp4")

        # Create a 10 second clip of a dark solid background color
        args = [
            "-f", "lavfi",
            "-i", f"color=c=0x1a1a2e:s={self.width}x{self.height}:d=10:r={self.fps}",
            "-c:v", "libx264",
            "-preset", "ultrafast",
            "-crf", "23",
            str(output_path)
        ]
        result = self._run_ffmpeg(args, check=False)
        if result.returncode != 0:
            raise RuntimeError(f"FFmpeg error creating solid background: {result.stderr}")
        return output_path
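    # Equivalent command line (sizes/fps filled in from the instance), assuming a stock
    # ffmpeg build with the lavfi "color" source:
    #   ffmpeg -f lavfi -i "color=c=0x1a1a2e:s=<W>x<H>:d=10:r=<fps>" \
    #          -c:v libx264 -preset ultrafast -crf 23 out.mp4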

    def _get_difficulty_color(self, difficulty: str) -> str:
        """Get color for difficulty badge."""
        colors = {
            "easy": "green",
            "medium": "orange",
            "hard": "red"
        }
        return colors.get(difficulty.lower(), "white")

    def _create_question_scene(self, question: QuizItem, question_num: int) -> Path:
        """Create the question scene with audio and countdown."""
        output_path = self._get_temp_path()
        scene_duration = self.audio_duration + settings.audio_buffer
        bg_path = self._get_background_path()
        audio_path = storage.get_audio_file(question.opening_file)
        if not audio_path:
            raise RuntimeError(f"Audio file not found: {question.opening_file}")

        # Font sizes based on mode
        title_fontsize = 72 if self.mode == VideoMode.SHORTS else 56
        diff_fontsize = 56 if self.mode == VideoMode.SHORTS else 42
        countdown_fontsize = 120 if self.mode == VideoMode.SHORTS else 80

        # Escape texts
        question_text = self._escape_text(f"#{question_num}")
        subtitle_text = self._escape_text("Guess the Anime Opening")
        difficulty_text = self._escape_text(question.difficulty.upper())
        diff_color = self._get_difficulty_color(question.difficulty)

        # Calculate positions
        title_y = int(self.height * 0.12)
        subtitle_y = int(self.height * 0.20)
        diff_y = int(self.height * 0.35)
        countdown_y = int(self.height * 0.70)

        # Build video filter
        video_filter = f"""
        [0:v]scale={self.width}:{self.height}:force_original_aspect_ratio=increase,
        crop={self.width}:{self.height},
        setsar=1,
        fps={self.fps}[bg];
        [bg]drawtext=fontfile={self.FONT_PATH}:text='{question_text}':fontsize={title_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={title_y},
        drawtext=fontfile={self.FONT_PATH}:text='{subtitle_text}':fontsize={title_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={subtitle_y},
        drawtext=fontfile={self.FONT_PATH}:text='{difficulty_text}':fontsize={diff_fontsize}:fontcolor={diff_color}:borderw=2:bordercolor=black:x=(w-tw)/2:y={diff_y},
        drawtext=fontfile={self.FONT_PATH}:text='%{{eif\\:{int(self.audio_duration)}-floor(t)\\:d}}':fontsize={countdown_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={countdown_y}:enable='lt(t,{int(self.audio_duration)})'[v]
        """.replace("\n", "").strip()

        # Build audio filter with fade in and optional fade out
        audio_fade_out_start = scene_duration - settings.audio_fade_duration
        if self.continue_audio:
            audio_filter = f"[1:a]afade=t=in:d={settings.audio_fade_duration}[a]"
        else:
            audio_filter = f"[1:a]afade=t=in:d={settings.audio_fade_duration},afade=t=out:st={audio_fade_out_start}:d={settings.audio_fade_duration}[a]"

        # Build FFmpeg command
        args = [
            "-stream_loop", "-1",
            "-i", str(bg_path),
            "-ss", str(question.start_time),
            "-t", str(scene_duration),
            "-i", str(audio_path),
            "-filter_complex", f"{video_filter};{audio_filter}",
            "-map", "[v]",
            "-map", "[a]",
            "-t", str(scene_duration),
            "-c:v", "libx264",
            "-preset", "medium",
            "-crf", "23",
            "-c:a", "aac",
            "-b:a", "192k",
            str(output_path)
        ]
        result = self._run_ffmpeg(args, check=False)
        if result.returncode != 0:
            raise RuntimeError(f"FFmpeg error in question scene: {result.stderr}")
        return output_path
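    # Timing note: the question scene runs for audio_duration + audio_buffer seconds; the
    # on-screen countdown is a drawtext expansion (%{eif:N-floor(t):d}) that prints the
    # whole seconds remaining and is hidden once t reaches audio_duration.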

    def _get_transition_sound_path(self) -> Optional[Path]:
        """Get transition sound path from S3."""
        if self.request.transition_sound:
            return storage.get_transition_file(self.request.transition_sound)
        return None

    def _create_answer_scene(self, question: QuizItem) -> Path:
        """Create the answer reveal scene with continuing audio."""
        output_path = self._get_temp_path()
        duration = settings.answer_duration
        bg_path = self._get_background_path()
        audio_path = storage.get_audio_file(question.opening_file)
        transition_path = self._get_transition_sound_path()
        if not audio_path:
            raise RuntimeError(f"Audio file not found: {question.opening_file}")

        # Calculate audio start position based on continue_audio setting
        if self.continue_audio:
            question_scene_duration = self.audio_duration + settings.audio_buffer
            audio_start = question.start_time + question_scene_duration
        else:
            audio_start = question.start_time
        audio_fade_out_start = duration - settings.audio_fade_duration

        # Font sizes based on mode
        answer_fontsize = 64 if self.mode == VideoMode.SHORTS else 48
        label_fontsize = 48 if self.mode == VideoMode.SHORTS else 36

        # Escape texts
        label_text = self._escape_text("Anime:")
        anime_text = self._escape_text(question.anime)

        # Calculate positions
        label_y = int(self.height * 0.25)
        anime_y = int(self.height * 0.32)

        # Check for poster from S3
        poster_path = None
        if question.poster:
            poster_path = storage.get_poster_file(question.poster)

        # Build audio filter - no fade in if continuing from question scene
        if self.continue_audio:
            base_audio_filter = f"[1:a]afade=t=out:st={audio_fade_out_start}:d={settings.audio_fade_duration}"
        else:
            base_audio_filter = f"[1:a]afade=t=in:d={settings.audio_fade_duration},afade=t=out:st={audio_fade_out_start}:d={settings.audio_fade_duration}"

        # Build inputs and audio filter based on whether we have transition sound
        if poster_path and poster_path.exists():  # guard against missing poster file, as in _create_combined_scene
            if transition_path:
                audio_filter = f"{base_audio_filter}[music];[2:a]anull[sfx];[music][sfx]amix=inputs=2:duration=longest[a]"
                inputs = [
                    "-loop", "1", "-i", str(poster_path),
                    "-ss", str(audio_start), "-t", str(duration), "-i", str(audio_path),
                    "-i", str(transition_path),
                ]
            else:
                audio_filter = f"{base_audio_filter}[a]"
                inputs = [
                    "-loop", "1", "-i", str(poster_path),
                    "-ss", str(audio_start), "-t", str(duration), "-i", str(audio_path),
                ]
            video_filter = f"""
            [0:v]scale={self.width}:{self.height}:force_original_aspect_ratio=increase,
            crop={self.width}:{self.height},
            setsar=1,
            fps={self.fps},
            drawtext=fontfile={self.FONT_PATH}:text='{label_text}':fontsize={label_fontsize}:fontcolor=cyan:borderw=2:bordercolor=black:x=(w-tw)/2:y={label_y},
            drawtext=fontfile={self.FONT_PATH}:text='{anime_text}':fontsize={answer_fontsize}:fontcolor=cyan:borderw=3:bordercolor=black:x=(w-tw)/2:y={anime_y},
            fade=t=in:d=0.3[v];
            {audio_filter}
            """.replace("\n", "").strip()
        else:
            if transition_path:
                audio_filter = f"{base_audio_filter}[music];[2:a]anull[sfx];[music][sfx]amix=inputs=2:duration=longest[a]"
                inputs = [
                    "-stream_loop", "-1", "-i", str(bg_path),
                    "-ss", str(audio_start), "-t", str(duration), "-i", str(audio_path),
                    "-i", str(transition_path),
                ]
            else:
                audio_filter = f"{base_audio_filter}[a]"
                inputs = [
                    "-stream_loop", "-1", "-i", str(bg_path),
                    "-ss", str(audio_start), "-t", str(duration), "-i", str(audio_path),
                ]
            video_filter = f"""
            [0:v]scale={self.width}:{self.height}:force_original_aspect_ratio=increase,
            crop={self.width}:{self.height},
            setsar=1,
            fps={self.fps},
            drawtext=fontfile={self.FONT_PATH}:text='{label_text}':fontsize={label_fontsize}:fontcolor=cyan:borderw=2:bordercolor=black:x=(w-tw)/2:y={label_y},
            drawtext=fontfile={self.FONT_PATH}:text='{anime_text}':fontsize={answer_fontsize}:fontcolor=cyan:borderw=3:bordercolor=black:x=(w-tw)/2:y={anime_y},
            fade=t=in:d=0.3[v];
            {audio_filter}
            """.replace("\n", "").strip()

        args = inputs + [
            "-filter_complex", video_filter,
            "-map", "[v]",
            "-map", "[a]",
            "-t", str(duration),
            "-c:v", "libx264",
            "-preset", "medium",
            "-crf", "23",
            "-c:a", "aac",
            "-b:a", "192k",
            str(output_path)
        ]
        result = self._run_ffmpeg(args, check=False)
        if result.returncode != 0:
            raise RuntimeError(f"FFmpeg error in answer scene: {result.stderr}")
        return output_path
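    # When continue_audio is set, the answer scene seeks to
    # question.start_time + (audio_duration + audio_buffer), i.e. the opening resumes where
    # the question scene left off and only fades out at the end. If a transition sound is
    # available, it is mixed over the music with amix (inputs 1 = music, 2 = SFX).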

    def _create_combined_scene(self, question: QuizItem, question_num: int) -> Path:
        """Create combined question + answer scene with continuous audio."""
        output_path = self._get_temp_path()
        question_duration = self.audio_duration + settings.audio_buffer
        answer_duration = settings.answer_duration
        total_duration = question_duration + answer_duration
        bg_path = self._get_background_path()
        audio_path = storage.get_audio_file(question.opening_file)
        if not audio_path:
            raise RuntimeError(f"Audio file not found: {question.opening_file}")

        # Font sizes based on mode
        title_fontsize = 72 if self.mode == VideoMode.SHORTS else 56
        diff_fontsize = 56 if self.mode == VideoMode.SHORTS else 42
        countdown_fontsize = 120 if self.mode == VideoMode.SHORTS else 80
        answer_fontsize = 64 if self.mode == VideoMode.SHORTS else 48
        label_fontsize = 48 if self.mode == VideoMode.SHORTS else 36

        # Escape texts
        question_text = self._escape_text(f"#{question_num}")
        subtitle_text = self._escape_text("Guess the Anime Opening")
        difficulty_text = self._escape_text(question.difficulty.upper())
        diff_color = self._get_difficulty_color(question.difficulty)
        label_text = self._escape_text("Anime:")
        anime_text = self._escape_text(question.anime)

        # Calculate positions
        title_y = int(self.height * 0.12)
        subtitle_y = int(self.height * 0.20)
        diff_y = int(self.height * 0.35)
        countdown_y = int(self.height * 0.70)
        label_y = int(self.height * 0.25)
        anime_y = int(self.height * 0.32)

        # Check for poster
        poster_path = None
        if question.poster:
            poster_path = storage.get_poster_file(question.poster)

        # Audio filter - fade in at start, fade out at end
        audio_fade_out_start = total_duration - settings.audio_fade_duration
        audio_filter = f"[a_in]afade=t=in:d={settings.audio_fade_duration},afade=t=out:st={audio_fade_out_start}:d={settings.audio_fade_duration}[a]"

        if poster_path and poster_path.exists():
            # Build filter with poster for answer phase
            # Question phase: show background with countdown (0 to question_duration)
            # Answer phase: show poster with anime title (question_duration to total_duration)
            video_filter = f"""
            [0:v]scale={self.width}:{self.height}:force_original_aspect_ratio=increase,crop={self.width}:{self.height},setsar=1,fps={self.fps},
            drawtext=fontfile={self.FONT_PATH}:text='{question_text}':fontsize={title_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={title_y}:enable='lt(t,{question_duration})',
            drawtext=fontfile={self.FONT_PATH}:text='{subtitle_text}':fontsize={title_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={subtitle_y}:enable='lt(t,{question_duration})',
            drawtext=fontfile={self.FONT_PATH}:text='{difficulty_text}':fontsize={diff_fontsize}:fontcolor={diff_color}:borderw=2:bordercolor=black:x=(w-tw)/2:y={diff_y}:enable='lt(t,{question_duration})',
            drawtext=fontfile={self.FONT_PATH}:text='%{{eif\\:{int(self.audio_duration)}-floor(t)\\:d}}':fontsize={countdown_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={countdown_y}:enable='lt(t,{int(self.audio_duration)})'[bg_out];
            [2:v]scale={self.width}:{self.height}:force_original_aspect_ratio=increase,crop={self.width}:{self.height},setsar=1,fps={self.fps},
            drawtext=fontfile={self.FONT_PATH}:text='{label_text}':fontsize={label_fontsize}:fontcolor=cyan:borderw=2:bordercolor=black:x=(w-tw)/2:y={label_y},
            drawtext=fontfile={self.FONT_PATH}:text='{anime_text}':fontsize={answer_fontsize}:fontcolor=cyan:borderw=3:bordercolor=black:x=(w-tw)/2:y={anime_y}[poster_out];
            [bg_out][poster_out]overlay=enable='gte(t,{question_duration})':shortest=1[v];
            [1:a]anull[a_in];
            {audio_filter}
            """.replace("\n", "").strip()
            args = [
                "-stream_loop", "-1",
                "-i", str(bg_path),
                "-ss", str(question.start_time),
                "-t", str(total_duration),
                "-i", str(audio_path),
                "-loop", "1",
                "-i", str(poster_path),
                "-filter_complex", video_filter,
                "-map", "[v]",
                "-map", "[a]",
                "-t", str(total_duration),
                "-c:v", "libx264",
                "-preset", "medium",
                "-crf", "23",
                "-c:a", "aac",
                "-b:a", "192k",
                str(output_path)
            ]
        else:
            # No poster - just use background for both phases
            video_filter = f"""
            [0:v]scale={self.width}:{self.height}:force_original_aspect_ratio=increase,crop={self.width}:{self.height},setsar=1,fps={self.fps},
            drawtext=fontfile={self.FONT_PATH}:text='{question_text}':fontsize={title_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={title_y}:enable='lt(t,{question_duration})',
            drawtext=fontfile={self.FONT_PATH}:text='{subtitle_text}':fontsize={title_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={subtitle_y}:enable='lt(t,{question_duration})',
            drawtext=fontfile={self.FONT_PATH}:text='{difficulty_text}':fontsize={diff_fontsize}:fontcolor={diff_color}:borderw=2:bordercolor=black:x=(w-tw)/2:y={diff_y}:enable='lt(t,{question_duration})',
            drawtext=fontfile={self.FONT_PATH}:text='%{{eif\\:{int(self.audio_duration)}-floor(t)\\:d}}':fontsize={countdown_fontsize}:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={countdown_y}:enable='lt(t,{int(self.audio_duration)})',
            drawtext=fontfile={self.FONT_PATH}:text='{label_text}':fontsize={label_fontsize}:fontcolor=cyan:borderw=2:bordercolor=black:x=(w-tw)/2:y={label_y}:enable='gte(t,{question_duration})',
            drawtext=fontfile={self.FONT_PATH}:text='{anime_text}':fontsize={answer_fontsize}:fontcolor=cyan:borderw=3:bordercolor=black:x=(w-tw)/2:y={anime_y}:enable='gte(t,{question_duration})'[v];
            [1:a]anull[a_in];
            {audio_filter}
            """.replace("\n", "").strip()
            args = [
                "-stream_loop", "-1",
                "-i", str(bg_path),
                "-ss", str(question.start_time),
                "-t", str(total_duration),
                "-i", str(audio_path),
                "-filter_complex", video_filter,
                "-map", "[v]",
                "-map", "[a]",
                "-t", str(total_duration),
                "-c:v", "libx264",
                "-preset", "medium",
                "-crf", "23",
                "-c:a", "aac",
                "-b:a", "192k",
                str(output_path)
            ]

        result = self._run_ffmpeg(args, check=False)
        if result.returncode != 0:
            raise RuntimeError(f"FFmpeg error in combined scene: {result.stderr}")
        return output_path
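    # Phase switch: the poster overlay (or the answer drawtext, in the no-poster branch) is
    # enabled only for t >= question_duration, while the question overlays are enabled only
    # for t < question_duration, so one encode covers both phases over a single
    # uninterrupted audio stream.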

    def _create_final_screen(self) -> Path:
        """Create final CTA screen for full video mode."""
        output_path = self._get_temp_path()
        duration = settings.final_screen_duration
        bg_path = self._get_background_path()

        # Escape texts
        title_text = self._escape_text("How many did you guess?")
        cta_text = self._escape_text("Subscribe for more anime quizzes!")

        # Calculate positions
        title_y = int(self.height * 0.35)
        cta_y = int(self.height * 0.55)

        video_filter = f"""
        [0:v]scale={self.width}:{self.height}:force_original_aspect_ratio=increase,
        crop={self.width}:{self.height},
        setsar=1,
        fps={self.fps},
        drawtext=fontfile={self.FONT_PATH}:text='{title_text}':fontsize=56:fontcolor=yellow:borderw=3:bordercolor=black:x=(w-tw)/2:y={title_y},
        drawtext=fontfile={self.FONT_PATH}:text='{cta_text}':fontsize=40:fontcolor=white:borderw=2:bordercolor=black:x=(w-tw)/2:y={cta_y},
        fade=t=in:d=0.3,
        fade=t=out:st={duration - 0.5}:d=0.5[v]
        """.replace("\n", "").strip()

        args = [
            "-stream_loop", "-1",
            "-i", str(bg_path),
            "-filter_complex", video_filter,
            "-map", "[v]",
            "-t", str(duration),
            "-c:v", "libx264",
            "-preset", "medium",
            "-crf", "23",
            "-an",
            str(output_path)
        ]
        result = self._run_ffmpeg(args, check=False)
        if result.returncode != 0:
            raise RuntimeError(f"FFmpeg error in final screen: {result.stderr}")
        return output_path

    def _concatenate_scenes(self, scene_files: list[Path]) -> Path:
        """Concatenate all scenes into final video."""
        output_filename = f"quiz_{self.mode.value}_{uuid.uuid4().hex[:8]}.mp4"
        output_path = settings.output_path / output_filename

        # Create concat list file
        concat_file = self._get_temp_path(suffix=".txt")
        with open(concat_file, "w") as f:
            for scene in scene_files:
                f.write(f"file '{scene}'\n")

        # Re-encode for consistent output
        args = [
            "-f", "concat",
            "-safe", "0",
            "-i", str(concat_file),
            "-c:v", "libx264",
            "-preset", "medium",
            "-crf", "23",
            "-c:a", "aac",
            "-b:a", "192k",
            "-movflags", "+faststart",
            str(output_path)
        ]
        result = self._run_ffmpeg(args, check=False)
        if result.returncode != 0:
            raise RuntimeError(f"FFmpeg error in concatenation: {result.stderr}")
        return output_path
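    # The concat demuxer reads a list file of lines like "file '/path/to/scene.mp4'"
    # (paths above are the generated temp files). "-safe 0" is required because the listed
    # paths are absolute; re-encoding rather than "-c copy" keeps the final output uniform
    # even though every scene is already produced with the same codecs and resolution.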

    def _cleanup(self):
        """Remove temporary files and directory."""
        for path in self.temp_files:
            try:
                if path.exists():
                    path.unlink()
            except Exception:
                pass
        try:
            if self.temp_dir.exists():
                self.temp_dir.rmdir()
        except Exception:
            pass

    def generate(self) -> Path:
        """Generate the complete quiz video."""
        try:
            scene_files = []
            for i, question in enumerate(self.questions, 1):
                if self.continue_audio:
                    # Create combined scene with continuous audio
                    combined_scene = self._create_combined_scene(question, i)
                    scene_files.append(combined_scene)
                else:
                    # Question scene
                    q_scene = self._create_question_scene(question, i)
                    scene_files.append(q_scene)
                    # Answer scene
                    a_scene = self._create_answer_scene(question)
                    scene_files.append(a_scene)

            # Final screen for full video mode
            if self.mode == VideoMode.FULL:
                final = self._create_final_screen()
                scene_files.append(final)

            # Concatenate all scenes
            output_path = self._concatenate_scenes(scene_files)
            return output_path
        finally:
            self._cleanup()


def check_ffmpeg() -> bool:
    """Check if FFmpeg is available."""
    try:
        result = subprocess.run(
            ["ffmpeg", "-version"],
            capture_output=True,
            text=True,
            timeout=5,
        )
        return result.returncode == 0
    except Exception:
        return False
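

# A minimal usage sketch for local testing. The keyword fields on GenerateRequest and
# QuizItem below are assumptions inferred from how this module reads them (mode, questions,
# audio_duration, continue_audio, background_video, transition_sound / opening_file,
# start_time, difficulty, anime, poster); check app.models for the real schema before
# relying on this.
if __name__ == "__main__":
    if not check_ffmpeg():
        raise SystemExit("ffmpeg not found on PATH")

    # Hypothetical request: one shorts-format question, a 15 s guess window, and the
    # opening carried over into the answer reveal (continue_audio=True).
    sample_request = GenerateRequest(
        mode=VideoMode.SHORTS,
        audio_duration=15,
        continue_audio=True,
        background_video=None,
        transition_sound=None,
        questions=[
            QuizItem(
                opening_file="openings/example_op.mp3",  # hypothetical S3 key
                start_time=30,
                difficulty="medium",
                anime="Example Anime",
                poster=None,
            )
        ],
    )
    print(VideoGenerator(sample_request).generate())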