Files
tg_bot_language/services/task_service.py

288 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import random
from datetime import datetime
from typing import List, Dict, Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from database.models import Task, Vocabulary
from services.ai_service import ai_service
class TaskService:
"""Сервис для работы с заданиями"""
@staticmethod
async def generate_translation_tasks(
session: AsyncSession,
user_id: int,
count: int = 5
) -> List[Dict]:
"""
Генерация заданий на перевод слов
Args:
session: Сессия базы данных
user_id: ID пользователя
count: Количество заданий
Returns:
Список заданий
"""
# Получаем слова пользователя
result = await session.execute(
select(Vocabulary)
.where(Vocabulary.user_id == user_id)
.order_by(Vocabulary.last_reviewed.asc().nullsfirst())
.limit(count * 2) # Берем больше, чтобы было из чего выбрать
)
words = list(result.scalars().all())
if not words:
return []
# Выбираем случайные слова
selected_words = random.sample(words, min(count, len(words)))
tasks = []
for word in selected_words:
# Случайно выбираем направление перевода
direction = random.choice(['en_to_ru', 'ru_to_en'])
if direction == 'en_to_ru':
task = {
'type': 'translate_to_ru',
'word_id': word.id,
'question': f"Переведи слово: <b>{word.word_original}</b>",
'word': word.word_original,
'correct_answer': word.word_translation,
'transcription': word.transcription
}
else:
task = {
'type': 'translate_to_en',
'word_id': word.id,
'question': f"Переведи слово: <b>{word.word_translation}</b>",
'word': word.word_translation,
'correct_answer': word.word_original,
'transcription': word.transcription
}
tasks.append(task)
return tasks
@staticmethod
async def generate_mixed_tasks(
session: AsyncSession,
user_id: int,
count: int = 5,
learning_lang: str = 'en',
translation_lang: str = 'ru'
) -> List[Dict]:
"""
Генерация заданий разных типов (переводы + заполнение пропусков)
Args:
session: Сессия базы данных
user_id: ID пользователя
count: Количество заданий
Returns:
Список заданий разных типов
"""
# Получаем слова пользователя
result = await session.execute(
select(Vocabulary)
.where(Vocabulary.user_id == user_id)
.order_by(Vocabulary.last_reviewed.asc().nullsfirst())
.limit(count * 2)
)
words = list(result.scalars().all())
if not words:
return []
# Выбираем случайные слова
selected_words = random.sample(words, min(count, len(words)))
tasks = []
for word in selected_words:
# Случайно выбираем тип задания
task_type = random.choice(['translate', 'fill_in'])
if task_type == 'translate':
# Задание на перевод между языком обучения и языком перевода
direction = random.choice(['learn_to_tr', 'tr_to_learn'])
# Локализация фразы "Переведи слово"
if translation_lang == 'en':
prompt = "Translate the word:"
elif translation_lang == 'ja':
prompt = "単語を訳してください:"
else:
prompt = "Переведи слово:"
if direction == 'learn_to_tr':
task = {
'type': f'translate_to_{translation_lang}',
'word_id': word.id,
'question': f"{prompt} <b>{word.word_original}</b>",
'word': word.word_original,
'correct_answer': word.word_translation,
'transcription': word.transcription
}
else:
task = {
'type': f'translate_to_{learning_lang}',
'word_id': word.id,
'question': f"{prompt} <b>{word.word_translation}</b>",
'word': word.word_translation,
'correct_answer': word.word_original,
'transcription': word.transcription
}
else:
# Задание на заполнение пропуска
# Генерируем предложение с пропуском через AI
sentence_data = await ai_service.generate_fill_in_sentence(
word.word_original,
learning_lang=learning_lang,
translation_lang=translation_lang
)
# Локализация заголовка
if translation_lang == 'en':
fill_title = "Fill in the blank in the sentence:"
elif translation_lang == 'ja':
fill_title = "文の空欄を埋めてください:"
else:
fill_title = "Заполни пропуск в предложении:"
task = {
'type': 'fill_in',
'word_id': word.id,
'question': (
f"{fill_title}\n\n"
f"<b>{sentence_data['sentence']}</b>\n\n"
f"<i>{sentence_data.get('translation', '')}</i>"
),
'word': word.word_original,
'correct_answer': sentence_data['answer'],
'sentence': sentence_data['sentence']
}
tasks.append(task)
return tasks
@staticmethod
async def save_task_result(
session: AsyncSession,
user_id: int,
task_type: str,
content: Dict,
user_answer: str,
correct_answer: str,
is_correct: bool,
ai_feedback: Optional[str] = None
) -> Task:
"""
Сохранение результата выполнения задания
Args:
session: Сессия базы данных
user_id: ID пользователя
task_type: Тип задания
content: Содержимое задания
user_answer: Ответ пользователя
correct_answer: Правильный ответ
is_correct: Правильность ответа
ai_feedback: Обратная связь от AI
Returns:
Сохраненное задание
"""
task = Task(
user_id=user_id,
task_type=task_type,
content=content,
user_answer=user_answer,
correct_answer=correct_answer,
is_correct=is_correct,
ai_feedback=ai_feedback,
completed_at=datetime.utcnow()
)
session.add(task)
await session.commit()
await session.refresh(task)
return task
@staticmethod
async def update_word_statistics(
session: AsyncSession,
word_id: int,
is_correct: bool
):
"""
Обновление статистики слова
Args:
session: Сессия базы данных
word_id: ID слова
is_correct: Правильность ответа
"""
result = await session.execute(
select(Vocabulary).where(Vocabulary.id == word_id)
)
word = result.scalar_one_or_none()
if word:
word.times_reviewed += 1
if is_correct:
word.correct_answers += 1
word.last_reviewed = datetime.utcnow()
await session.commit()
@staticmethod
async def get_user_stats(session: AsyncSession, user_id: int) -> Dict:
"""
Получение статистики пользователя
Args:
session: Сессия базы данных
user_id: ID пользователя
Returns:
Статистика пользователя
"""
# Количество слов
words_result = await session.execute(
select(Vocabulary).where(Vocabulary.user_id == user_id)
)
words = list(words_result.scalars().all())
total_words = len(words)
# Количество выполненных заданий
tasks_result = await session.execute(
select(Task).where(Task.user_id == user_id)
)
tasks = list(tasks_result.scalars().all())
total_tasks = len(tasks)
# Правильные ответы
correct_tasks = len([t for t in tasks if t.is_correct])
accuracy = int((correct_tasks / total_tasks * 100)) if total_tasks > 0 else 0
# Слова с повторениями
reviewed_words = len([w for w in words if w.times_reviewed > 0])
return {
'total_words': total_words,
'reviewed_words': reviewed_words,
'total_tasks': total_tasks,
'correct_tasks': correct_tasks,
'accuracy': accuracy
}