Files
server-manager/core/s3_client.py
chrome-storm-c442 9b0e4c76a3 v1.9.0: S3 server type — bucket/object browser, drag-and-drop upload, resilient transfers
New server type: S3 (MinIO, AWS, any S3-compatible storage)
- core/s3_client.py: boto3 client with auto-reconnect, 10 retries, exponential backoff, multipart upload/download, tcp_keepalive
- gui/tabs/s3_tab.py: object browser (Treeview), bucket selector, folder navigation, drag-and-drop upload from Explorer (windnd), progress bar with %, multi-file upload
- CLI: --s3-buckets, --s3-ls, --s3-upload, --s3-download, --s3-delete with retry
- ServerDialog: access_key, secret_key, bucket fields
- Registration: server_store, connection_factory, status_checker, icons, app, i18n (EN/RU/ZH)
- Fix: build.py cleanup_old_releases now sorts by semver (was lexicographic, broke v1.8.100+)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-03 06:32:03 -05:00

289 lines
11 KiB
Python

"""
S3 client wrapper — duck-typed, lazy-imports boto3 module.
Works with any S3-compatible storage (AWS, MinIO, etc.).
Resilience features:
- Adaptive retry with exponential backoff (up to 10 attempts)
- Multipart upload/download with configurable chunk size
- Auto-reconnect on connection loss (network switch, Wi-Fi change)
- boto3 TransferConfig tuned for unstable connections
"""
import os
import time
from core.logger import log
# Cached module objects. Both stay None until _get_boto3() runs the imports.
_boto3 = _botocore = None

# --- Retry / backoff tuning ---------------------------------------------
_MAX_RETRIES = 10      # attempts per transfer; also TransferConfig.num_download_attempts
_BASE_DELAY = 2.0      # seconds, multiplied by 2 ** attempt in _retry_delay()
_MAX_DELAY = 60.0      # seconds, ceiling applied by _retry_delay()

# --- Multipart transfer tuning ------------------------------------------
_MULTIPART_THRESHOLD = 8 * 1024 ** 2   # 8 MB — use multipart above this
_MULTIPART_CHUNKSIZE = 8 * 1024 ** 2   # 8 MB chunks
_MAX_CONCURRENCY = 4                   # parallel parts (low for unstable links)
def _get_boto3():
    """Return the boto3 module, importing it (and botocore) on first call.

    The imported modules are cached in the module-level ``_boto3`` and
    ``_botocore`` globals, so later calls return the cached boto3 object
    without running the import statements again.
    """
    global _boto3, _botocore
    if _boto3 is not None:
        return _boto3

    import boto3 as boto3_module
    import botocore as botocore_module

    # Assign only after both imports succeeded, so a failed import leaves
    # the cache untouched.
    _boto3, _botocore = boto3_module, botocore_module
    return _boto3
def _get_transfer_config():
    """Build the TransferConfig tuned for unreliable connections.

    All values come from the module-level tuning constants. The import is
    local so that boto3 is only loaded when a config is actually requested.
    """
    from boto3.s3.transfer import TransferConfig

    options = {
        "multipart_threshold": _MULTIPART_THRESHOLD,
        "multipart_chunksize": _MULTIPART_CHUNKSIZE,
        "max_concurrency": _MAX_CONCURRENCY,
        "num_download_attempts": _MAX_RETRIES,
    }
    return TransferConfig(**options)
def _retry_delay(attempt: int) -> float:
    """Return the backoff delay in seconds for a zero-based *attempt*.

    The delay is ``_BASE_DELAY * 2 ** attempt`` capped at ``_MAX_DELAY``,
    which with the module defaults gives 2, 4, 8, 16, 32, 60, 60, ...
    """
    uncapped = _BASE_DELAY * (2 ** attempt)
    if uncapped < _MAX_DELAY:
        return uncapped
    return _MAX_DELAY
