""" Dispute Scheduler - marks disputes as pending admin review after 24 hours. """ import asyncio from datetime import datetime, timedelta from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.models import Dispute, DisputeStatus, Assignment, AssignmentStatus from app.services.telegram_notifier import telegram_notifier # Configuration CHECK_INTERVAL_SECONDS = 300 # Check every 5 minutes DISPUTE_WINDOW_HOURS = 24 # Disputes need admin decision after 24 hours class DisputeScheduler: """Background scheduler that marks expired disputes for admin review.""" def __init__(self): self._running = False self._task: asyncio.Task | None = None async def start(self, session_factory) -> None: """Start the scheduler background task.""" if self._running: return self._running = True self._task = asyncio.create_task(self._run_loop(session_factory)) print("[DisputeScheduler] Started") async def stop(self) -> None: """Stop the scheduler.""" self._running = False if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass print("[DisputeScheduler] Stopped") async def _run_loop(self, session_factory) -> None: """Main scheduler loop.""" while self._running: try: async with session_factory() as db: await self._process_expired_disputes(db) except Exception as e: print(f"[DisputeScheduler] Error in loop: {e}") await asyncio.sleep(CHECK_INTERVAL_SECONDS) async def _process_expired_disputes(self, db: AsyncSession) -> None: """Mark expired disputes as pending admin review.""" cutoff_time = datetime.utcnow() - timedelta(hours=DISPUTE_WINDOW_HOURS) # Find all open disputes that have expired result = await db.execute( select(Dispute) .options( selectinload(Dispute.votes), ) .where( Dispute.status == DisputeStatus.OPEN.value, Dispute.created_at < cutoff_time, ) ) expired_disputes = result.scalars().all() for dispute in expired_disputes: try: # Count votes for logging votes_valid = sum(1 for v in dispute.votes if v.vote is True) votes_invalid = sum(1 for v in dispute.votes if v.vote is False) # Mark as pending admin decision dispute.status = DisputeStatus.PENDING_ADMIN.value print( f"[DisputeScheduler] Dispute {dispute.id} marked as pending admin " f"(recommendation: {'invalid' if votes_invalid > votes_valid else 'valid'}, " f"votes: {votes_valid} valid, {votes_invalid} invalid)" ) except Exception as e: print(f"[DisputeScheduler] Failed to process dispute {dispute.id}: {e}") if expired_disputes: await db.commit() # Notify admins about pending disputes await telegram_notifier.notify_admin_disputes_pending(db, len(expired_disputes)) # Global scheduler instance dispute_scheduler = DisputeScheduler()