import base64
import hashlib
import hmac
import struct
import time
from datetime import datetime, timedelta, timezone
from typing import Any

from jose import jwt
from passlib.context import CryptContext

from app.core.config import settings

# Password hashing context configured with bcrypt as its only scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Telegram link token layout: two big-endian unsigned 32-bit integers
# (user_id, expire_at) followed by a truncated HMAC-SHA256 signature.
_TELEGRAM_LINK_PAYLOAD = struct.Struct(">II")
_TELEGRAM_LINK_SIGNATURE_SIZE = 8
_TELEGRAM_LINK_TOKEN_SIZE = (
    _TELEGRAM_LINK_PAYLOAD.size + _TELEGRAM_LINK_SIGNATURE_SIZE
)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return True if *plain_password* matches *hashed_password*."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Return the hash of *password* produced by ``pwd_context``."""
    return pwd_context.hash(password)


def create_access_token(
    subject: int | Any, expires_delta: timedelta | None = None
) -> str:
    """Create a signed JWT access token for *subject*.

    Args:
        subject: Identifier stored (as a string) in the ``sub`` claim.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded JWT, signed with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``.
    """
    # Compare against None explicitly: a zero timedelta is falsy, and a
    # truthiness test would silently replace it with the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    # Timezone-aware UTC instead of the deprecated, naive datetime.utcnow().
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = {"exp": expire, "sub": str(subject)}
    return jwt.encode(
        to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM
    )


def decode_access_token(token: str) -> dict[str, Any] | None:
    """Decode a JWT access token.

    Returns:
        The decoded payload, or ``None`` if decoding raises ``jwt.JWTError``.
    """
    try:
        return jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
    except jwt.JWTError:
        return None


def _sign_telegram_link_payload(data: bytes) -> bytes:
    """Return the truncated HMAC-SHA256 signature of *data*."""
    digest = hmac.new(
        settings.SECRET_KEY.encode(), data, hashlib.sha256
    ).digest()
    return digest[:_TELEGRAM_LINK_SIGNATURE_SIZE]


def create_telegram_link_token(user_id: int, expire_minutes: int = 10) -> str:
    """Create a short token for Telegram account linking.

    Format: base64url encoded binary data (no separators).
    Structure: user_id (4 bytes) + expire_at (4 bytes) + signature (8 bytes)
    = 16 bytes -> 22 chars base64url.

    Args:
        user_id: User identifier; must fit in an unsigned 32-bit integer.
        expire_minutes: Token lifetime in minutes, counted from now.

    Returns:
        The unpadded base64url token string.

    Raises:
        struct.error: If *user_id* or the computed expiry timestamp does not
            fit in an unsigned 32-bit integer.
    """
    expire_at = int(time.time()) + (expire_minutes * 60)

    # Pack user_id and expire_at as unsigned 32-bit integers (8 bytes total).
    data = _TELEGRAM_LINK_PAYLOAD.pack(user_id, expire_at)
    token_bytes = data + _sign_telegram_link_payload(data)

    # Encode as base64url without padding.
    return base64.urlsafe_b64encode(token_bytes).decode().rstrip("=")


def verify_telegram_link_token(token: str) -> int | None:
    """Verify a Telegram link token and return its user_id if valid.

    Returns:
        The user_id embedded in the token, or ``None`` if the token is
        malformed, has a bad signature, or is expired.
    """
    try:
        # Restore the padding stripped at creation time (0-3 "=" characters).
        padded = token + "=" * (-len(token) % 4)
        # binascii.Error (invalid base64) is a subclass of ValueError;
        # TypeError covers a token that is not a string.
        token_bytes = base64.urlsafe_b64decode(padded)
    except (TypeError, ValueError):
        return None

    if len(token_bytes) != _TELEGRAM_LINK_TOKEN_SIZE:
        return None

    data = token_bytes[: _TELEGRAM_LINK_PAYLOAD.size]
    provided_signature = token_bytes[_TELEGRAM_LINK_PAYLOAD.size :]

    # Authenticate the payload before reading any field from it.
    expected_signature = _sign_telegram_link_payload(data)
    if not hmac.compare_digest(provided_signature, expected_signature):
        return None

    user_id, expire_at = _TELEGRAM_LINK_PAYLOAD.unpack(data)
    if time.time() > expire_at:
        return None

    return user_id