"""
S3 client wrapper — duck-typed, lazy-imports boto3 module.

Works with any S3-compatible storage (AWS, MinIO, etc.).

Resilience features:
- Retry with exponential backoff (up to 10 attempts per transfer), on top of
  botocore's own adaptive per-request retries
- Multipart upload/download via boto3's transfer manager
- Resumable large downloads (HTTP Range GET guarded by the object's ETag)
- Auto-reconnect on connection loss (network switch, Wi-Fi change)
- boto3 TransferConfig tuned for unstable connections
"""

import os
import time
import urllib.parse

from core.logger import log

_boto3 = None
_botocore = None

# Retry / resilience constants
_MAX_RETRIES = 10
_BASE_DELAY = 2.0  # seconds
_MAX_DELAY = 60.0  # seconds
_MULTIPART_THRESHOLD = 8 * 1024 * 1024  # 8 MB — use multipart above this
_MULTIPART_CHUNKSIZE = 8 * 1024 * 1024  # 8 MB chunks
_MAX_CONCURRENCY = 4  # parallel parts (low for unstable links)
_REDUCED_CHUNKSIZE = 1024 * 1024  # 1 MB read size after repeated download failures
_SIMPLE_DOWNLOAD_LIMIT = 1024 * 1024  # objects below 1 MB skip the resumable path
_HEALTH_CHECK_INTERVAL = 30.0  # seconds a recent success counts as "connected"


def _get_boto3():
    """Import boto3/botocore on first use and return the cached boto3 module."""
    global _boto3, _botocore
    if _boto3 is None:
        import boto3 as _b
        import botocore as _bc

        _boto3 = _b
        _botocore = _bc
    return _boto3


def _get_transfer_config():
    """Return a boto3 TransferConfig tuned for unreliable connections."""
    from boto3.s3.transfer import TransferConfig

    return TransferConfig(
        multipart_threshold=_MULTIPART_THRESHOLD,
        multipart_chunksize=_MULTIPART_CHUNKSIZE,
        max_concurrency=_MAX_CONCURRENCY,
        num_download_attempts=_MAX_RETRIES,
    )


def _retry_delay(attempt: int) -> float:
    """Exponential backoff for zero-based *attempt*: 2, 4, 8, 16, 32, 60, 60, ..."""
    return min(_BASE_DELAY * (2 ** attempt), _MAX_DELAY)


