318 lines
12 KiB
Python
318 lines
12 KiB
Python
"""Service monitoring with persistence and alerting."""
|
|
import asyncio
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional

import httpx

from alerts import alert_service_down, alert_service_recovered
from database import (
    save_metric, get_latency_history, get_uptime_stats, get_avg_latency,
    create_incident, resolve_incident, get_open_incident, mark_incident_notified
)
from ssl_monitor import check_and_alert_ssl, SSLInfo
|
|
|
|
|
|
class Status(str, Enum):
    """Health state of a monitored service.

    Mixing in ``str`` makes every member compare equal to its string value,
    so a member can be used wherever the plain string is expected.
    """

    # Health check succeeded.
    OPERATIONAL = "operational"
    # Service answered, but not with the expected response.
    DEGRADED = "degraded"
    # Service could not be reached or failed its check.
    DOWN = "down"
    # No check has produced a result yet (also used for unconfigured targets).
    UNKNOWN = "unknown"
|
|
|
|
|
|
@dataclass
class ServiceStatus:
    """Latest known state of a single monitored service.

    Holds the result of the most recent check together with aggregate
    figures (uptime, average latency, latency history) and knows how to
    serialise itself with :meth:`to_dict`.
    """

    # Internal identifier of the service.
    name: str
    # Human-readable label.
    display_name: str
    status: Status = Status.UNKNOWN
    # Latency of the most recent check in milliseconds, if one was measured.
    latency_ms: Optional[float] = None
    last_check: Optional[datetime] = None
    last_incident: Optional[datetime] = None
    uptime_percent: float = 100.0
    message: Optional[str] = None
    version: Optional[str] = None
    avg_latency_24h: Optional[float] = None
    # ``None`` is replaced by a fresh list in __post_init__, so instances
    # never share one mutable default.
    latency_history: Optional[list] = None

    # For uptime calculation (in-memory, backed by DB)
    total_checks: int = 0
    successful_checks: int = 0

    def __post_init__(self):
        """Give every instance its own empty latency history when none was passed."""
        if self.latency_history is None:
            self.latency_history = []

    def to_dict(self) -> dict:
        """Return a plain-dict view of the status.

        Numeric values are rounded to two decimals and datetimes are
        rendered with ``isoformat()``. Unset values are reported as ``None``.
        ``latency_history`` and the raw check counters are not included.
        """
        # Compare against None explicitly: a measured 0.0 is a real value and
        # must not be reported as "no data".
        latency = round(self.latency_ms, 2) if self.latency_ms is not None else None
        avg_latency = (
            round(self.avg_latency_24h, 2)
            if self.avg_latency_24h is not None
            else None
        )
        return {
            "name": self.name,
            "display_name": self.display_name,
            "status": self.status.value,
            "latency_ms": latency,
            "last_check": self.last_check.isoformat() if self.last_check else None,
            "last_incident": self.last_incident.isoformat() if self.last_incident else None,
            "uptime_percent": round(self.uptime_percent, 2),
            "message": self.message,
            "version": self.version,
            "avg_latency_24h": avg_latency,
        }

    def update_uptime(self, is_success: bool):
        """Record one check outcome and recompute ``uptime_percent``.

        Args:
            is_success: Whether the check counts as a successful one.
        """
        self.total_checks += 1
        if is_success:
            self.successful_checks += 1
        # total_checks was just incremented, so the division is always defined.
        self.uptime_percent = (self.successful_checks / self.total_checks) * 100
