Files
enigFM/backend/app/services/ping_task.py

89 lines
3.8 KiB
Python
Raw Normal View History

import asyncio
import time
from uuid import UUID
from datetime import datetime
from sqlalchemy import select, delete
from app.database import async_session
from app.models.room import Room, RoomParticipant
from app.services.sync import manager
async def ping_users_task():
"""Фоновая задача для периодического пинга пользователей и отключения неактивных"""
while True:
try:
await asyncio.sleep(manager.ping_interval)
current_time = time.time()
disconnected_users = []
# Проходим по всем активным соединениям
for room_id, connections in list(manager.get_all_connections().items()):
for websocket, user_id in list(connections):
# Отправляем ping
try:
await websocket.send_json({"type": "ping"})
except Exception:
# Если не смогли отправить - помечаем на отключение
disconnected_users.append((websocket, room_id, user_id))
continue
# Проверяем время последнего pong
last_pong_time = manager.last_pong.get((room_id, user_id))
if last_pong_time is None or (current_time - last_pong_time) > manager.ping_timeout:
# Пользователь не отвечает слишком долго
disconnected_users.append((websocket, room_id, user_id))
# Отключаем неактивных пользователей
for websocket, room_id, user_id in disconnected_users:
await disconnect_inactive_user(websocket, room_id, user_id)
except Exception as e:
print(f"Error in ping_users_task: {e}")
await asyncio.sleep(5) # Подождём немного при ошибке
async def disconnect_inactive_user(websocket, room_id: UUID, user_id: UUID):
"""Отключить неактивного пользователя"""
try:
# Закрываем WebSocket соединение
await websocket.close(code=1000, reason="Ping timeout")
except Exception:
pass
# Удаляем из менеджера соединений
manager.disconnect(websocket, room_id, user_id)
# Удаляем из БД
async with async_session() as db:
await db.execute(
delete(RoomParticipant).where(
RoomParticipant.room_id == room_id,
RoomParticipant.user_id == user_id
)
)
await db.commit()
# Проверяем, остались ли участники
room_user_count = manager.get_room_user_count(room_id)
# Если комната пустая - ставим трек на паузу
if room_user_count == 0:
result = await db.execute(select(Room).where(Room.id == room_id))
room = result.scalar_one_or_none()
if room and room.is_playing:
# Сохраняем текущую позицию
if room.playback_started_at:
elapsed = (datetime.utcnow() - room.playback_started_at).total_seconds() * 1000
room.playback_position = int((room.playback_position or 0) + elapsed)
room.is_playing = False
room.playback_started_at = None
await db.commit()
# Уведомляем остальных участников
await manager.broadcast_to_room(
room_id,
{"type": "user_left", "user_id": str(user_id), "reason": "timeout"},
)