class S3Client:
    """Manage a single S3 connection.

    No ABC — duck typing.  Most operations log failures and return a falsy
    value (False, 0, None or empty containers) instead of raising.
    """

    def __init__(self, server: dict):
        """Read endpoint and credentials from a *server* config dict.

        Keys read: ``ip`` (host name or full URL), ``port``, ``use_ssl``
        (default True), ``verify_ssl`` (default False; True or a CA-bundle
        path enables certificate verification), ``access_key``,
        ``secret_key``, ``bucket``.  No network I/O happens here — call
        :meth:`connect`.
        """
        self._server = server
        self._use_ssl = server.get("use_ssl", True)
        # Falsy values (missing, None, "") all mean "do not verify", which is
        # the historical behaviour of this client.
        self._verify_ssl = server.get("verify_ssl") or False
        self._endpoint = self._build_endpoint(
            server.get("ip", ""), server.get("port"), self._use_ssl)
        self._access_key = server.get("access_key", "")
        self._secret_key = server.get("secret_key", "")
        self._bucket = server.get("bucket", "")
        self._client = None
        self._transfer_config = None
        self._last_ok: float = 0  # timestamp of last successful operation

    @staticmethod
    def _build_endpoint(host, port, use_ssl) -> str:
        """Return *host* as an endpoint URL, adding scheme and port if missing.

        A host that already starts with ``http://`` or ``https://`` is
        returned unchanged (as is an empty host).  A missing/empty *port*
        falls back to the scheme default (443 for https, 80 for http), and the
        default port is omitted from the URL.
        """
        if not host or host.startswith(("http://", "https://")):
            return host
        scheme = "https" if use_ssl else "http"
        default_port = 443 if use_ssl else 80
        port = default_port if port in (None, "") else int(port)
        if port == default_port:
            return f"{scheme}://{host}"
        return f"{scheme}://{host}:{port}"

    # -- lifecycle --------------------------------------------------------

    def connect(self) -> bool:
        """Create the boto3 client and verify it with ``list_buckets``.

        Returns True on success.  On any failure logs the error, clears the
        client and returns False.
        """
        try:
            b3 = _get_boto3()
            import botocore.config

            if not self._verify_ssl:
                import urllib3

                # Certificates are not verified, so silence the per-request
                # InsecureRequestWarning (this is a process-wide setting).
                urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

            config = botocore.config.Config(
                signature_version="s3v4",
                connect_timeout=15,
                read_timeout=60,
                retries={"max_attempts": 5, "mode": "adaptive"},
                tcp_keepalive=True,
            )
            self._client = b3.client(
                "s3",
                endpoint_url=self._endpoint,
                aws_access_key_id=self._access_key,
                aws_secret_access_key=self._secret_key,
                config=config,
                verify=self._verify_ssl,
            )
            self._transfer_config = _get_transfer_config()
            # Test connection
            self._client.list_buckets()
            self._last_ok = time.time()
            log.info("S3 connected %s", self._endpoint)
            return True
        except Exception as exc:
            log.error("S3 connect failed: %s", exc)
            self._client = None
            return False

    def _reconnect(self) -> bool:
        """Try to re-establish the S3 connection after a drop."""
        log.warning("S3 reconnecting to %s...", self._endpoint)
        self._client = None
        return self.connect()

    def _ensure_connected(self) -> bool:
        """Check connection, reconnect if needed.

        Skips the health-check if the last success was less than
        ``_HEALTH_CHECK_INTERVAL`` seconds ago (avoids redundant RTTs).
        """
        if self._client is None:
            return self._reconnect()
        if time.time() - self._last_ok < _HEALTH_CHECK_INTERVAL:
            return True
        try:
            self._client.list_buckets()
            self._last_ok = time.time()
            return True
        except Exception:
            return self._reconnect()

    def disconnect(self):
        """Drop the client; a later call will need :meth:`connect` again."""
        self._client = None
        log.info("S3 disconnected")

    def check_connection(self) -> bool:
        """Return True if a client exists and ``list_buckets`` succeeds."""
        try:
            if self._client is None:
                return False
            self._client.list_buckets()
            self._last_ok = time.time()
            return True
        except Exception:
            return False

    # -- retry helpers ----------------------------------------------------

    def _backoff(self, attempt: int, what: str, exc: Exception, status_cb) -> bool:
        """Log a failed *what* attempt; sleep and reconnect if another attempt remains.

        Returns False (without sleeping) when *attempt* was the last one, so
        callers can stop immediately instead of waiting up to a minute for
        nothing.
        """
        if attempt >= _MAX_RETRIES - 1:
            log.warning("S3 %s attempt %d/%d failed: %s",
                        what, attempt + 1, _MAX_RETRIES, exc)
            return False
        delay = _retry_delay(attempt)
        log.warning("S3 %s attempt %d/%d failed: %s (retry in %.0fs)",
                    what, attempt + 1, _MAX_RETRIES, exc, delay)
        if status_cb:
            status_cb(f"Retry {attempt + 1}/{_MAX_RETRIES} in {delay:.0f}s...")
        time.sleep(delay)
        if not self._reconnect():
            log.error("S3 reconnect failed on attempt %d", attempt + 1)
        return True

    @staticmethod
    def _error_code(exc: Exception) -> str:
        """Return the S3 error code carried by a botocore ClientError, else ""."""
        response = getattr(exc, "response", None)
        if not isinstance(response, dict):
            return ""
        return str(response.get("Error", {}).get("Code", ""))

    @staticmethod
    def _remove_quietly(path: str) -> None:
        """Delete *path*, ignoring any OSError (e.g. it does not exist)."""
        try:
            os.remove(path)
        except OSError:
            pass

    # -- bucket operations ------------------------------------------------

    def list_buckets(self) -> list[dict]:
        """Return list of {'Name': str, 'CreationDate': datetime}."""
        if not self._ensure_connected():
            return []
        try:
            resp = self._client.list_buckets()
            self._last_ok = time.time()
            return resp.get("Buckets", [])
        except Exception as exc:
            log.error("S3 list_buckets failed: %s", exc)
            return []

    # -- object operations ------------------------------------------------

    def list_objects(self, bucket: str = "", prefix: str = "",
                     delimiter: str = "/") -> tuple[list[dict], list[str]]:
        """List objects and common prefixes in a bucket/prefix.

        *bucket* defaults to the server's configured bucket.

        Returns (objects, prefixes) where:
        - objects: list of {'Key', 'Size', 'LastModified'} (the prefix's own
          marker object is excluded)
        - prefixes: list of prefix strings (subdirectories)
        """
        if not self._ensure_connected():
            return [], []
        bucket = bucket or self._bucket
        if not bucket:
            return [], []
        try:
            objects = []
            prefixes = []
            paginator = self._client.get_paginator("list_objects_v2")
            kwargs = {"Bucket": bucket, "Delimiter": delimiter}
            if prefix:
                kwargs["Prefix"] = prefix
            for page in paginator.paginate(**kwargs):
                for obj in page.get("Contents", []):
                    # Skip the prefix itself
                    if obj["Key"] != prefix:
                        objects.append(obj)
                for cp in page.get("CommonPrefixes", []):
                    prefixes.append(cp["Prefix"])
            self._last_ok = time.time()
            return objects, prefixes
        except Exception as exc:
            log.error("S3 list_objects failed: %s", exc)
            return [], []

    def upload_file(self, local_path: str, bucket: str, key: str,
                    progress_cb=None, status_cb=None) -> bool:
        """Upload a local file to S3, retrying the whole transfer on failure.

        progress_cb(bytes_delta) — passed to boto3 as ``Callback``; boto3
            calls it with the bytes sent since its previous call.  Each retry
            restarts the transfer from zero, so the same bytes may be reported
            again (the caller is expected to handle that).
        status_cb(message) — called with status messages (retry info, etc.).

        Files above 8 MB are sent as multipart uploads.  Individual requests
        are retried by botocore (see :meth:`connect`); if the transfer as a
        whole still fails it is restarted, up to ``_MAX_RETRIES`` times with
        exponential backoff and a reconnect in between.

        Returns True on success, False if the local file cannot be read or
        all attempts fail.
        """
        try:
            file_size = os.path.getsize(local_path)
        except OSError as exc:
            log.error("S3 upload failed: cannot read %s: %s", local_path, exc)
            return False
        if not self._ensure_connected():
            return False

        for attempt in range(_MAX_RETRIES):
            try:
                self._client.upload_file(
                    local_path, bucket, key,
                    Config=self._transfer_config,
                    Callback=progress_cb,
                )
                self._last_ok = time.time()
                log.info("S3 uploaded %s -> s3://%s/%s (%d bytes)",
                         local_path, bucket, key, file_size)
                return True
            except Exception as exc:
                if not self._backoff(attempt, "upload", exc, status_cb):
                    break

        log.error("S3 upload failed after %d attempts: %s -> s3://%s/%s",
                  _MAX_RETRIES, local_path, bucket, key)
        return False

    def download_file(self, bucket: str, key: str, local_path: str,
                      progress_cb=None, status_cb=None) -> bool:
        """Download an object with resume support using S3 Range GET.

        Objects below 1 MB go through :meth:`_download_file_simple`.  Larger
        ones are streamed into ``<local_path>.s3part`` and the object's ETag
        is stored in ``<local_path>.s3meta``.  On a later call the partial
        file is resumed only if the stored ETag matches the remote one;
        otherwise it is discarded and the download restarts.  Each ranged GET
        also sends ``If-Match: <etag>``, so if the object is replaced during
        the download the partial file is discarded and False is returned
        instead of splicing two object versions together.

        progress_cb(bytes_delta) — called with each chunk size (and once with
            the already-downloaded byte count when resuming).
        status_cb(message) — called with retry / resume info.

        Returns True once the complete file has been moved to *local_path*.
        """
        if not self._ensure_connected():
            return False

        # --- 1. HEAD — get size and ETag ---
        try:
            head = self._client.head_object(Bucket=bucket, Key=key)
            total_size = head["ContentLength"]
            etag = head.get("ETag", "")
        except Exception as exc:
            log.error("S3 head_object failed: %s", exc)
            return False

        # Small files — simple download, no resume overhead
        if total_size < _SIMPLE_DOWNLOAD_LIMIT:
            return self._download_file_simple(
                bucket, key, local_path, progress_cb, status_cb)

        # --- 2. Check .s3part (partial download) ---
        temp_path = local_path + ".s3part"
        meta_path = local_path + ".s3meta"
        start_byte = self._resume_offset(temp_path, meta_path, etag, total_size)
        if 0 < start_byte < total_size:
            log.info("S3 resuming from byte %d / %d", start_byte, total_size)
            if status_cb:
                mb = start_byte / (1024 * 1024)
                status_cb(f"Resuming from {mb:.1f} MB...")

        # Save ETag for future resume (best effort: without it a later call
        # simply cannot resume)
        try:
            with open(meta_path, "w") as f:
                f.write(etag)
        except OSError as exc:
            log.warning("S3 could not write %s: %s", meta_path, exc)

        # Report already-downloaded bytes so progress bar is correct
        if progress_cb and start_byte > 0:
            progress_cb(start_byte)

        # --- 3. Download loop with retry ---
        chunk_size = _MULTIPART_CHUNKSIZE
        for attempt in range(_MAX_RETRIES):
            try:
                # start_byte == total_size means the data is already complete
                # (e.g. an earlier attempt failed after the last chunk), so go
                # straight to verification rather than giving up.
                if start_byte < total_size:
                    start_byte = self._fetch_range(
                        bucket, key, etag, temp_path, start_byte,
                        chunk_size, progress_cb)

                # --- 4. Verify size ---
                actual = os.path.getsize(temp_path)
                if actual != total_size:
                    log.warning("S3 size mismatch: got %d, expected %d",
                                actual, total_size)
                    if actual > total_size:
                        # More bytes than the object has — corrupted, restart
                        self._remove_quietly(temp_path)
                        actual = 0
                    # Otherwise keep the partial file and resume next attempt
                    start_byte = actual
                    continue

                # --- 5. Atomic rename ---
                os.replace(temp_path, local_path)
                self._cleanup_meta(meta_path)
                self._last_ok = time.time()
                log.info("S3 downloaded s3://%s/%s -> %s (%d bytes, resumed)",
                         bucket, key, local_path, total_size)
                return True
            except Exception as exc:
                if self._error_code(exc) in ("PreconditionFailed", "412"):
                    # If-Match failed: the object changed since the HEAD.
                    log.error("S3 object changed during download, partial file "
                              "discarded: s3://%s/%s", bucket, key)
                    self._remove_quietly(temp_path)
                    self._cleanup_meta(meta_path)
                    return False
                # Resume from whatever actually reached the disk
                if os.path.exists(temp_path):
                    start_byte = os.path.getsize(temp_path)
                log.warning("S3 download attempt %d/%d failed at byte %d: %s",
                            attempt + 1, _MAX_RETRIES, start_byte, exc)
                if attempt >= _MAX_RETRIES - 1:
                    break  # no point sleeping / reconnecting after the last try
                delay = _retry_delay(attempt)
                if status_cb:
                    pct = (start_byte / total_size * 100) if total_size else 0
                    status_cb(f"Retry {attempt+1}/{_MAX_RETRIES} at {pct:.0f}%...")
                time.sleep(delay)
                self._reconnect()
                # Adaptive chunk: reduce on repeated failures
                if attempt >= 2 and chunk_size > _REDUCED_CHUNKSIZE:
                    chunk_size = _REDUCED_CHUNKSIZE
                    log.info("S3 reducing chunk size to 1 MB")

        log.error("S3 download failed after %d attempts: s3://%s/%s -> %s",
                  _MAX_RETRIES, bucket, key, local_path)
        return False

    def _resume_offset(self, temp_path: str, meta_path: str, etag: str,
                       total_size: int) -> int:
        """Return how many bytes of an existing partial download can be reused.

        The partial file is kept only if the ETag saved in *meta_path* equals
        the current (non-empty) *etag* and the file is not larger than the
        object.  Otherwise the partial file is deleted and 0 is returned.
        """
        if not os.path.exists(temp_path):
            return 0
        saved_etag = ""
        try:
            with open(meta_path, "r") as f:
                saved_etag = f.read().strip()
        except (OSError, ValueError):
            pass  # missing/unreadable meta — cannot prove the part is valid
        if etag and saved_etag == etag:
            size = os.path.getsize(temp_path)
            if size <= total_size:
                return size
            log.warning("S3 partial file larger than object (%d > %d), restarting",
                        size, total_size)
        # ETag changed (or unknown) — file was modified on server, start fresh
        self._remove_quietly(temp_path)
        return 0

    def _fetch_range(self, bucket: str, key: str, etag: str, temp_path: str,
                     start_byte: int, chunk_size: int, progress_cb) -> int:
        """Append bytes from *start_byte* to the end of the object onto *temp_path*.

        Returns the new offset.  The response stream is always closed so a
        failed read does not leak the pooled HTTP connection.
        """
        params = {"Bucket": bucket, "Key": key, "Range": f"bytes={start_byte}-"}
        if etag:
            # Make the server reject the request (412) if the object changed,
            # instead of appending bytes from a different version.
            params["IfMatch"] = etag
        body = self._client.get_object(**params)["Body"]
        try:
            with open(temp_path, "ab") as f:
                for chunk in body.iter_chunks(chunk_size=chunk_size):
                    f.write(chunk)
                    f.flush()
                    start_byte += len(chunk)
                    if progress_cb:
                        progress_cb(len(chunk))
        finally:
            body.close()
        return start_byte

    def _download_file_simple(self, bucket: str, key: str, local_path: str,
                              progress_cb=None, status_cb=None) -> bool:
        """Simple download for small files (no resume), retried with backoff."""
        for attempt in range(_MAX_RETRIES):
            try:
                self._client.download_file(
                    bucket, key, local_path,
                    Config=self._transfer_config,
                    Callback=progress_cb,
                )
                self._last_ok = time.time()
                log.info("S3 downloaded s3://%s/%s -> %s", bucket, key, local_path)
                return True
            except Exception as exc:
                if not self._backoff(attempt, "download", exc, status_cb):
                    break

        log.error("S3 download failed after %d attempts: s3://%s/%s -> %s",
                  _MAX_RETRIES, bucket, key, local_path)
        return False

    @staticmethod
    def _cleanup_meta(meta_path: str):
        """Remove .s3meta file silently."""
        try:
            os.remove(meta_path)
        except OSError:
            pass

    def delete_object(self, bucket: str, key: str) -> bool:
        """Delete an object from S3."""
        if not self._ensure_connected():
            return False
        try:
            self._client.delete_object(Bucket=bucket, Key=key)
            self._last_ok = time.time()
            log.info("S3 deleted s3://%s/%s", bucket, key)
            return True
        except Exception as exc:
            log.error("S3 delete failed: %s", exc)
            return False

    def generate_presigned_url(self, bucket: str, key: str,
                               expires_in: int = 3600) -> str | None:
        """Generate a presigned download URL for an object.

        expires_in: URL lifetime in seconds (default 1 hour).
        Returns URL string or None on failure.
        """
        if not self._ensure_connected():
            return None
        try:
            return self._client.generate_presigned_url(
                "get_object",
                Params={"Bucket": bucket, "Key": key},
                ExpiresIn=expires_in,
            )
        except Exception as exc:
            log.error("S3 presigned URL failed: %s", exc)
            return None

    def list_all_objects(self, bucket: str, prefix: str = "") -> list[dict]:
        """List ALL objects under prefix recursively (no delimiter).

        Zero-byte "folder" marker keys ending in "/" are skipped.
        Returns list of {'Key', 'Size', 'LastModified'}.
        """
        if not self._ensure_connected():
            return []
        try:
            objects = []
            paginator = self._client.get_paginator("list_objects_v2")
            kwargs = {"Bucket": bucket}
            if prefix:
                kwargs["Prefix"] = prefix
            for page in paginator.paginate(**kwargs):
                for obj in page.get("Contents", []):
                    # Skip "folder" markers (zero-byte keys ending with /)
                    if obj["Key"].endswith("/") and obj.get("Size", 0) == 0:
                        continue
                    objects.append(obj)
            self._last_ok = time.time()
            return objects
        except Exception as exc:
            log.error("S3 list_all_objects failed: %s", exc)
            return []

    def delete_prefix(self, bucket: str, prefix: str) -> int:
        """Recursively delete all objects under a prefix.

        Note: an empty *prefix* matches every object in the bucket.
        Returns the number of objects actually deleted (per-key failures
        reported by S3 are logged and not counted), or 0 if listing/deleting
        raised.
        """
        if not self._ensure_connected():
            return 0
        try:
            deleted = 0
            paginator = self._client.get_paginator("list_objects_v2")
            for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
                keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
                if not keys:
                    continue
                resp = self._client.delete_objects(
                    Bucket=bucket, Delete={"Objects": keys, "Quiet": True})
                # Quiet mode only omits successes; failures are still listed.
                errors = resp.get("Errors", [])
                for err in errors:
                    log.warning("S3 could not delete s3://%s/%s: %s",
                                bucket, err.get("Key", "?"),
                                err.get("Message") or err.get("Code", ""))
                deleted += len(keys) - len(errors)
            self._last_ok = time.time()
            log.info("S3 deleted prefix s3://%s/%s (%d objects)", bucket, prefix, deleted)
            return deleted
        except Exception as exc:
            log.error("S3 delete prefix failed: %s", exc)
            return 0

    def create_folder(self, bucket: str, key: str) -> bool:
        """Create a folder (empty object with trailing slash) in S3.

        A trailing "/" is appended to a non-empty *key* that lacks one, so
        the marker is listed as a prefix rather than as a plain file.
        """
        if not self._ensure_connected():
            return False
        if key and not key.endswith("/"):
            key += "/"
        try:
            self._client.put_object(Bucket=bucket, Key=key, Body=b"")
            self._last_ok = time.time()
            log.info("S3 created folder s3://%s/%s", bucket, key)
            return True
        except Exception as exc:
            log.error("S3 create folder failed: %s", exc)
            return False

    def get_direct_url(self, bucket: str, key: str) -> str:
        """Build a direct (permanent) path-style URL: endpoint/bucket/key.

        The key is percent-encoded ("/" kept) so names containing spaces,
        "#", "?" or "%" still produce a valid URL.
        """
        endpoint = self._endpoint.rstrip("/")
        return f"{endpoint}/{bucket}/{urllib.parse.quote(key, safe='/')}"

    def get_object_size(self, bucket: str, key: str) -> int:
        """Get size of an object in bytes (0 if it cannot be determined)."""
        if not self._ensure_connected():
            return 0
        try:
            resp = self._client.head_object(Bucket=bucket, Key=key)
            self._last_ok = time.time()
            return resp.get("ContentLength", 0)
        except Exception as exc:
            log.warning("S3 get_object_size failed for s3://%s/%s: %s", bucket, key, exc)
            return 0

    def create_bucket(self, bucket_name: str) -> bool:
        """Create a new S3 bucket."""
        if not self._ensure_connected():
            return False
        try:
            self._client.create_bucket(Bucket=bucket_name)
            self._last_ok = time.time()
            log.info("S3 bucket created: %s", bucket_name)
            return True
        except Exception as exc:
            log.error("S3 create_bucket failed: %s", exc)
            return False

    def delete_bucket(self, bucket_name: str) -> bool:
        """Delete an empty S3 bucket."""
        if not self._ensure_connected():
            return False
        try:
            self._client.delete_bucket(Bucket=bucket_name)
            self._last_ok = time.time()
            log.info("S3 bucket deleted: %s", bucket_name)
            return True
        except Exception as exc:
            log.error("S3 delete_bucket failed: %s", exc)
            return False