Files
game-marathon/status-service/monitors.py
2025-12-20 02:28:41 +07:00

318 lines
12 KiB
Python

"""Service monitoring with persistence and alerting."""
import asyncio
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional

import httpx

from alerts import alert_service_down, alert_service_recovered
from database import (
    create_incident,
    get_avg_latency,
    get_latency_history,
    get_open_incident,
    get_uptime_stats,
    mark_incident_notified,
    resolve_incident,
    save_metric,
)
from ssl_monitor import SSLInfo, check_and_alert_ssl
class Status(str, Enum):
    """Health state of a monitored service.

    Members are also ``str`` instances, so they compare equal to their
    lowercase string value.
    """

    # The service answered its health probe successfully.
    OPERATIONAL = "operational"
    # The service answered, but not with the expected result.
    DEGRADED = "degraded"
    # The service could not be reached or is reported unhealthy.
    DOWN = "down"
    # No check has produced a verdict yet, or the check is not configured.
    UNKNOWN = "unknown"
@dataclass
class ServiceStatus:
    """Latest known state of a single monitored service.

    ``name`` is the internal key of the service and ``display_name`` its
    human-readable label. All remaining fields start from "nothing known
    yet" defaults and are filled in as checks are processed.
    """

    name: str
    display_name: str
    status: Status = Status.UNKNOWN
    latency_ms: Optional[float] = None
    last_check: Optional[datetime] = None
    last_incident: Optional[datetime] = None
    uptime_percent: float = 100.0
    message: Optional[str] = None
    version: Optional[str] = None
    avg_latency_24h: Optional[float] = None
    # ``None`` is only a placeholder default; __post_init__ replaces it with
    # a fresh list so instances never share one mutable default.
    latency_history: Optional[list] = None
    # For uptime calculation (in-memory, backed by DB)
    total_checks: int = 0
    successful_checks: int = 0

    def __post_init__(self):
        """Give every instance its own empty ``latency_history`` list."""
        if self.latency_history is None:
            self.latency_history = []

    def to_dict(self) -> dict:
        """Return a JSON-friendly snapshot of this service.

        Numeric values are rounded to two decimals and datetimes are
        rendered with ``isoformat()``. ``latency_history`` and the raw check
        counters are not part of the snapshot.
        """
        return {
            "name": self.name,
            "display_name": self.display_name,
            "status": self.status.value,
            # Compare against None explicitly: a latency of 0.0 is a real
            # measurement and must not be reported as "no data".
            "latency_ms": (
                round(self.latency_ms, 2) if self.latency_ms is not None else None
            ),
            "last_check": (
                self.last_check.isoformat() if self.last_check is not None else None
            ),
            "last_incident": (
                self.last_incident.isoformat()
                if self.last_incident is not None
                else None
            ),
            "uptime_percent": round(self.uptime_percent, 2),
            "message": self.message,
            "version": self.version,
            "avg_latency_24h": (
                round(self.avg_latency_24h, 2)
                if self.avg_latency_24h is not None
                else None
            ),
        }

    def update_uptime(self, is_success: bool):
        """Record one check outcome and recompute ``uptime_percent``.

        Args:
            is_success: Whether the check counts as successful.
        """
        self.total_checks += 1
        if is_success:
            self.successful_checks += 1
        # total_checks is at least 1 here, so the division is always safe.
        self.uptime_percent = (self.successful_checks / self.total_checks) * 100