|
|
|
|
|
|
class ServiceMonitor:
    """Runs health checks for a fixed set of services and tracks their state.

    Every check round probes each service over HTTP, stores the outcome
    through the ``database`` helpers, and opens or resolves incidents (with
    optional alerts) when a service switches between healthy and unhealthy.
    """

    # (service key, display name) for every service tracked by the monitor.
    _SERVICE_LABELS = (
        ("backend", "Backend API"),
        ("database", "Database"),
        ("frontend", "Frontend"),
        ("bot", "Telegram Bot"),
        ("external", "External Access"),
    )

    def __init__(self):
        """Create one ``ServiceStatus`` per tracked service, all initially unchecked."""
        self.services: dict[str, ServiceStatus] = {
            name: ServiceStatus(name=name, display_name=label)
            for name, label in self._SERVICE_LABELS
        }
        self.last_check: Optional[datetime] = None
        self.ssl_info: Optional[SSLInfo] = None

    @staticmethod
    async def _timed_get(
        url: str,
        *,
        timeout: float = 10.0,
        follow_redirects: bool = False,
    ) -> tuple["httpx.Response", float]:
        """GET ``url`` with a short-lived client; return the response and latency in ms.

        Exceptions raised by httpx propagate to the caller.
        """
        async with httpx.AsyncClient(
            timeout=timeout, follow_redirects=follow_redirects
        ) as client:
            # perf_counter is monotonic, so the measurement cannot be skewed
            # by wall-clock adjustments happening during the request.
            started = time.perf_counter()
            response = await client.get(url)
            latency_ms = (time.perf_counter() - started) * 1000
        return response, latency_ms

    async def _check_http(
        self,
        url: str,
        *,
        timeout: float = 10.0,
        follow_redirects: bool = False,
    ) -> tuple[Status, Optional[float], Optional[str]]:
        """Probe ``url`` and map the outcome to ``(status, latency_ms, message)``.

        HTTP 200 is OPERATIONAL, any other status code is DEGRADED, and a
        timeout or any other exception is DOWN.
        """
        try:
            response, latency_ms = await self._timed_get(
                url, timeout=timeout, follow_redirects=follow_redirects
            )
        except httpx.TimeoutException:
            return Status.DOWN, None, "Timeout"
        except Exception as exc:
            return Status.DOWN, None, str(exc)[:100]

        if response.status_code == 200:
            return Status.OPERATIONAL, latency_ms, None
        return Status.DEGRADED, latency_ms, f"HTTP {response.status_code}"

    async def check_backend(self, url: str) -> tuple[Status, Optional[float], Optional[str], Optional[str]]:
        """Check backend API health.

        Requests ``{url}/health`` and returns
        ``(status, latency_ms, message, version)``; ``version`` is taken from
        the ``"version"`` key of the JSON body of a 200 response.
        """
        try:
            response, latency_ms = await self._timed_get(f"{url}/health")
            if response.status_code != 200:
                return Status.DEGRADED, latency_ms, f"HTTP {response.status_code}", None
            # Parsing stays inside the try: a 200 response whose body cannot
            # be read as a JSON object is reported as DOWN.
            payload = response.json()
            return Status.OPERATIONAL, latency_ms, None, payload.get("version")
        except httpx.TimeoutException:
            return Status.DOWN, None, "Timeout", None
        except Exception as exc:
            return Status.DOWN, None, str(exc)[:100], None

    async def check_database(self, backend_url: str) -> tuple[Status, Optional[float], Optional[str]]:
        """Check database through backend.

        Requests ``{backend_url}/health``: HTTP 200 is OPERATIONAL, any other
        status code or any failure to get a response is DOWN.
        """
        # NOTE(review): only the status code is inspected, not the response
        # body - confirm the backend health endpoint fails when the DB is down.
        try:
            response, latency_ms = await self._timed_get(f"{backend_url}/health")
        except Exception:
            return Status.DOWN, None, "Cannot reach backend"

        if response.status_code == 200:
            return Status.OPERATIONAL, latency_ms, None
        return Status.DOWN, latency_ms, "Backend reports unhealthy"

    async def check_frontend(self, url: str) -> tuple[Status, Optional[float], Optional[str]]:
        """Check frontend availability by requesting ``url`` itself."""
        return await self._check_http(url)

    async def check_bot(self, url: str) -> tuple[Status, Optional[float], Optional[str]]:
        """Check Telegram bot health by requesting ``{url}/health``."""
        return await self._check_http(f"{url}/health")

    async def check_external(self, url: str) -> tuple[Status, Optional[float], Optional[str]]:
        """Check external (public) URL availability.

        An empty ``url`` yields UNKNOWN with the message "Not configured".
        Otherwise the URL is requested with a 15 second timeout and redirects
        are followed.
        """
        if not url:
            return Status.UNKNOWN, None, "Not configured"
        return await self._check_http(url, timeout=15.0, follow_redirects=True)

    async def _process_check_result(
        self,
        service_name: str,
        result: tuple,
        now: datetime,
        suppress_alerts: bool = False
    ):
        """Process check result with DB persistence and alerting.

        Args:
            service_name: Key of the service in ``self.services``.
            result: ``(status, latency, message)`` or
                ``(status, latency, message, version)`` as returned by the
                ``check_*`` methods, or an exception object handed back by
                ``asyncio.gather(..., return_exceptions=True)``.
            now: Timestamp recorded for this check round.
            suppress_alerts: When true, incidents are still created and
                resolved but no alert is sent.
        """
        # gather(return_exceptions=True) may also return CancelledError, which
        # derives from BaseException; such results carry no status, so the
        # service keeps its previous state.
        if isinstance(result, BaseException):
            return

        if len(result) == 4:
            status, latency, message, version = result
        else:
            status, latency, message = result
            version = None

        unhealthy = (Status.DOWN, Status.DEGRADED)
        svc = self.services[service_name]
        # Capture the previous state before it is overwritten below.
        was_down = svc.status in unhealthy
        is_down = status in unhealthy

        # Update service status
        svc.status = status
        svc.latency_ms = latency
        svc.message = message
        if version:
            # Keep the last known version when this check reported none.
            svc.version = version
        svc.last_check = now
        svc.update_uptime(status == Status.OPERATIONAL)

        # Save metric to database
        save_metric(service_name, status.value, latency, message)

        # Load historical data
        svc.latency_history = get_latency_history(service_name, hours=24)
        svc.avg_latency_24h = get_avg_latency(service_name, hours=24)

        # Prefer the DB-backed uptime over the in-memory counter when data exists
        stats = get_uptime_stats(service_name, hours=24)
        if stats["total_checks"] > 0:
            svc.uptime_percent = stats["uptime_percent"]

        # Handle incident tracking and alerting (skip alerts during grace period)
        if is_down and not was_down:
            # Service just went down
            svc.last_incident = now
            incident_id = create_incident(service_name, status.value, message)
            if not suppress_alerts:
                await alert_service_down(service_name, svc.display_name, message)
                mark_incident_notified(incident_id)
        elif was_down and not is_down:
            # Service recovered
            open_incident = get_open_incident(service_name)
            if open_incident:
                # NOTE(review): assumes "started_at" is an ISO string in the
                # same (naive) timezone as ``now`` - confirm against the
                # database module.
                started_at = datetime.fromisoformat(open_incident["started_at"])
                downtime_minutes = int((now - started_at).total_seconds() / 60)
                resolve_incident(service_name)
                if not suppress_alerts:
                    await alert_service_recovered(service_name, svc.display_name, downtime_minutes)

    async def check_all_services(
        self,
        backend_url: str,
        frontend_url: str,
        bot_url: str,
        external_url: str = "",
        public_url: str = "",
        suppress_alerts: bool = False
    ):
        """Check all services concurrently.

        Args:
            backend_url: Base URL of the backend; also used for the database check.
            frontend_url: URL of the frontend.
            bot_url: Base URL of the bot.
            external_url: Public URL to probe; empty means not configured.
            public_url: URL whose certificate is checked when it starts with
                ``https://``.
            suppress_alerts: Passed through to result processing to silence alerts.
        """
        now = datetime.now()

        # Keying each coroutine by service name keeps names and results
        # aligned without a separate, parallel list of names.
        checks = {
            "backend": self.check_backend(backend_url),
            "database": self.check_database(backend_url),
            "frontend": self.check_frontend(frontend_url),
            "bot": self.check_bot(bot_url),
            "external": self.check_external(external_url),
        }

        # Run all checks concurrently
        results = await asyncio.gather(*checks.values(), return_exceptions=True)

        # Process results one at a time, in service order
        for service_name, result in zip(checks, results):
            await self._process_check_result(service_name, result, now, suppress_alerts)

        # Check SSL certificate (if public URL is HTTPS)
        if public_url and public_url.startswith("https://"):
            self.ssl_info = await check_and_alert_ssl(public_url)

        self.last_check = now

    def get_all_statuses(self, period_hours: int = 24) -> dict[str, ServiceStatus]:
        """Get all service statuses with data for specified period.

        Refreshes latency history, average latency and uptime of every
        service from the database for the last ``period_hours`` hours and
        returns the monitor's own ``services`` mapping (not a copy).
        """
        for name, svc in self.services.items():
            svc.latency_history = get_latency_history(name, hours=period_hours)
            # The attribute keeps its "24h" name even for other periods.
            svc.avg_latency_24h = get_avg_latency(name, hours=period_hours)
            stats = get_uptime_stats(name, hours=period_hours)
            if stats["total_checks"] > 0:
                svc.uptime_percent = stats["uptime_percent"]
        return self.services

    def get_overall_status(self) -> Status:
        """Get overall system status based on all services.

        Returns OPERATIONAL when every considered service is operational,
        otherwise DOWN if any is down, otherwise DEGRADED if any is degraded,
        otherwise UNKNOWN.
        """
        # Exclude external from overall status if not configured
        statuses = [
            svc.status
            for name, svc in self.services.items()
            if name != "external" or svc.status != Status.UNKNOWN
        ]

        if all(s == Status.OPERATIONAL for s in statuses):
            return Status.OPERATIONAL
        if Status.DOWN in statuses:
            return Status.DOWN
        if Status.DEGRADED in statuses:
            return Status.DEGRADED
        return Status.UNKNOWN

    def get_ssl_status(self) -> Optional[dict]:
        """Get SSL certificate status.

        Returns ``None`` until an SSL check has stored a result, otherwise a
        dict describing the certificate.
        """
        info = self.ssl_info
        if not info:
            return None

        # NOTE(review): SSLInfo is defined in ssl_monitor; expires_at is
        # guarded in case a failed probe leaves it unset - confirm.
        expires_at = info.expires_at
        return {
            "domain": info.domain,
            "issuer": info.issuer,
            "expires_at": expires_at.isoformat() if expires_at else None,
            "days_until_expiry": info.days_until_expiry,
            "is_valid": info.is_valid,
            "error": info.error
        }
|