"""S3 client wrapper — duck-typed, lazily imports the boto3 module.

Works with any S3-compatible storage (AWS, MinIO, etc.).

Resilience features:
- Adaptive retry with exponential backoff (up to 10 attempts)
- Multipart upload/download with configurable chunk size
- Auto-reconnect on connection loss (network switch, Wi-Fi change)
- boto3 TransferConfig tuned for unstable connections
"""
|
|
|
|
import os
|
|
import time
|
|
|
|
from core.logger import log
|
|
|
|
_boto3 = None
|
|
_botocore = None
|
|
|
|
# Retry / resilience constants
|
|
_MAX_RETRIES = 10
|
|
_BASE_DELAY = 2.0 # seconds
|
|
_MAX_DELAY = 60.0 # seconds
|
|
_MULTIPART_THRESHOLD = 8 * 1024 * 1024 # 8 MB — use multipart above this
|
|
_MULTIPART_CHUNKSIZE = 8 * 1024 * 1024 # 8 MB chunks
|
|
_MAX_CONCURRENCY = 4 # parallel parts (low for unstable links)
|
|
|
|
|
|
def _get_boto3():
    """Import boto3/botocore once, on first use, and return the boto3 module.

    Caches both modules in the module-level handles so repeated calls are
    free and the heavy import cost is paid only when S3 is actually used.
    """
    global _boto3, _botocore
    if _boto3 is not None:
        return _boto3
    import boto3
    import botocore
    _boto3 = boto3
    _botocore = botocore
    return _boto3
|
|
|
|
|
|
def _get_transfer_config():
    """Build a boto3 TransferConfig tuned for unreliable connections.

    Small 8 MB parts and low concurrency keep retransmission cost down on
    flaky links; download attempts match the module-wide retry budget.
    """
    from boto3.s3.transfer import TransferConfig

    settings = dict(
        multipart_threshold=_MULTIPART_THRESHOLD,
        multipart_chunksize=_MULTIPART_CHUNKSIZE,
        max_concurrency=_MAX_CONCURRENCY,
        num_download_attempts=_MAX_RETRIES,
    )
    return TransferConfig(**settings)