class ServiceMonitor:
    """Probe the monitored services and keep their latest state in memory.

    A polling round (``check_all_services``) probes every service over HTTP,
    persists each outcome through the ``database`` helpers and opens or
    resolves incidents (optionally sending alerts) when a service switches
    between healthy and unhealthy.
    """

    def __init__(self):
        """Register the monitored services, all starting as ``UNKNOWN``."""
        display_names = {
            "backend": "Backend API",
            "database": "Database",
            "frontend": "Frontend",
            "bot": "Telegram Bot",
            "external": "External Access",
        }
        self.services: dict[str, ServiceStatus] = {
            name: ServiceStatus(name=name, display_name=label)
            for name, label in display_names.items()
        }
        self.last_check: Optional[datetime] = None
        self.ssl_info: Optional[SSLInfo] = None

    @staticmethod
    async def _timed_get(
        url: str,
        *,
        timeout: float,
        follow_redirects: bool = False,
    ) -> tuple[httpx.Response, float]:
        """Send one GET request and return ``(response, latency_ms)``.

        Latency is measured with ``time.perf_counter()`` rather than the
        wall clock, so system clock adjustments cannot distort it or make
        it negative. Exceptions raised by httpx propagate to the caller.
        """
        async with httpx.AsyncClient(
            timeout=timeout, follow_redirects=follow_redirects
        ) as client:
            started = time.perf_counter()
            response = await client.get(url)
            latency_ms = (time.perf_counter() - started) * 1000
        return response, latency_ms

    async def _check_url(
        self,
        url: str,
        *,
        timeout: float = 10.0,
        follow_redirects: bool = False,
    ) -> tuple[Status, Optional[float], Optional[str]]:
        """Shared probe used by the plain availability checks.

        Returns:
            ``(status, latency_ms, message)``: ``OPERATIONAL`` for HTTP 200,
            ``DEGRADED`` with ``"HTTP <code>"`` for any other response, and
            ``DOWN`` without latency when the request times out or fails.
        """
        try:
            response, latency_ms = await self._timed_get(
                url, timeout=timeout, follow_redirects=follow_redirects
            )
        except httpx.TimeoutException:
            return Status.DOWN, None, "Timeout"
        except Exception as exc:
            # Any other failure (connection refused, DNS, invalid URL, ...)
            # is reported as DOWN with a truncated error text.
            return Status.DOWN, None, str(exc)[:100]
        if response.status_code == 200:
            return Status.OPERATIONAL, latency_ms, None
        return Status.DEGRADED, latency_ms, f"HTTP {response.status_code}"

    @staticmethod
    def _extract_version(response: httpx.Response) -> Optional[str]:
        """Return the ``"version"`` entry of a JSON object body, if present.

        A body that is not valid JSON, or not a JSON object, yields ``None``
        so that a healthy HTTP 200 answer is not reported as an outage just
        because the version could not be read.
        """
        try:
            payload = response.json()
        except ValueError:
            return None
        if isinstance(payload, dict):
            return payload.get("version")
        return None

    async def check_backend(
        self, url: str
    ) -> tuple[Status, Optional[float], Optional[str], Optional[str]]:
        """Check backend API health.

        Probes ``{url}/health``.

        Returns:
            ``(status, latency_ms, message, version)``; ``version`` is only
            filled in for an HTTP 200 response whose JSON body provides it.
        """
        try:
            response, latency_ms = await self._timed_get(
                f"{url}/health", timeout=10.0
            )
        except httpx.TimeoutException:
            return Status.DOWN, None, "Timeout", None
        except Exception as exc:
            return Status.DOWN, None, str(exc)[:100], None
        if response.status_code != 200:
            return Status.DEGRADED, latency_ms, f"HTTP {response.status_code}", None
        return Status.OPERATIONAL, latency_ms, None, self._extract_version(response)

    async def check_database(
        self, backend_url: str
    ) -> tuple[Status, Optional[float], Optional[str]]:
        """Check database through backend.

        Probes the same ``{backend_url}/health`` endpoint as
        ``check_backend``; any answer other than HTTP 200, and any request
        failure, is reported as ``DOWN``.
        """
        try:
            response, latency_ms = await self._timed_get(
                f"{backend_url}/health", timeout=10.0
            )
        except Exception:
            return Status.DOWN, None, "Cannot reach backend"
        if response.status_code == 200:
            return Status.OPERATIONAL, latency_ms, None
        return Status.DOWN, latency_ms, "Backend reports unhealthy"

    async def check_frontend(
        self, url: str
    ) -> tuple[Status, Optional[float], Optional[str]]:
        """Check frontend availability by requesting ``url`` itself."""
        return await self._check_url(url)

    async def check_bot(
        self, url: str
    ) -> tuple[Status, Optional[float], Optional[str]]:
        """Check Telegram bot health via ``{url}/health``."""
        return await self._check_url(f"{url}/health")

    async def check_external(
        self, url: str
    ) -> tuple[Status, Optional[float], Optional[str]]:
        """Check external (public) URL availability.

        An empty ``url`` means the check is not configured and yields
        ``UNKNOWN``. Otherwise redirects are followed and a longer timeout
        (15 seconds) is used than for the internal checks.
        """
        if not url:
            return Status.UNKNOWN, None, "Not configured"
        return await self._check_url(url, timeout=15.0, follow_redirects=True)

    def _refresh_history(
        self, service_name: str, svc: ServiceStatus, hours: int
    ) -> None:
        """Reload latency history, average latency and uptime from the DB."""
        svc.latency_history = get_latency_history(service_name, hours=hours)
        svc.avg_latency_24h = get_avg_latency(service_name, hours=hours)
        stats = get_uptime_stats(service_name, hours=hours)
        # Without recorded checks keep the current in-memory uptime value.
        if stats["total_checks"] > 0:
            svc.uptime_percent = stats["uptime_percent"]

    async def _process_check_result(
        self,
        service_name: str,
        result: tuple,
        now: datetime,
        suppress_alerts: bool = False
    ):
        """Process check result with DB persistence and alerting.

        Args:
            service_name: Key of the service in ``self.services``.
            result: The 3- or 4-tuple returned by a ``check_*`` method, or
                an exception object handed back by ``asyncio.gather``.
            now: Timestamp of the polling round.
            suppress_alerts: When true, incidents are still created and
                resolved but no alert is sent.
        """
        # gather(return_exceptions=True) can hand back BaseException
        # subclasses as well (e.g. asyncio.CancelledError); there is no
        # result to record for those, so the previous state is kept.
        if isinstance(result, BaseException):
            return
        if len(result) == 4:
            status, latency, message, version = result
        else:
            status, latency, message = result
            version = None

        svc = self.services[service_name]
        unhealthy = (Status.DOWN, Status.DEGRADED)
        was_down = svc.status in unhealthy
        is_down = status in unhealthy

        # Update service status
        svc.status = status
        svc.latency_ms = latency
        svc.message = message
        if version:
            # Keep the last known version when this check did not report one.
            svc.version = version
        svc.last_check = now
        svc.update_uptime(status == Status.OPERATIONAL)

        # Save metric to database, then reload the 24h aggregates so they
        # include the metric that was just written.
        save_metric(service_name, status.value, latency, message)
        self._refresh_history(service_name, svc, hours=24)

        # Handle incident tracking and alerting (skip alerts during grace period)
        if is_down and not was_down:
            # Service just went down
            svc.last_incident = now
            incident_id = create_incident(service_name, status.value, message)
            if not suppress_alerts:
                await alert_service_down(service_name, svc.display_name, message)
                mark_incident_notified(incident_id)
        elif was_down and not is_down:
            # Service recovered
            open_incident = get_open_incident(service_name)
            if open_incident:
                started_at = datetime.fromisoformat(open_incident["started_at"])
                downtime_minutes = int((now - started_at).total_seconds() / 60)
                resolve_incident(service_name)
                if not suppress_alerts:
                    await alert_service_recovered(
                        service_name, svc.display_name, downtime_minutes
                    )

    async def check_all_services(
        self,
        backend_url: str,
        frontend_url: str,
        bot_url: str,
        external_url: str = "",
        public_url: str = "",
        suppress_alerts: bool = False
    ):
        """Check all services concurrently.

        Runs the five probes in parallel, processes their results one after
        another, then checks the SSL certificate when ``public_url`` is an
        HTTPS URL, and finally records the time of this round in
        ``self.last_check``.
        """
        now = datetime.now()

        # Keep each coroutine next to its service name so a result can never
        # be attributed to the wrong service.
        checks = {
            "backend": self.check_backend(backend_url),
            "database": self.check_database(backend_url),
            "frontend": self.check_frontend(frontend_url),
            "bot": self.check_bot(bot_url),
            "external": self.check_external(external_url),
        }
        results = await asyncio.gather(*checks.values(), return_exceptions=True)

        for service_name, result in zip(checks, results):
            await self._process_check_result(
                service_name, result, now, suppress_alerts
            )

        # Check SSL certificate (if public URL is HTTPS)
        if public_url and public_url.startswith("https://"):
            self.ssl_info = await check_and_alert_ssl(public_url)

        self.last_check = now

    def get_all_statuses(self, period_hours: int = 24) -> dict[str, ServiceStatus]:
        """Get all service statuses with data for specified period.

        Note that this overwrites the history fields of the stored
        ``ServiceStatus`` objects (including ``avg_latency_24h``) with data
        for ``period_hours`` and returns the live ``self.services`` dict,
        not a copy.
        """
        for name, svc in self.services.items():
            self._refresh_history(name, svc, hours=period_hours)
        return self.services

    def get_overall_status(self) -> Status:
        """Get overall system status based on all services.

        Precedence: ``OPERATIONAL`` if every considered service is
        operational, otherwise ``DOWN`` if any is down, otherwise
        ``DEGRADED`` if any is degraded, otherwise ``UNKNOWN``.
        """
        # Exclude external from overall status if not configured
        statuses = [
            svc.status
            for name, svc in self.services.items()
            if name != "external" or svc.status != Status.UNKNOWN
        ]
        if all(s == Status.OPERATIONAL for s in statuses):
            return Status.OPERATIONAL
        if Status.DOWN in statuses:
            return Status.DOWN
        if Status.DEGRADED in statuses:
            return Status.DEGRADED
        return Status.UNKNOWN

    def get_ssl_status(self) -> Optional[dict]:
        """Get SSL certificate status.

        Returns:
            ``None`` until an SSL check has stored a result, otherwise a
            dict describing the certificate.
        """
        info = self.ssl_info
        if not info:
            return None
        # NOTE(review): guarded in case a failed SSL check leaves expires_at
        # unset - confirm against SSLInfo in ssl_monitor.
        expires_at = info.expires_at
        return {
            "domain": info.domain,
            "issuer": info.issuer,
            "expires_at": expires_at.isoformat() if expires_at else None,
            "days_until_expiry": info.days_until_expiry,
            "is_valid": info.is_valid,
            "error": info.error,
        }