# server-manager/core/s3_client.py
"""
S3 client wrapper — duck-typed, lazily imports the boto3 module.
Works with any S3-compatible storage (AWS, MinIO, etc.).
Resilience features:
- Adaptive retry with exponential backoff (up to 10 attempts)
- Multipart upload/download with configurable chunk size
- Auto-reconnect on connection loss (network switch, Wi-Fi change)
- boto3 TransferConfig tuned for unstable connections
"""
import os
import time
from core.logger import log
_boto3 = None
_botocore = None
# Retry / resilience constants
_MAX_RETRIES = 10
_BASE_DELAY = 2.0 # seconds
_MAX_DELAY = 60.0 # seconds
_MULTIPART_THRESHOLD = 8 * 1024 * 1024 # 8 MB — use multipart above this
_MULTIPART_CHUNKSIZE = 8 * 1024 * 1024 # 8 MB chunks
_MAX_CONCURRENCY = 4 # parallel parts (low for unstable links)
def _get_boto3():
global _boto3, _botocore
if _boto3 is None:
import boto3 as _b
import botocore as _bc
_boto3 = _b
_botocore = _bc
return _boto3
def _get_transfer_config():
"""TransferConfig tuned for unreliable connections."""
from boto3.s3.transfer import TransferConfig
return TransferConfig(
multipart_threshold=_MULTIPART_THRESHOLD,
multipart_chunksize=_MULTIPART_CHUNKSIZE,
max_concurrency=_MAX_CONCURRENCY,
num_download_attempts=_MAX_RETRIES,
)
def _retry_delay(attempt: int) -> float:
"""Exponential backoff: 2, 4, 8, 16, 32, 60, 60, ..."""
delay = min(_BASE_DELAY * (2 ** attempt), _MAX_DELAY)
return delay
class S3Client:
"""Manage a single S3 connection. No ABC — duck typing."""
def __init__(self, server: dict):
self._server = server
self._endpoint = server.get("ip", "")
        # If the endpoint has no scheme, build one from use_ssl and the port
if self._endpoint and not self._endpoint.startswith("http"):
use_ssl = server.get("use_ssl", True)
scheme = "https" if use_ssl else "http"
port = int(server.get("port", 443))
if (scheme == "https" and port == 443) or (scheme == "http" and port == 80):
self._endpoint = f"{scheme}://{self._endpoint}"
else:
self._endpoint = f"{scheme}://{self._endpoint}:{port}"
self._access_key = server.get("access_key", "")
self._secret_key = server.get("secret_key", "")
self._bucket = server.get("bucket", "")
self._use_ssl = server.get("use_ssl", True)
self._client = None
self._transfer_config = None
self._last_ok: float = 0 # timestamp of last successful operation
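    # A minimal sketch of the `server` dict this class expects; the host,
    # port, credentials and bucket below are hypothetical placeholders, only
    # the keys read in __init__ above matter:
    #
    #     server = {
    #         "ip": "minio.local",     # bare host or full http(s) URL
    #         "port": 9000,
    #         "use_ssl": False,
    #         "access_key": "AKIA...",
    #         "secret_key": "...",
    #         "bucket": "backups",
    #     }
    #     client = S3Client(server)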
# -- lifecycle --------------------------------------------------------
def connect(self) -> bool:
try:
b3 = _get_boto3()
import botocore.config
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
config = botocore.config.Config(
signature_version="s3v4",
connect_timeout=15,
read_timeout=60,
retries={"max_attempts": 5, "mode": "adaptive"},
tcp_keepalive=True,
)
self._client = b3.client(
"s3",
endpoint_url=self._endpoint,
aws_access_key_id=self._access_key,
aws_secret_access_key=self._secret_key,
config=config,
verify=False,
)
self._transfer_config = _get_transfer_config()
# Test connection
self._client.list_buckets()
self._last_ok = time.time()
log.info("S3 connected %s", self._endpoint)
return True
except Exception as exc:
log.error("S3 connect failed: %s", exc)
self._client = None
return False
def _reconnect(self) -> bool:
"""Try to re-establish the S3 connection after a drop."""
log.warning("S3 reconnecting to %s...", self._endpoint)
self._client = None
return self.connect()
def _ensure_connected(self) -> bool:
"""Check connection, reconnect if needed.
Skips health-check if last success was <30s ago (avoids redundant RTTs).
"""
if self._client is None:
return self._reconnect()
if time.time() - self._last_ok < 30:
return True
try:
self._client.list_buckets()
self._last_ok = time.time()
return True
except Exception:
return self._reconnect()
def disconnect(self):
self._client = None
log.info("S3 disconnected")
def check_connection(self) -> bool:
try:
if self._client is None:
return False
self._client.list_buckets()
return True
except Exception:
return False
# -- bucket operations ------------------------------------------------
def list_buckets(self) -> list[dict]:
"""Return list of {'Name': str, 'CreationDate': datetime}."""
if not self._ensure_connected():
return []
try:
resp = self._client.list_buckets()
self._last_ok = time.time()
return resp.get("Buckets", [])
except Exception as exc:
log.error("S3 list_buckets failed: %s", exc)
return []
# -- object operations ------------------------------------------------
def list_objects(self, bucket: str = "", prefix: str = "",
delimiter: str = "/") -> tuple[list[dict], list[str]]:
"""List objects and common prefixes in a bucket/prefix.
Returns (objects, prefixes) where:
- objects: list of {'Key', 'Size', 'LastModified'}
- prefixes: list of prefix strings (subdirectories)
"""
if not self._ensure_connected():
return [], []
bucket = bucket or self._bucket
if not bucket:
return [], []
try:
objects = []
prefixes = []
paginator = self._client.get_paginator("list_objects_v2")
kwargs = {"Bucket": bucket, "Delimiter": delimiter}
if prefix:
kwargs["Prefix"] = prefix
for page in paginator.paginate(**kwargs):
for obj in page.get("Contents", []):
# Skip the prefix itself
if obj["Key"] != prefix:
objects.append(obj)
for cp in page.get("CommonPrefixes", []):
prefixes.append(cp["Prefix"])
self._last_ok = time.time()
return objects, prefixes
except Exception as exc:
log.error("S3 list_objects failed: %s", exc)
return [], []
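    # Example of a delimiter listing (bucket, prefix and values are
    # hypothetical), illustrating the (objects, prefixes) return shape
    # documented above:
    #
    #     objs, dirs = client.list_objects("backups", prefix="db/")
    #     # objs -> [{"Key": "db/dump.sql.gz", "Size": 1048576,
    #     #           "LastModified": datetime(...)}]
    #     # dirs -> ["db/daily/", "db/weekly/"]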
def upload_file(self, local_path: str, bucket: str, key: str,
progress_cb=None, status_cb=None) -> bool:
"""Upload a local file to S3 with retry and resume.
progress_cb(bytes_transferred) — called periodically for progress bar.
status_cb(message) — called with status messages (retry info, etc.).
Uses multipart upload for files > 8 MB.
On failure, retries up to 10 times with exponential backoff.
boto3 multipart automatically resumes failed parts.
"""
if not self._ensure_connected():
return False
        try:
            file_size = os.path.getsize(local_path)
        except OSError as exc:
            log.error("S3 upload failed, cannot stat %s: %s", local_path, exc)
            return False
for attempt in range(_MAX_RETRIES):
try:
self._client.upload_file(
local_path, bucket, key,
Config=self._transfer_config,
Callback=progress_cb,
)
log.info("S3 uploaded %s -> s3://%s/%s (%d bytes)",
local_path, bucket, key, file_size)
return True
except Exception as exc:
delay = _retry_delay(attempt)
log.warning("S3 upload attempt %d/%d failed: %s (retry in %.0fs)",
attempt + 1, _MAX_RETRIES, exc, delay)
if status_cb:
status_cb(f"Retry {attempt + 1}/{_MAX_RETRIES} in {delay:.0f}s...")
                # No per-attempt progress reset: boto3's Callback reports raw
                # byte deltas and the GUI tracks the running total itself.
time.sleep(delay)
# Reconnect before retry
if not self._reconnect():
log.error("S3 reconnect failed on attempt %d", attempt + 1)
continue
log.error("S3 upload failed after %d attempts: %s -> s3://%s/%s",
_MAX_RETRIES, local_path, bucket, key)
return False
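    # Usage sketch (hypothetical paths and callbacks): boto3 invokes
    # progress_cb with per-chunk byte deltas, so the caller accumulates:
    #
    #     done = {"bytes": 0}
    #     def on_progress(delta):
    #         done["bytes"] += delta
    #     ok = client.upload_file("/tmp/dump.sql.gz", "backups",
    #                             "db/dump.sql.gz",
    #                             progress_cb=on_progress,
    #                             status_cb=print)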
def download_file(self, bucket: str, key: str, local_path: str,
progress_cb=None, status_cb=None) -> bool:
"""Download with resume support using S3 Range GET.
On disconnect, keeps the .s3part file and resumes from where it
stopped. ETag is checked to detect if the remote file changed
(in that case the partial file is discarded and download restarts).
progress_cb(bytes_delta) — called with each chunk size.
status_cb(message) — called with retry / resume info.
"""
if not self._ensure_connected():
return False
# --- 1. HEAD — get size and ETag ---
try:
head = self._client.head_object(Bucket=bucket, Key=key)
total_size = head["ContentLength"]
etag = head.get("ETag", "")
except Exception as exc:
log.error("S3 head_object failed: %s", exc)
return False
# Small files (< 1 MB) — simple download, no resume overhead
if total_size < 1024 * 1024:
return self._download_file_simple(
bucket, key, local_path, progress_cb, status_cb)
# --- 2. Check .s3part (partial download) ---
temp_path = local_path + ".s3part"
meta_path = local_path + ".s3meta"
start_byte = 0
if os.path.exists(temp_path):
saved_etag = ""
if os.path.exists(meta_path):
try:
with open(meta_path, "r") as f:
saved_etag = f.read().strip()
except Exception:
pass
if saved_etag == etag and etag:
start_byte = os.path.getsize(temp_path)
if start_byte >= total_size:
# Already fully downloaded
os.replace(temp_path, local_path)
self._cleanup_meta(meta_path)
self._last_ok = time.time()
return True
log.info("S3 resuming from byte %d / %d", start_byte, total_size)
if status_cb:
mb = start_byte / (1024 * 1024)
status_cb(f"Resuming from {mb:.1f} MB...")
else:
# ETag changed — file was modified on server, start fresh
try:
os.remove(temp_path)
except OSError:
pass
start_byte = 0
# Save ETag for future resume
try:
with open(meta_path, "w") as f:
f.write(etag)
except Exception:
pass
# Report already-downloaded bytes so progress bar is correct
if progress_cb and start_byte > 0:
progress_cb(start_byte)
# --- 3. Download loop with retry ---
chunk_size = _MULTIPART_CHUNKSIZE # 8 MB
for attempt in range(_MAX_RETRIES):
try:
if start_byte >= total_size:
break
range_header = f"bytes={start_byte}-"
resp = self._client.get_object(
Bucket=bucket, Key=key, Range=range_header)
with open(temp_path, "ab") as f:
for chunk in resp["Body"].iter_chunks(chunk_size=chunk_size):
f.write(chunk)
f.flush()
start_byte += len(chunk)
if progress_cb:
progress_cb(len(chunk))
# --- 4. Verify size ---
actual = os.path.getsize(temp_path)
if actual != total_size:
log.warning("S3 size mismatch: got %d, expected %d",
actual, total_size)
# Don't delete — maybe we can resume next attempt
if actual < total_size:
start_byte = actual
continue
# actual > total_size — corrupted, restart
try:
os.remove(temp_path)
except OSError:
pass
start_byte = 0
continue
# --- 5. Atomic rename ---
os.replace(temp_path, local_path)
self._cleanup_meta(meta_path)
self._last_ok = time.time()
log.info("S3 downloaded s3://%s/%s -> %s (%d bytes, resumed)",
bucket, key, local_path, total_size)
return True
except Exception as exc:
# Update start_byte from actual file size
if os.path.exists(temp_path):
start_byte = os.path.getsize(temp_path)
delay = _retry_delay(attempt)
log.warning("S3 download attempt %d/%d failed at byte %d: %s",
attempt + 1, _MAX_RETRIES, start_byte, exc)
if status_cb:
pct = (start_byte / total_size * 100) if total_size else 0
status_cb(f"Retry {attempt+1}/{_MAX_RETRIES} at {pct:.0f}%...")
time.sleep(delay)
self._reconnect()
# Adaptive chunk: reduce on repeated failures
if attempt >= 2 and chunk_size > 1024 * 1024:
chunk_size = 1024 * 1024 # 1 MB
log.info("S3 reducing chunk size to 1 MB")
log.error("S3 download failed after %d attempts: s3://%s/%s -> %s",
_MAX_RETRIES, bucket, key, local_path)
return False
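    # Resume sketch (hypothetical file names): an interrupted download of
    # s3://backups/db/dump.sql.gz to /data/dump.sql.gz leaves two sidecar
    # files next to the target, which the next call picks up:
    #
    #     /data/dump.sql.gz.s3part   # partial body, appended via Range GET
    #     /data/dump.sql.gz.s3meta   # remote ETag recorded when the part began
    #
    # If the stored ETag still matches head_object(), the download resumes at
    # the .s3part size; otherwise both sidecars are discarded and it restarts.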
def _download_file_simple(self, bucket: str, key: str, local_path: str,
progress_cb=None, status_cb=None) -> bool:
"""Simple download for small files (no resume overhead)."""
for attempt in range(_MAX_RETRIES):
try:
self._client.download_file(
bucket, key, local_path,
Config=self._transfer_config,
Callback=progress_cb,
)
self._last_ok = time.time()
log.info("S3 downloaded s3://%s/%s -> %s", bucket, key, local_path)
return True
except Exception as exc:
delay = _retry_delay(attempt)
log.warning("S3 download attempt %d/%d failed: %s (retry in %.0fs)",
attempt + 1, _MAX_RETRIES, exc, delay)
if status_cb:
status_cb(f"Retry {attempt + 1}/{_MAX_RETRIES} in {delay:.0f}s...")
time.sleep(delay)
if not self._reconnect():
log.error("S3 reconnect failed on attempt %d", attempt + 1)
continue
log.error("S3 download failed after %d attempts: s3://%s/%s -> %s",
_MAX_RETRIES, bucket, key, local_path)
return False
@staticmethod
def _cleanup_meta(meta_path: str):
"""Remove .s3meta file silently."""
try:
os.remove(meta_path)
except OSError:
pass
def delete_object(self, bucket: str, key: str) -> bool:
"""Delete an object from S3."""
if not self._ensure_connected():
return False
try:
self._client.delete_object(Bucket=bucket, Key=key)
log.info("S3 deleted s3://%s/%s", bucket, key)
return True
except Exception as exc:
log.error("S3 delete failed: %s", exc)
return False
def generate_presigned_url(self, bucket: str, key: str,
expires_in: int = 3600) -> str | None:
"""Generate a presigned download URL for an object.
expires_in: URL lifetime in seconds (default 1 hour).
Returns URL string or None on failure.
"""
if not self._ensure_connected():
return None
try:
url = self._client.generate_presigned_url(
"get_object",
Params={"Bucket": bucket, "Key": key},
ExpiresIn=expires_in,
)
return url
except Exception as exc:
log.error("S3 presigned URL failed: %s", exc)
return None
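    # Usage sketch (hypothetical bucket/key): a 15-minute share link.
    #
    #     url = client.generate_presigned_url("backups", "db/dump.sql.gz",
    #                                         expires_in=900)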
def list_all_objects(self, bucket: str, prefix: str = "") -> list[dict]:
"""List ALL objects under prefix recursively (no delimiter).
Returns list of {'Key', 'Size', 'LastModified'}.
"""
if not self._ensure_connected():
return []
try:
objects = []
paginator = self._client.get_paginator("list_objects_v2")
kwargs = {"Bucket": bucket}
if prefix:
kwargs["Prefix"] = prefix
for page in paginator.paginate(**kwargs):
for obj in page.get("Contents", []):
# Skip "folder" markers (zero-byte keys ending with /)
if obj["Key"].endswith("/") and obj.get("Size", 0) == 0:
continue
objects.append(obj)
self._last_ok = time.time()
return objects
except Exception as exc:
log.error("S3 list_all_objects failed: %s", exc)
return []
def delete_prefix(self, bucket: str, prefix: str) -> int:
"""Recursively delete all objects under a prefix. Returns count deleted."""
if not self._ensure_connected():
return 0
try:
deleted = 0
paginator = self._client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
objects = page.get("Contents", [])
if not objects:
continue
delete_req = {
"Objects": [{"Key": obj["Key"]} for obj in objects],
"Quiet": True,
}
self._client.delete_objects(Bucket=bucket, Delete=delete_req)
deleted += len(objects)
self._last_ok = time.time()
log.info("S3 deleted prefix s3://%s/%s (%d objects)", bucket, prefix, deleted)
return deleted
except Exception as exc:
log.error("S3 delete prefix failed: %s", exc)
return 0
def create_folder(self, bucket: str, key: str) -> bool:
"""Create a folder (empty object with trailing slash) in S3."""
if not self._ensure_connected():
return False
try:
self._client.put_object(Bucket=bucket, Key=key, Body=b"")
self._last_ok = time.time()
log.info("S3 created folder s3://%s/%s", bucket, key)
return True
except Exception as exc:
log.error("S3 create folder failed: %s", exc)
return False
def get_direct_url(self, bucket: str, key: str) -> str:
"""Build a direct (permanent) URL: endpoint/bucket/key."""
endpoint = self._endpoint.rstrip("/")
return f"{endpoint}/{bucket}/{key}"
def get_object_size(self, bucket: str, key: str) -> int:
"""Get size of an object in bytes."""
if not self._ensure_connected():
return 0
try:
resp = self._client.head_object(Bucket=bucket, Key=key)
return resp.get("ContentLength", 0)
except Exception:
return 0
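# A minimal smoke-test sketch, assuming a reachable endpoint and valid
# credentials; every value below is a placeholder, not real configuration.
# Running it standalone also requires the server-manager package (core.logger)
# to be importable.
if __name__ == "__main__":
    client = S3Client({
        "ip": "minio.local",
        "port": 9000,
        "use_ssl": False,
        "access_key": "minioadmin",
        "secret_key": "minioadmin",
        "bucket": "backups",
    })
    if client.connect():
        for bucket in client.list_buckets():
            print(bucket["Name"])
        client.disconnect()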