class S3Client:
    """Manage a single S3 connection. No ABC — duck typing.

    The ``server`` dict may supply ``ip`` (bare host or full URL), ``port``,
    ``use_ssl``, ``access_key``, ``secret_key`` and ``bucket``; every key is
    optional. ``verify_ssl`` (default ``False``) opts in to TLS certificate
    verification.

    Operations report S3/network failures through their return value
    (``False``, ``0`` or empty collections) and log the cause.
    """

    # HTTP statuses for which repeating the same request cannot succeed.
    _PERMANENT_HTTP_STATUSES = (401, 403, 404)

    def __init__(self, server: dict):
        self._server = server
        self._use_ssl = server.get("use_ssl", True)
        self._endpoint = self._build_endpoint(server)
        self._access_key = server.get("access_key", "")
        self._secret_key = server.get("secret_key", "")
        self._bucket = server.get("bucket", "")
        # SECURITY: certificate verification is off unless "verify_ssl" is
        # set. The default is kept for backward compatibility; pass True
        # (or a CA-bundle path) to enable verification.
        self._verify = server.get("verify_ssl", False)
        self._client = None
        self._transfer_config = None

    @staticmethod
    def _build_endpoint(server: dict) -> str:
        """Turn the ``ip``/``port``/``use_ssl`` settings into an endpoint URL.

        A value that already starts with ``http://`` or ``https://`` is
        returned unchanged. Otherwise a scheme is prepended and the port is
        appended unless it is the scheme's default (443 / 80). A missing or
        empty port falls back to that default.
        """
        host = server.get("ip", "")
        if not host or host.lower().startswith(("http://", "https://")):
            return host
        scheme = "https" if server.get("use_ssl", True) else "http"
        default_port = 443 if scheme == "https" else 80
        port = int(server.get("port") or default_port)
        if port == default_port:
            return f"{scheme}://{host}"
        return f"{scheme}://{host}:{port}"

    @staticmethod
    def _is_retryable(exc: BaseException) -> bool:
        """Return False when *exc* carries a permanent HTTP error status.

        Duck-typed: any exception (or its direct cause/context) exposing a
        dict ``response`` with ``ResponseMetadata.HTTPStatusCode`` is
        inspected; 401/403/404 are treated as permanent. Everything else,
        including errors without a response, is considered retryable.
        """
        for err in (exc, exc.__cause__, exc.__context__):
            response = getattr(err, "response", None)
            if not isinstance(response, dict):
                continue
            meta = response.get("ResponseMetadata")
            status = meta.get("HTTPStatusCode") if isinstance(meta, dict) else None
            return status not in S3Client._PERMANENT_HTTP_STATUSES
        return True

    # -- lifecycle --------------------------------------------------------

    def connect(self) -> bool:
        """Create the boto3 client and verify it with a ``list_buckets`` call.

        Returns True on success. On any error the cause is logged, the
        client is reset to None and False is returned.
        """
        try:
            b3 = _get_boto3()
            import botocore.config

            if not self._verify:
                # Only silence the warning when verification was knowingly
                # left disabled; note this is a process-wide setting.
                import urllib3
                urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

            config = botocore.config.Config(
                signature_version="s3v4",
                connect_timeout=15,
                read_timeout=60,
                retries={"max_attempts": 5, "mode": "adaptive"},
                tcp_keepalive=True,
            )
            self._client = b3.client(
                "s3",
                endpoint_url=self._endpoint,
                aws_access_key_id=self._access_key,
                aws_secret_access_key=self._secret_key,
                config=config,
                verify=self._verify,
            )
            self._transfer_config = _get_transfer_config()
            # Test connection
            self._client.list_buckets()
            log.info("S3 connected %s", self._endpoint)
            return True
        except Exception as exc:
            log.error("S3 connect failed: %s", exc)
            self._client = None
            return False

    def _reconnect(self) -> bool:
        """Try to re-establish the S3 connection after a drop."""
        log.warning("S3 reconnecting to %s...", self._endpoint)
        self._client = None
        return self.connect()

    def _ensure_connected(self) -> bool:
        """Check the connection with ``list_buckets``; reconnect if needed."""
        if self._client is None:
            return self._reconnect()
        try:
            self._client.list_buckets()
            return True
        except Exception:
            return self._reconnect()

    def disconnect(self):
        """Drop the client reference; a later operation reconnects lazily."""
        self._client = None
        log.info("S3 disconnected")

    def check_connection(self) -> bool:
        """Return True if a client exists and ``list_buckets`` succeeds.

        Unlike ``_ensure_connected`` this never attempts to reconnect.
        """
        try:
            if self._client is None:
                return False
            self._client.list_buckets()
            return True
        except Exception:
            return False

    # -- bucket operations ------------------------------------------------

    def list_buckets(self) -> list[dict]:
        """Return list of {'Name': str, 'CreationDate': datetime}.

        Returns an empty list when not connected or when the call fails.
        """
        if not self._ensure_connected():
            return []
        try:
            resp = self._client.list_buckets()
            return resp.get("Buckets", [])
        except Exception as exc:
            log.error("S3 list_buckets failed: %s", exc)
            return []

    # -- object operations ------------------------------------------------

    def list_objects(self, bucket: str = "", prefix: str = "",
                     delimiter: str = "/") -> tuple[list[dict], list[str]]:
        """List objects and common prefixes in a bucket/prefix.

        ``bucket`` falls back to the bucket configured on the server entry.

        Returns (objects, prefixes) where:
        - objects: list of {'Key', 'Size', 'LastModified'}
        - prefixes: list of prefix strings (subdirectories)

        Returns ([], []) when not connected, when no bucket is known, or
        when the listing fails.
        """
        if not self._ensure_connected():
            return [], []
        bucket = bucket or self._bucket
        if not bucket:
            return [], []
        try:
            objects = []
            prefixes = []
            paginator = self._client.get_paginator("list_objects_v2")
            kwargs = {"Bucket": bucket, "Delimiter": delimiter}
            if prefix:
                kwargs["Prefix"] = prefix
            for page in paginator.paginate(**kwargs):
                # Skip the entry whose key is the prefix itself.
                objects.extend(
                    obj for obj in page.get("Contents", [])
                    if obj["Key"] != prefix
                )
                prefixes.extend(
                    cp["Prefix"] for cp in page.get("CommonPrefixes", [])
                )
            return objects, prefixes
        except Exception as exc:
            log.error("S3 list_objects failed: %s", exc)
            return [], []

    def _transfer_with_retry(self, verb: str, action, target: str,
                             status_cb=None) -> bool:
        """Run ``action()`` until it succeeds or the retry budget is spent.

        verb      — "upload" / "download", used in log messages.
        action    — zero-argument callable performing one transfer attempt.
        target    — human-readable "source -> destination" for the final log.
        status_cb — optional callable receiving a retry status message.

        Between attempts the method sleeps for ``_retry_delay(attempt)`` and
        rebuilds the client. It stops early on errors that
        ``_is_retryable`` classifies as permanent, and it does not sleep or
        reconnect after the final attempt.
        """
        attempts_made = 0
        for attempt in range(_MAX_RETRIES):
            attempts_made = attempt + 1
            try:
                if self._client is None:
                    # The previous reconnect failed; count this as a failed
                    # attempt with a meaningful error message.
                    raise ConnectionError("S3 client is not connected")
                action()
                return True
            except Exception as exc:
                if not self._is_retryable(exc):
                    log.error("S3 %s aborted, error is not retryable: %s",
                              verb, exc)
                    break
                if attempts_made >= _MAX_RETRIES:
                    log.warning("S3 %s attempt %d/%d failed: %s",
                                verb, attempts_made, _MAX_RETRIES, exc)
                    break
                delay = _retry_delay(attempt)
                log.warning("S3 %s attempt %d/%d failed: %s (retry in %.0fs)",
                            verb, attempts_made, _MAX_RETRIES, exc, delay)
                if status_cb:
                    status_cb(f"Retry {attempts_made}/{_MAX_RETRIES} in {delay:.0f}s...")
                time.sleep(delay)
                # Reconnect before retry
                if not self._reconnect():
                    log.error("S3 reconnect failed on attempt %d", attempts_made)
        log.error("S3 %s failed after %d attempts: %s",
                  verb, attempts_made, target)
        return False

    def upload_file(self, local_path: str, bucket: str, key: str,
                    progress_cb=None, status_cb=None) -> bool:
        """Upload a local file to S3 with retry.

        progress_cb(bytes_transferred) — called periodically for progress bar.
        status_cb(message) — called with status messages (retry info, etc.).

        The transfer uses the shared TransferConfig (multipart above
        ``_MULTIPART_THRESHOLD``) and is retried up to ``_MAX_RETRIES`` times
        with exponential backoff. Returns False if the local file cannot be
        read, the connection cannot be established, or all attempts fail.

        NOTE(review): every retry is a fresh ``upload_file`` call and
        ``progress_cb`` is passed unchanged each time, so the caller is
        expected to track total bytes itself — confirm against the GUI.
        """
        # Check the local file before touching the network.
        try:
            file_size = os.path.getsize(local_path)
        except OSError as exc:
            log.error("S3 upload failed: cannot read %s: %s", local_path, exc)
            return False
        if not self._ensure_connected():
            return False

        def _send():
            # Looks up self._client at call time so a reconnect is honoured.
            self._client.upload_file(
                local_path, bucket, key,
                Config=self._transfer_config,
                Callback=progress_cb,
            )

        target = f"{local_path} -> s3://{bucket}/{key}"
        if not self._transfer_with_retry("upload", _send, target, status_cb):
            return False
        log.info("S3 uploaded %s -> s3://%s/%s (%d bytes)",
                 local_path, bucket, key, file_size)
        return True

    def download_file(self, bucket: str, key: str, local_path: str,
                      progress_cb=None, status_cb=None) -> bool:
        """Download an S3 object to a local file with retry.

        progress_cb(bytes_transferred) — called periodically.
        status_cb(message) — called with retry info.

        ``TransferConfig.num_download_attempts`` is set to ``_MAX_RETRIES``
        for the transfer itself; this method adds full-transfer retries with
        reconnect on top. Returns False when not connected or when all
        attempts fail.
        """
        if not self._ensure_connected():
            return False

        def _fetch():
            # Looks up self._client at call time so a reconnect is honoured.
            self._client.download_file(
                bucket, key, local_path,
                Config=self._transfer_config,
                Callback=progress_cb,
            )

        target = f"s3://{bucket}/{key} -> {local_path}"
        if not self._transfer_with_retry("download", _fetch, target, status_cb):
            return False
        log.info("S3 downloaded s3://%s/%s -> %s", bucket, key, local_path)
        return True

    def delete_object(self, bucket: str, key: str) -> bool:
        """Delete an object from S3; return True on success."""
        if not self._ensure_connected():
            return False
        try:
            self._client.delete_object(Bucket=bucket, Key=key)
            log.info("S3 deleted s3://%s/%s", bucket, key)
            return True
        except Exception as exc:
            log.error("S3 delete failed: %s", exc)
            return False

    def get_object_size(self, bucket: str, key: str) -> int:
        """Get size of an object in bytes.

        Returns 0 when not connected or when ``head_object`` fails; the
        failure is logged so it can be told apart from an empty object.
        """
        if not self._ensure_connected():
            return 0
        try:
            resp = self._client.head_object(Bucket=bucket, Key=key)
            return resp.get("ContentLength", 0)
        except Exception as exc:
            log.warning("S3 head_object failed for s3://%s/%s: %s",
                        bucket, key, exc)
            return 0