import boto3 import urllib3 from botocore.config import Config from ..config import get_settings # Suppress SSL warnings for self-signed certificate urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) settings = get_settings() def get_s3_client(): return boto3.client( "s3", endpoint_url=settings.s3_endpoint_url, aws_access_key_id=settings.s3_access_key, aws_secret_access_key=settings.s3_secret_key, region_name=settings.s3_region, config=Config(signature_version="s3v4"), verify=False, # FirstVDS uses self-signed certificate ) async def get_total_storage_size() -> int: """Returns total size of all objects in bucket in bytes""" client = get_s3_client() total_size = 0 paginator = client.get_paginator("list_objects_v2") for page in paginator.paginate(Bucket=settings.s3_bucket_name): for obj in page.get("Contents", []): total_size += obj["Size"] return total_size async def can_upload_file(file_size: int) -> bool: """Check if file can be uploaded without exceeding storage limit""" max_bytes = settings.max_storage_gb * 1024 * 1024 * 1024 current_size = await get_total_storage_size() return (current_size + file_size) <= max_bytes async def upload_file(file_content: bytes, s3_key: str, content_type: str = "audio/mpeg") -> str: """Upload file to S3 and return the key""" client = get_s3_client() client.put_object( Bucket=settings.s3_bucket_name, Key=s3_key, Body=file_content, ContentType=content_type, ) return s3_key async def delete_file(s3_key: str) -> None: """Delete file from S3""" client = get_s3_client() client.delete_object(Bucket=settings.s3_bucket_name, Key=s3_key) def generate_presigned_url(s3_key: str, expiration: int = 3600) -> str: """Generate presigned URL for file access""" client = get_s3_client() url = client.generate_presigned_url( "get_object", Params={"Bucket": settings.s3_bucket_name, "Key": s3_key}, ExpiresIn=expiration, ) return url def get_file_content(s3_key: str) -> bytes: """Get full file content from S3""" client = get_s3_client() response = client.get_object(Bucket=settings.s3_bucket_name, Key=s3_key) return response["Body"].read() def get_file_size(s3_key: str) -> int: """Get file size from S3 without downloading""" client = get_s3_client() response = client.head_object(Bucket=settings.s3_bucket_name, Key=s3_key) return response["ContentLength"] def get_file_range(s3_key: str, start: int, end: int): """Get a range of bytes from S3 file""" client = get_s3_client() response = client.get_object( Bucket=settings.s3_bucket_name, Key=s3_key, Range=f"bytes={start}-{end}" ) return response["Body"].read() def stream_file_chunks(s3_key: str, start: int = 0, end: int = None, chunk_size: int = 64 * 1024): """Stream file from S3 in chunks (default 64KB chunks)""" client = get_s3_client() if end is None: range_header = f"bytes={start}-" else: range_header = f"bytes={start}-{end}" response = client.get_object( Bucket=settings.s3_bucket_name, Key=s3_key, Range=range_header ) for chunk in response["Body"].iter_chunks(chunk_size=chunk_size): yield chunk