Files
tg_bot_language/services/task_service.py
mamonov.ep 69c651c031 fix: передача user_id во все вызовы AI сервиса
Исправлено: при выполнении задач использовалась глобальная модель
вместо привязанной к пользователю.

Обновлены все handlers и services для передачи user_id в AI методы.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-08 16:56:31 +03:00

563 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import random
from datetime import datetime
from typing import List, Dict, Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from database.models import Task, Vocabulary, WordTranslation
from services.ai_service import ai_service
class TaskService:
"""Сервис для работы с заданиями"""
@staticmethod
async def generate_translation_tasks(
session: AsyncSession,
user_id: int,
count: int = 5,
learning_lang: str = 'en'
) -> List[Dict]:
"""
Генерация заданий на перевод слов
Args:
session: Сессия базы данных
user_id: ID пользователя
count: Количество заданий
Returns:
Список заданий
"""
# Получаем слова пользователя на изучаемом языке
result = await session.execute(
select(Vocabulary)
.where(Vocabulary.user_id == user_id)
.where(Vocabulary.source_lang == learning_lang)
.order_by(Vocabulary.last_reviewed.asc().nullsfirst())
.limit(count * 2) # Берем больше, чтобы было из чего выбрать
)
words = list(result.scalars().all())
if not words:
return []
# Выбираем случайные слова
selected_words = random.sample(words, min(count, len(words)))
tasks = []
for word in selected_words:
# Случайно выбираем направление перевода
direction = random.choice(['en_to_ru', 'ru_to_en'])
if direction == 'en_to_ru':
task = {
'type': 'translate_to_ru',
'word_id': word.id,
'question': f"Переведи слово: <b>{word.word_original}</b>",
'word': word.word_original,
'correct_answer': word.word_translation,
'transcription': word.transcription
}
else:
task = {
'type': 'translate_to_en',
'word_id': word.id,
'question': f"Переведи слово: <b>{word.word_translation}</b>",
'word': word.word_translation,
'correct_answer': word.word_original,
'transcription': word.transcription
}
tasks.append(task)
return tasks
@staticmethod
async def generate_mixed_tasks(
session: AsyncSession,
user_id: int,
count: int = 5,
learning_lang: str = 'en',
translation_lang: str = 'ru'
) -> List[Dict]:
"""
Генерация заданий разных типов (переводы + заполнение пропусков)
Args:
session: Сессия базы данных
user_id: ID пользователя
count: Количество заданий
Returns:
Список заданий разных типов
"""
# Получаем слова пользователя на изучаемом языке
result = await session.execute(
select(Vocabulary)
.where(Vocabulary.user_id == user_id)
.where(Vocabulary.source_lang == learning_lang)
.order_by(Vocabulary.last_reviewed.asc().nullsfirst())
.limit(count * 2)
)
words = list(result.scalars().all())
if not words:
return []
# Выбираем случайные слова
selected_words = random.sample(words, min(count, len(words)))
tasks = []
for word in selected_words:
# Получаем переводы из таблицы WordTranslation
translations_result = await session.execute(
select(WordTranslation)
.where(WordTranslation.vocabulary_id == word.id)
.order_by(WordTranslation.is_primary.desc())
)
translations = list(translations_result.scalars().all())
# Случайно выбираем тип задания
# Если есть переводы с контекстом, добавляем тип 'context_translate'
task_types = ['translate', 'fill_in']
if translations and any(tr.context for tr in translations):
task_types.append('context_translate')
task_type = random.choice(task_types)
if task_type == 'context_translate' and translations:
# Задание на перевод по контексту
# Выбираем случайный перевод с контекстом
translations_with_context = [tr for tr in translations if tr.context]
if translations_with_context:
selected_tr = random.choice(translations_with_context)
# Локализация фразы
if translation_lang == 'en':
prompt = "Translate the highlighted word in context:"
elif translation_lang == 'ja':
prompt = "文脈に合った翻訳を入力してください:"
else:
prompt = "Переведи выделенное слово в контексте:"
task = {
'type': 'context_translate',
'word_id': word.id,
'translation_id': selected_tr.id,
'question': (
f"{prompt}\n\n"
f"<i>«{selected_tr.context}»</i>\n\n"
f"<b>{word.word_original}</b> = ?"
),
'word': word.word_original,
'correct_answer': selected_tr.translation,
'transcription': word.transcription,
'context': selected_tr.context,
'context_translation': selected_tr.context_translation
}
tasks.append(task)
continue
if task_type == 'translate':
# Задание на перевод между языком обучения и языком перевода
direction = random.choice(['learn_to_tr', 'tr_to_learn'])
# Локализация фразы "Переведи слово"
if translation_lang == 'en':
prompt = "Translate the word:"
elif translation_lang == 'ja':
prompt = "単語を訳してください:"
else:
prompt = "Переведи слово:"
# Определяем правильный ответ - берём из таблицы переводов если есть
correct_translation = word.word_translation
if translations:
# Берём основной перевод или первый
primary = next((tr for tr in translations if tr.is_primary), translations[0] if translations else None)
if primary:
correct_translation = primary.translation
if direction == 'learn_to_tr':
task = {
'type': f'translate_to_{translation_lang}',
'word_id': word.id,
'question': f"{prompt} <b>{word.word_original}</b>",
'word': word.word_original,
'correct_answer': correct_translation,
'transcription': word.transcription,
# Все допустимые ответы для проверки
'all_translations': [tr.translation for tr in translations] if translations else [correct_translation]
}
else:
task = {
'type': f'translate_to_{learning_lang}',
'word_id': word.id,
'question': f"{prompt} <b>{correct_translation}</b>",
'word': correct_translation,
'correct_answer': word.word_original,
'transcription': word.transcription
}
else:
# Задание на заполнение пропуска
# Генерируем предложение с пропуском через AI
sentence_data = await ai_service.generate_fill_in_sentence(
word.word_original,
learning_lang=learning_lang,
translation_lang=translation_lang,
user_id=user_id
)
# Локализация заголовка
if translation_lang == 'en':
fill_title = "Fill in the blank in the sentence:"
elif translation_lang == 'ja':
fill_title = "文の空欄を埋めてください:"
else:
fill_title = "Заполни пропуск в предложении:"
task = {
'type': 'fill_in',
'word_id': word.id,
'question': (
f"{fill_title}\n\n"
f"<b>{sentence_data['sentence']}</b>\n\n"
f"<i>{sentence_data.get('translation', '')}</i>"
),
'word': word.word_original,
'correct_answer': sentence_data['answer'],
'sentence': sentence_data['sentence']
}
tasks.append(task)
return tasks
@staticmethod
async def generate_tasks_by_type(
session: AsyncSession,
user_id: int,
count: int = 5,
task_type: str = 'mix',
learning_lang: str = 'en',
translation_lang: str = 'ru'
) -> List[Dict]:
"""
Генерация заданий определённого типа (оптимизировано - 1 запрос к AI)
Args:
session: Сессия базы данных
user_id: ID пользователя
count: Количество заданий
task_type: Тип заданий (mix, word_translate, fill_blank, sentence_translate)
learning_lang: Язык обучения
translation_lang: Язык перевода
Returns:
Список заданий
"""
# Получаем слова пользователя на изучаемом языке
result = await session.execute(
select(Vocabulary)
.where(Vocabulary.user_id == user_id)
.where(Vocabulary.source_lang == learning_lang)
.order_by(Vocabulary.last_reviewed.asc().nullsfirst())
.limit(count * 2)
)
words = list(result.scalars().all())
if not words:
return []
# Выбираем случайные слова
selected_words = random.sample(words, min(count, len(words)))
# 1. Подготовка: определяем типы и собираем данные для всех слов
word_data_list = []
for word in selected_words:
# Получаем переводы
translations_result = await session.execute(
select(WordTranslation)
.where(WordTranslation.vocabulary_id == word.id)
.order_by(WordTranslation.is_primary.desc())
)
translations = list(translations_result.scalars().all())
# Определяем тип задания
if task_type == 'mix':
chosen_type = random.choice(['word_translate', 'fill_blank', 'sentence_translate'])
else:
chosen_type = task_type
# Определяем перевод
correct_translation = word.word_translation
if translations:
primary = next((tr for tr in translations if tr.is_primary), translations[0] if translations else None)
if primary:
correct_translation = primary.translation
word_data_list.append({
'word': word,
'translations': translations,
'correct_translation': correct_translation,
'chosen_type': chosen_type
})
# 2. Собираем задания, требующие AI
ai_tasks = []
ai_task_indices = []
for i, wd in enumerate(word_data_list):
if wd['chosen_type'] in ('fill_blank', 'sentence_translate'):
ai_tasks.append({
'word': wd['word'].word_original,
'task_type': wd['chosen_type']
})
ai_task_indices.append(i)
# 3. Один запрос к AI
ai_results = []
if ai_tasks:
ai_results = await ai_service.generate_task_sentences_batch(
ai_tasks,
learning_lang=learning_lang,
translation_lang=translation_lang,
user_id=user_id
)
# Маппинг результатов
ai_results_map = {}
for idx, result in zip(ai_task_indices, ai_results):
ai_results_map[idx] = result
# 4. Собираем финальные задания
tasks = []
for i, wd in enumerate(word_data_list):
word = wd['word']
translations = wd['translations']
correct_translation = wd['correct_translation']
chosen_type = wd['chosen_type']
if chosen_type == 'word_translate':
direction = random.choice(['learn_to_tr', 'tr_to_learn'])
if translation_lang == 'en':
prompt = "Translate the word:"
elif translation_lang == 'ja':
prompt = "単語を訳してください:"
else:
prompt = "Переведи слово:"
if direction == 'learn_to_tr':
task = {
'type': f'translate_to_{translation_lang}',
'word_id': word.id,
'question': f"{prompt} <b>{word.word_original}</b>",
'word': word.word_original,
'correct_answer': correct_translation,
'transcription': word.transcription,
'all_translations': [tr.translation for tr in translations] if translations else [correct_translation]
}
else:
task = {
'type': f'translate_to_{learning_lang}',
'word_id': word.id,
'question': f"{prompt} <b>{correct_translation}</b>",
'word': correct_translation,
'correct_answer': word.word_original,
'transcription': word.transcription
}
elif chosen_type == 'fill_blank':
sentence_data = ai_results_map.get(i, {})
if translation_lang == 'en':
fill_title = "Fill in the blank:"
elif translation_lang == 'ja':
fill_title = "空欄を埋めてください:"
else:
fill_title = "Заполни пропуск:"
task = {
'type': 'fill_in',
'word_id': word.id,
'question': (
f"{fill_title}\n\n"
f"<b>{sentence_data.get('sentence', '___')}</b>\n\n"
f"<i>{sentence_data.get('translation', '')}</i>"
),
'word': word.word_original,
'correct_answer': sentence_data.get('answer', word.word_original),
'sentence': sentence_data.get('sentence', '___')
}
elif chosen_type == 'sentence_translate':
sentence_data = ai_results_map.get(i, {})
if translation_lang == 'en':
sentence_title = "Translate the sentence:"
word_hint = "Word"
elif translation_lang == 'ja':
sentence_title = "文を翻訳してください:"
word_hint = "単語"
else:
sentence_title = "Переведи предложение:"
word_hint = "Слово"
task = {
'type': 'sentence_translate',
'word_id': word.id,
'question': f"{sentence_title}\n\n<b>{sentence_data.get('sentence', word.word_original)}</b>\n\n📝 {word_hint}: <code>{word.word_original}</code> — {correct_translation}",
'word': word.word_original,
'correct_answer': sentence_data.get('translation', correct_translation),
'sentence': sentence_data.get('sentence', word.word_original)
}
tasks.append(task)
return tasks
@staticmethod
async def save_task_result(
session: AsyncSession,
user_id: int,
task_type: str,
content: Dict,
user_answer: str,
correct_answer: str,
is_correct: bool,
ai_feedback: Optional[str] = None
) -> Task:
"""
Сохранение результата выполнения задания
Args:
session: Сессия базы данных
user_id: ID пользователя
task_type: Тип задания
content: Содержимое задания
user_answer: Ответ пользователя
correct_answer: Правильный ответ
is_correct: Правильность ответа
ai_feedback: Обратная связь от AI
Returns:
Сохраненное задание
"""
task = Task(
user_id=user_id,
task_type=task_type,
content=content,
user_answer=user_answer,
correct_answer=correct_answer,
is_correct=is_correct,
ai_feedback=ai_feedback,
completed_at=datetime.utcnow()
)
session.add(task)
await session.commit()
await session.refresh(task)
return task
@staticmethod
async def update_word_statistics(
session: AsyncSession,
word_id: int,
is_correct: bool
):
"""
Обновление статистики слова
Args:
session: Сессия базы данных
word_id: ID слова
is_correct: Правильность ответа
"""
result = await session.execute(
select(Vocabulary).where(Vocabulary.id == word_id)
)
word = result.scalar_one_or_none()
if word:
word.times_reviewed += 1
if is_correct:
word.correct_answers += 1
word.last_reviewed = datetime.utcnow()
await session.commit()
@staticmethod
async def get_user_stats(session: AsyncSession, user_id: int) -> Dict:
"""
Получение статистики пользователя
Args:
session: Сессия базы данных
user_id: ID пользователя
Returns:
Статистика пользователя
"""
# Количество слов
words_result = await session.execute(
select(Vocabulary).where(Vocabulary.user_id == user_id)
)
words = list(words_result.scalars().all())
total_words = len(words)
# Количество выполненных заданий
tasks_result = await session.execute(
select(Task).where(Task.user_id == user_id)
)
tasks = list(tasks_result.scalars().all())
total_tasks = len(tasks)
# Правильные ответы
correct_tasks = len([t for t in tasks if t.is_correct])
accuracy = int((correct_tasks / total_tasks * 100)) if total_tasks > 0 else 0
# Слова с повторениями
reviewed_words = len([w for w in words if w.times_reviewed > 0])
return {
'total_words': total_words,
'reviewed_words': reviewed_words,
'total_tasks': total_tasks,
'correct_tasks': correct_tasks,
'accuracy': accuracy
}
@staticmethod
async def get_correctly_answered_words(
session: AsyncSession,
user_id: int
) -> List[str]:
"""
Получить список слов, на которые пользователь правильно ответил в заданиях
Args:
session: Сессия базы данных
user_id: ID пользователя
Returns:
Список слов (строки) с правильными ответами
"""
result = await session.execute(
select(Task)
.where(Task.user_id == user_id)
.where(Task.is_correct == True)
)
tasks = list(result.scalars().all())
words = []
for task in tasks:
if task.content and isinstance(task.content, dict):
word = task.content.get('word')
if word:
words.append(word.lower())
return list(set(words))