|
|
|
|
|
|
def _retry_delay(attempt: int) -> float:
|
|
"""Exponential backoff: 2, 4, 8, 16, 32, 60, 60, ..."""
|
|
delay = min(_BASE_DELAY * (2 ** attempt), _MAX_DELAY)
|
|
return delay
|
|
|
|
|
|
class S3Client:
|
|
"""Manage a single S3 connection. No ABC — duck typing."""
|
|
|
|
def __init__(self, server: dict):
|
|
self._server = server
|
|
self._endpoint = server.get("ip", "")
|
|
# If endpoint doesn't start with http, add https
|
|
if self._endpoint and not self._endpoint.startswith("http"):
|
|
use_ssl = server.get("use_ssl", True)
|
|
scheme = "https" if use_ssl else "http"
|
|
port = int(server.get("port", 443))
|
|
if (scheme == "https" and port == 443) or (scheme == "http" and port == 80):
|
|
self._endpoint = f"{scheme}://{self._endpoint}"
|
|
else:
|
|
self._endpoint = f"{scheme}://{self._endpoint}:{port}"
|
|
self._access_key = server.get("access_key", "")
|
|
self._secret_key = server.get("secret_key", "")
|
|
self._bucket = server.get("bucket", "")
|
|
self._use_ssl = server.get("use_ssl", True)
|
|
self._client = None
|
|
self._transfer_config = None
|
|
self._last_ok: float = 0 # timestamp of last successful operation
|
|
|
|
# -- lifecycle --------------------------------------------------------
|
|
|
|
def connect(self) -> bool:
|
|
try:
|
|
b3 = _get_boto3()
|
|
import botocore.config
|
|
import urllib3
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
config = botocore.config.Config(
|
|
signature_version="s3v4",
|
|
connect_timeout=15,
|
|
read_timeout=60,
|
|
retries={"max_attempts": 5, "mode": "adaptive"},
|
|
tcp_keepalive=True,
|
|
)
|
|
self._client = b3.client(
|
|
"s3",
|
|
endpoint_url=self._endpoint,
|
|
aws_access_key_id=self._access_key,
|
|
aws_secret_access_key=self._secret_key,
|
|
config=config,
|
|
verify=False,
|
|
)
|
|
self._transfer_config = _get_transfer_config()
|
|
# Test connection
|
|
self._client.list_buckets()
|
|
self._last_ok = time.time()
|
|
log.info("S3 connected %s", self._endpoint)
|
|
return True
|
|
except Exception as exc:
|
|
log.error("S3 connect failed: %s", exc)
|
|
self._client = None
|
|
return False
|
|
|
|
def _reconnect(self) -> bool:
|
|
"""Try to re-establish the S3 connection after a drop."""
|
|
log.warning("S3 reconnecting to %s...", self._endpoint)
|
|
self._client = None
|
|
return self.connect()
|
|
|
|
def _ensure_connected(self) -> bool:
|
|
"""Check connection, reconnect if needed.
|
|
Skips health-check if last success was <30s ago (avoids redundant RTTs).
|
|
"""
|
|
if self._client is None:
|
|
return self._reconnect()
|
|
if time.time() - self._last_ok < 30:
|
|
return True
|
|
try:
|
|
self._client.list_buckets()
|
|
self._last_ok = time.time()
|
|
return True
|
|
except Exception:
|
|
return self._reconnect()
|
|
|
|
def disconnect(self):
|
|
self._client = None
|
|
log.info("S3 disconnected")
|
|
|
|
def check_connection(self) -> bool:
|
|
try:
|
|
if self._client is None:
|
|
return False
|
|
self._client.list_buckets()
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
# -- bucket operations ------------------------------------------------
|
|
|
|
def list_buckets(self) -> list[dict]:
|
|
"""Return list of {'Name': str, 'CreationDate': datetime}."""
|
|
if not self._ensure_connected():
|
|
return []
|
|
try:
|
|
resp = self._client.list_buckets()
|
|
self._last_ok = time.time()
|
|
return resp.get("Buckets", [])
|
|
except Exception as exc:
|
|
log.error("S3 list_buckets failed: %s", exc)
|
|
return []
|
|
|
|
# -- object operations ------------------------------------------------
|
|
|
|
def list_objects(self, bucket: str = "", prefix: str = "",
|
|
delimiter: str = "/") -> tuple[list[dict], list[str]]:
|
|
"""List objects and common prefixes in a bucket/prefix.
|
|
|
|
Returns (objects, prefixes) where:
|
|
- objects: list of {'Key', 'Size', 'LastModified'}
|
|
- prefixes: list of prefix strings (subdirectories)
|
|
"""
|
|
if not self._ensure_connected():
|
|
return [], []
|
|
bucket = bucket or self._bucket
|
|
if not bucket:
|
|
return [], []
|
|
try:
|
|
objects = []
|
|
prefixes = []
|
|
paginator = self._client.get_paginator("list_objects_v2")
|
|
kwargs = {"Bucket": bucket, "Delimiter": delimiter}
|
|
if prefix:
|
|
kwargs["Prefix"] = prefix
|
|
for page in paginator.paginate(**kwargs):
|
|
for obj in page.get("Contents", []):
|
|
# Skip the prefix itself
|
|
if obj["Key"] != prefix:
|
|
objects.append(obj)
|
|
for cp in page.get("CommonPrefixes", []):
|
|
prefixes.append(cp["Prefix"])
|
|
self._last_ok = time.time()
|
|
return objects, prefixes
|
|
except Exception as exc:
|
|
log.error("S3 list_objects failed: %s", exc)
|
|
return [], []
|
|
|
|
def upload_file(self, local_path: str, bucket: str, key: str,
|
|
progress_cb=None, status_cb=None) -> bool:
|
|
"""Upload a local file to S3 with retry and resume.
|
|
|
|
progress_cb(bytes_transferred) — called periodically for progress bar.
|
|
status_cb(message) — called with status messages (retry info, etc.).
|
|
|
|
Uses multipart upload for files > 8 MB.
|
|
On failure, retries up to 10 times with exponential backoff.
|
|
boto3 multipart automatically resumes failed parts.
|
|
"""
|
|
if not self._ensure_connected():
|
|
return False
|
|
file_size = os.path.getsize(local_path)
|
|
for attempt in range(_MAX_RETRIES):
|
|
try:
|
|
self._client.upload_file(
|
|
local_path, bucket, key,
|
|
Config=self._transfer_config,
|
|
Callback=progress_cb,
|
|
)
|
|
log.info("S3 uploaded %s -> s3://%s/%s (%d bytes)",
|
|
local_path, bucket, key, file_size)
|
|
return True
|
|
except Exception as exc:
|
|
delay = _retry_delay(attempt)
|
|
log.warning("S3 upload attempt %d/%d failed: %s (retry in %.0fs)",
|
|
attempt + 1, _MAX_RETRIES, exc, delay)
|
|
if status_cb:
|
|
status_cb(f"Retry {attempt + 1}/{_MAX_RETRIES} in {delay:.0f}s...")
|
|
# Reset progress for retry (callback accumulates)
|
|
if progress_cb and attempt < _MAX_RETRIES - 1:
|
|
# We can't easily reset boto3's internal counter,
|
|
# but the GUI tracks total bytes itself
|
|
pass
|
|
time.sleep(delay)
|
|
# Reconnect before retry
|
|
if not self._reconnect():
|
|
log.error("S3 reconnect failed on attempt %d", attempt + 1)
|
|
continue
|
|
|
|
log.error("S3 upload failed after %d attempts: %s -> s3://%s/%s",
|
|
_MAX_RETRIES, local_path, bucket, key)
|
|
return False
|
|
|
|
def download_file(self, bucket: str, key: str, local_path: str,
|
|
progress_cb=None, status_cb=None) -> bool:
|
|
"""Download with resume support using S3 Range GET.
|
|
|
|
On disconnect, keeps the .s3part file and resumes from where it
|
|
stopped. ETag is checked to detect if the remote file changed
|
|
(in that case the partial file is discarded and download restarts).
|
|
|
|
progress_cb(bytes_delta) — called with each chunk size.
|
|
status_cb(message) — called with retry / resume info.
|
|
"""
|
|
if not self._ensure_connected():
|
|
return False
|
|
|
|
# --- 1. HEAD — get size and ETag ---
|
|
try:
|
|
head = self._client.head_object(Bucket=bucket, Key=key)
|
|
total_size = head["ContentLength"]
|
|
etag = head.get("ETag", "")
|
|
except Exception as exc:
|
|
log.error("S3 head_object failed: %s", exc)
|
|
return False
|
|
|
|
# Small files (< 1 MB) — simple download, no resume overhead
|
|
if total_size < 1024 * 1024:
|
|
return self._download_file_simple(
|
|
bucket, key, local_path, progress_cb, status_cb)
|
|
|
|
# --- 2. Check .s3part (partial download) ---
|
|
temp_path = local_path + ".s3part"
|
|
meta_path = local_path + ".s3meta"
|
|
start_byte = 0
|
|
|
|
if os.path.exists(temp_path):
|
|
saved_etag = ""
|
|
if os.path.exists(meta_path):
|
|
try:
|
|
with open(meta_path, "r") as f:
|
|
saved_etag = f.read().strip()
|
|
except Exception:
|
|
pass
|
|
if saved_etag == etag and etag:
|
|
start_byte = os.path.getsize(temp_path)
|
|
if start_byte >= total_size:
|
|
# Already fully downloaded
|
|
os.replace(temp_path, local_path)
|
|
self._cleanup_meta(meta_path)
|
|
self._last_ok = time.time()
|
|
return True
|
|
log.info("S3 resuming from byte %d / %d", start_byte, total_size)
|
|
if status_cb:
|
|
mb = start_byte / (1024 * 1024)
|
|
status_cb(f"Resuming from {mb:.1f} MB...")
|
|
else:
|
|
# ETag changed — file was modified on server, start fresh
|
|
try:
|
|
os.remove(temp_path)
|
|
except OSError:
|
|
pass
|
|
start_byte = 0
|
|
|
|
# Save ETag for future resume
|
|
try:
|
|
with open(meta_path, "w") as f:
|
|
f.write(etag)
|
|
except Exception:
|
|
pass
|
|
|
|
# Report already-downloaded bytes so progress bar is correct
|
|
if progress_cb and start_byte > 0:
|
|
progress_cb(start_byte)
|
|
|
|
# --- 3. Download loop with retry ---
|
|
chunk_size = _MULTIPART_CHUNKSIZE # 8 MB
|
|
|
|
for attempt in range(_MAX_RETRIES):
|
|
try:
|
|
if start_byte >= total_size:
|
|
break
|
|
|
|
range_header = f"bytes={start_byte}-"
|
|
resp = self._client.get_object(
|
|
Bucket=bucket, Key=key, Range=range_header)
|
|
|
|
with open(temp_path, "ab") as f:
|
|
for chunk in resp["Body"].iter_chunks(chunk_size=chunk_size):
|
|
f.write(chunk)
|
|
f.flush()
|
|
start_byte += len(chunk)
|
|
if progress_cb:
|
|
progress_cb(len(chunk))
|
|
|
|
# --- 4. Verify size ---
|
|
actual = os.path.getsize(temp_path)
|
|
if actual != total_size:
|
|
log.warning("S3 size mismatch: got %d, expected %d",
|
|
actual, total_size)
|
|
# Don't delete — maybe we can resume next attempt
|
|
if actual < total_size:
|
|
start_byte = actual
|
|
continue
|
|
# actual > total_size — corrupted, restart
|
|
try:
|
|
os.remove(temp_path)
|
|
except OSError:
|
|
pass
|
|
start_byte = 0
|
|
continue
|
|
|
|
# --- 5. Atomic rename ---
|
|
os.replace(temp_path, local_path)
|
|
self._cleanup_meta(meta_path)
|
|
self._last_ok = time.time()
|
|
log.info("S3 downloaded s3://%s/%s -> %s (%d bytes, resumed)",
|
|
bucket, key, local_path, total_size)
|
|
return True
|
|
|
|
except Exception as exc:
|
|
# Update start_byte from actual file size
|
|
if os.path.exists(temp_path):
|
|
start_byte = os.path.getsize(temp_path)
|
|
delay = _retry_delay(attempt)
|
|
log.warning("S3 download attempt %d/%d failed at byte %d: %s",
|
|
attempt + 1, _MAX_RETRIES, start_byte, exc)
|
|
if status_cb:
|
|
pct = (start_byte / total_size * 100) if total_size else 0
|
|
status_cb(f"Retry {attempt+1}/{_MAX_RETRIES} at {pct:.0f}%...")
|
|
time.sleep(delay)
|
|
self._reconnect()
|
|
# Adaptive chunk: reduce on repeated failures
|
|
if attempt >= 2 and chunk_size > 1024 * 1024:
|
|
chunk_size = 1024 * 1024 # 1 MB
|
|
log.info("S3 reducing chunk size to 1 MB")
|
|
|
|
log.error("S3 download failed after %d attempts: s3://%s/%s -> %s",
|
|
_MAX_RETRIES, bucket, key, local_path)
|
|
return False
|
|
|
|
def _download_file_simple(self, bucket: str, key: str, local_path: str,
|
|
progress_cb=None, status_cb=None) -> bool:
|
|
"""Simple download for small files (no resume overhead)."""
|
|
for attempt in range(_MAX_RETRIES):
|
|
try:
|
|
self._client.download_file(
|
|
bucket, key, local_path,
|
|
Config=self._transfer_config,
|
|
Callback=progress_cb,
|
|
)
|
|
self._last_ok = time.time()
|
|
log.info("S3 downloaded s3://%s/%s -> %s", bucket, key, local_path)
|
|
return True
|
|
except Exception as exc:
|
|
delay = _retry_delay(attempt)
|
|
log.warning("S3 download attempt %d/%d failed: %s (retry in %.0fs)",
|
|
attempt + 1, _MAX_RETRIES, exc, delay)
|
|
if status_cb:
|
|
status_cb(f"Retry {attempt + 1}/{_MAX_RETRIES} in {delay:.0f}s...")
|
|
time.sleep(delay)
|
|
if not self._reconnect():
|
|
log.error("S3 reconnect failed on attempt %d", attempt + 1)
|
|
continue
|
|
log.error("S3 download failed after %d attempts: s3://%s/%s -> %s",
|
|
_MAX_RETRIES, bucket, key, local_path)
|
|
return False
|
|
|
|
@staticmethod
|
|
def _cleanup_meta(meta_path: str):
|
|
"""Remove .s3meta file silently."""
|
|
try:
|
|
os.remove(meta_path)
|
|
except OSError:
|
|
pass
|
|
|
|
def delete_object(self, bucket: str, key: str) -> bool:
|
|
"""Delete an object from S3."""
|
|
if not self._ensure_connected():
|
|
return False
|
|
try:
|
|
self._client.delete_object(Bucket=bucket, Key=key)
|
|
log.info("S3 deleted s3://%s/%s", bucket, key)
|
|
return True
|
|
except Exception as exc:
|
|
log.error("S3 delete failed: %s", exc)
|
|
return False
|
|
|
|
def generate_presigned_url(self, bucket: str, key: str,
|
|
expires_in: int = 3600) -> str | None:
|
|
"""Generate a presigned download URL for an object.
|
|
|
|
expires_in: URL lifetime in seconds (default 1 hour).
|
|
Returns URL string or None on failure.
|
|
"""
|
|
if not self._ensure_connected():
|
|
return None
|
|
try:
|
|
url = self._client.generate_presigned_url(
|
|
"get_object",
|
|
Params={"Bucket": bucket, "Key": key},
|
|
ExpiresIn=expires_in,
|
|
)
|
|
return url
|
|
except Exception as exc:
|
|
log.error("S3 presigned URL failed: %s", exc)
|
|
return None
|
|
|
|
def list_all_objects(self, bucket: str, prefix: str = "") -> list[dict]:
|
|
"""List ALL objects under prefix recursively (no delimiter).
|
|
Returns list of {'Key', 'Size', 'LastModified'}.
|
|
"""
|
|
if not self._ensure_connected():
|
|
return []
|
|
try:
|
|
objects = []
|
|
paginator = self._client.get_paginator("list_objects_v2")
|
|
kwargs = {"Bucket": bucket}
|
|
if prefix:
|
|
kwargs["Prefix"] = prefix
|
|
for page in paginator.paginate(**kwargs):
|
|
for obj in page.get("Contents", []):
|
|
# Skip "folder" markers (zero-byte keys ending with /)
|
|
if obj["Key"].endswith("/") and obj.get("Size", 0) == 0:
|
|
continue
|
|
objects.append(obj)
|
|
self._last_ok = time.time()
|
|
return objects
|
|
except Exception as exc:
|
|
log.error("S3 list_all_objects failed: %s", exc)
|
|
return []
|
|
|
|
def delete_prefix(self, bucket: str, prefix: str) -> int:
|
|
"""Recursively delete all objects under a prefix. Returns count deleted."""
|
|
if not self._ensure_connected():
|
|
return 0
|
|
try:
|
|
deleted = 0
|
|
paginator = self._client.get_paginator("list_objects_v2")
|
|
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
|
|
objects = page.get("Contents", [])
|
|
if not objects:
|
|
continue
|
|
delete_req = {
|
|
"Objects": [{"Key": obj["Key"]} for obj in objects],
|
|
"Quiet": True,
|
|
}
|
|
self._client.delete_objects(Bucket=bucket, Delete=delete_req)
|
|
deleted += len(objects)
|
|
self._last_ok = time.time()
|
|
log.info("S3 deleted prefix s3://%s/%s (%d objects)", bucket, prefix, deleted)
|
|
return deleted
|
|
except Exception as exc:
|
|
log.error("S3 delete prefix failed: %s", exc)
|
|
return 0
|
|
|
|
def create_folder(self, bucket: str, key: str) -> bool:
|
|
"""Create a folder (empty object with trailing slash) in S3."""
|
|
if not self._ensure_connected():
|
|
return False
|
|
try:
|
|
self._client.put_object(Bucket=bucket, Key=key, Body=b"")
|
|
self._last_ok = time.time()
|
|
log.info("S3 created folder s3://%s/%s", bucket, key)
|
|
return True
|
|
except Exception as exc:
|
|
log.error("S3 create folder failed: %s", exc)
|
|
return False
|
|
|
|
def get_direct_url(self, bucket: str, key: str) -> str:
|
|
"""Build a direct (permanent) URL: endpoint/bucket/key."""
|
|
endpoint = self._endpoint.rstrip("/")
|
|
return f"{endpoint}/{bucket}/{key}"
|
|
|
|
def get_object_size(self, bucket: str, key: str) -> int:
|
|
"""Get size of an object in bytes."""
|
|
if not self._ensure_connected():
|
|
return 0
|
|
try:
|
|
resp = self._client.head_object(Bucket=bucket, Key=key)
|
|
return resp.get("ContentLength", 0)
|
|
except Exception:
|
|
return 0
|
|
|
|
def create_bucket(self, bucket_name: str) -> bool:
|
|
"""Create a new S3 bucket."""
|
|
if not self._ensure_connected():
|
|
return False
|
|
try:
|
|
self._client.create_bucket(Bucket=bucket_name)
|
|
self._last_ok = time.time()
|
|
log.info("S3 bucket created: %s", bucket_name)
|
|
return True
|
|
except Exception as exc:
|
|
log.error("S3 create_bucket failed: %s", exc)
|
|
return False
|
|
|
|
def delete_bucket(self, bucket_name: str) -> bool:
|
|
"""Delete an empty S3 bucket."""
|
|
if not self._ensure_connected():
|
|
return False
|
|
try:
|
|
self._client.delete_bucket(Bucket=bucket_name)
|
|
self._last_ok = time.time()
|
|
log.info("S3 bucket deleted: %s", bucket_name)
|
|
return True
|
|
except Exception as exc:
|
|
log.error("S3 delete_bucket failed: %s", exc)
|
|
return False
|