""" Симулятор движения транспортных средств. Генерирует реалистичные данные о перемещении объектов. """ import asyncio import random import math import httpx from datetime import datetime # Backend API URL (внутри Docker сети, без /api — root_path только для nginx) API_URL = "http://backend:8000" # Начальные координаты (Новосибирск - центр) START_COORDS = [ (55.0304, 82.9204), # Центр (55.0411, 82.9344), # Север (55.0198, 82.9064), # Юг (55.0350, 82.8904), # Запад (55.0250, 82.9504), # Восток ] class VehicleSimulator: def __init__(self, vehicle_id: int, start_lat: float, start_lon: float): self.vehicle_id = vehicle_id self.lat = start_lat self.lon = start_lon self.speed = random.uniform(20, 60) # km/h self.heading = random.uniform(0, 360) # degrees self.is_stopped = False self.stop_duration = 0 def update(self): """Обновить позицию транспортного средства""" # Случайная остановка if not self.is_stopped and random.random() < 0.02: # 2% шанс остановиться self.is_stopped = True self.stop_duration = random.randint(5, 30) # секунд self.speed = 0 if self.is_stopped: self.stop_duration -= 1 if self.stop_duration <= 0: self.is_stopped = False self.speed = random.uniform(20, 60) if not self.is_stopped: # Случайное изменение направления self.heading += random.uniform(-15, 15) self.heading = self.heading % 360 # Случайное изменение скорости self.speed += random.uniform(-5, 5) self.speed = max(10, min(90, self.speed)) # Ограничение 10-90 км/ч # Расчёт нового положения # Примерно: 1 градус широты = 111 км, 1 градус долготы = 111 * cos(lat) км speed_ms = self.speed / 3.6 # м/с distance = speed_ms * 2 # за 2 секунды # Перевод в градусы delta_lat = (distance * math.cos(math.radians(self.heading))) / 111000 delta_lon = (distance * math.sin(math.radians(self.heading))) / (111000 * math.cos(math.radians(self.lat))) self.lat += delta_lat self.lon += delta_lon return { "vehicle_id": self.vehicle_id, "lat": round(self.lat, 6), "lon": round(self.lon, 6), "speed": round(self.speed, 1), "heading": round(self.heading, 1), "timestamp": datetime.utcnow().isoformat() } async def send_position(client: httpx.AsyncClient, position: dict): """Отправить позицию на сервер""" try: response = await client.post(f"{API_URL}/ingest/position", json=position) if response.status_code == 201: print(f"✓ Vehicle {position['vehicle_id']}: ({position['lat']}, {position['lon']}) @ {position['speed']} km/h") else: print(f"✗ Vehicle {position['vehicle_id']}: Error {response.status_code}") except Exception as e: print(f"✗ Vehicle {position['vehicle_id']}: {e}") async def main(): print("🚗 Запуск симулятора транспорта...") print(f"📡 API URL: {API_URL}") # Создаём симуляторы для каждого транспортного средства simulators = [] for i, (lat, lon) in enumerate(START_COORDS, start=1): sim = VehicleSimulator(vehicle_id=i, start_lat=lat, start_lon=lon) simulators.append(sim) print(f" → Vehicle {i}: начальная позиция ({lat}, {lon})") print("\n🔄 Начинаем отправку данных (Ctrl+C для остановки)...\n") async with httpx.AsyncClient(timeout=10.0) as client: while True: # Обновляем и отправляем позиции всех транспортных средств tasks = [] for sim in simulators: position = sim.update() tasks.append(send_position(client, position)) await asyncio.gather(*tasks) await asyncio.sleep(2) # Интервал 2 секунды if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\n\n⏹ Симулятор остановлен")