Files
server-manager/gui/tabs/s3_tab.py
chrome-storm-c442 1e729fcf3a v1.9.1: PNG Material Design icons — 28 icons, dark/light theme, HiDPI, graceful Unicode fallback
- 56 PNG icons (28 unique × 2 color variants) from Material Design Icons (round style, 96×96px)
- core/icons.py: ctk_icon(), make_icon_button(), reconfigure_icon_button() with CTkImage cache
- Updated 15 GUI files: app.py, sidebar.py, server_dialog.py, all tabs
- build.py: auto-include assets/icons/ in PyInstaller bundle, patch rollover at 99→minor+1
- tools/download_icons.py: icon download script
- Automatic dark↔light theme switching via CTkImage dual-image support

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-03 07:27:49 -05:00

544 lines
19 KiB
Python

"""
S3 tab — bucket/object browser with upload, download, delete actions.
Supports drag-and-drop from OS file manager for upload.
"""
import os
import sys
import tempfile
import threading
from tkinter import ttk, filedialog
import customtkinter as ctk
from core.s3_client import S3Client
from core.i18n import t
from core.icons import icon_text, make_icon_button
from gui.tabs.query_tab import apply_dark_scrollbar_style
def _human_size(size_bytes: int) -> str:
"""Format bytes to human-readable string."""
if size_bytes < 1024:
return f"{size_bytes} B"
for unit in ("KB", "MB", "GB", "TB"):
size_bytes /= 1024
if size_bytes < 1024:
return f"{size_bytes:.1f} {unit}"
return f"{size_bytes:.1f} PB"
def _setup_drop(widget, callback):
"""Setup OS drag-and-drop onto a widget. Cross-platform with graceful fallback."""
if sys.platform == "win32":
try:
import windnd
windnd.hook_dropfiles(widget, func=callback)
return True
except Exception:
pass
# tkinterdnd2 fallback (works on Linux/macOS if installed)
try:
widget.drop_target_register("DND_Files")
widget.dnd_bind("<<Drop>>", lambda e: callback(_parse_dnd_paths(e.data)))
return True
except Exception:
pass
return False
def _parse_dnd_paths(data: str) -> list[str]:
"""Parse tkinterdnd2 drop data into list of file paths."""
paths = []
# tkinterdnd2 wraps paths with spaces in braces: {C:/path with spaces/file.txt}
i = 0
while i < len(data):
if data[i] == '{':
end = data.index('}', i)
paths.append(data[i + 1:end])
i = end + 2 # skip } and space
elif data[i] == ' ':
i += 1
else:
end = data.find(' ', i)
if end == -1:
end = len(data)
paths.append(data[i:end])
i = end + 1
return paths
class S3Tab(ctk.CTkFrame):
    """S3 browser tab: bucket selector, object list and transfer actions.

    The tab shows the objects and sub-prefixes ("folders") of one bucket
    prefix in a Treeview and offers upload (button or OS drag-and-drop),
    download and delete. All ``S3Client`` calls except the object-size lookup
    in ``_download`` run on daemon worker threads; their results are handed
    back to the GUI through ``self.after(0, ...)``.

    Workers operate on a snapshot of the client/bucket/prefix taken when the
    action started, and results that no longer match the current selection
    are discarded, so switching server or folder mid-request cannot mix up
    state.
    """

    # Placed between the transfer label and the "done / total" byte counter.
    _PROGRESS_SEP = " — "

    def __init__(self, master, store):
        """Create the tab.

        Args:
            master: Parent widget.
            store: Object providing ``get_server(alias)``; the returned
                server record is passed to ``S3Client`` and queried with
                ``.get("bucket", "")``.
        """
        super().__init__(master, fg_color="transparent")
        self.store = store
        self._current_alias: str | None = None
        self._client: S3Client | None = None
        self._current_bucket: str = ""
        self._current_prefix: str = ""
        self._nav_stack: list[str] = []
        self._dnd_active = False
        # Tree item id -> (kind, key, display name), kind is "folder"/"file".
        # Rows are resolved through this map instead of the displayed text
        # because Tk may hand numeric-looking cell values back as numbers.
        self._items: dict[str, tuple[str, str, str]] = {}
        # Label passed to _show_progress; the byte counter is appended to it.
        self._progress_title = ""
        self._build_ui()

    # -- UI construction --------------------------------------------------

    def _build_ui(self):
        """Build all widgets; pack order defines the vertical layout."""
        apply_dark_scrollbar_style()
        self._build_header()
        self._build_bucket_row()
        self._build_progress_row()
        self._build_tree()
        # Setup OS drag-and-drop on the treeview area
        self.after(200, self._init_dnd)

    def _build_header(self):
        """Title, status text and the action buttons."""
        header = ctk.CTkFrame(self, fg_color="transparent")
        header.pack(fill="x", padx=15, pady=(15, 5))

        self._title_label = ctk.CTkLabel(
            header, text=t("s3_objects"),
            font=ctk.CTkFont(size=18, weight="bold"),
        )
        self._title_label.pack(side="left")

        self._status_label = ctk.CTkLabel(
            header, text="", font=ctk.CTkFont(size=12),
            text_color="#9ca3af",
        )
        self._status_label.pack(side="left", padx=(15, 0))

        btn_frame = ctk.CTkFrame(header, fg_color="transparent")
        btn_frame.pack(side="right")

        self._back_btn = make_icon_button(
            btn_frame, "back", t("s3_back"), width=80,
            command=self._go_back, state="disabled",
        )
        self._back_btn.pack(side="left", padx=(0, 5))

        self._refresh_btn = make_icon_button(
            btn_frame, "refresh", t("s3_refresh"), width=100,
            command=self._refresh,
        )
        self._refresh_btn.pack(side="left", padx=(0, 5))

        self._upload_btn = make_icon_button(
            btn_frame, "upload", t("s3_upload"), width=100,
            command=self._upload,
        )
        self._upload_btn.pack(side="left", padx=(0, 5))

        self._download_btn = make_icon_button(
            btn_frame, "download", t("s3_download"), width=110,
            command=self._download,
        )
        self._download_btn.pack(side="left", padx=(0, 5))

        self._delete_btn = make_icon_button(
            btn_frame, "delete", t("s3_delete"), width=100,
            fg_color="#dc2626", hover_color="#b91c1c",
            command=self._delete,
        )
        self._delete_btn.pack(side="left")

    def _build_bucket_row(self):
        """Bucket drop-down and the current-path display."""
        bucket_frame = ctk.CTkFrame(self, fg_color="transparent")
        bucket_frame.pack(fill="x", padx=15, pady=(5, 5))

        ctk.CTkLabel(
            bucket_frame, text=t("s3_bucket"),
            font=ctk.CTkFont(size=12, weight="bold"),
        ).pack(side="left", padx=(0, 5))

        self._bucket_var = ctk.StringVar(value="")
        self._bucket_menu = ctk.CTkOptionMenu(
            bucket_frame, variable=self._bucket_var, values=[""],
            width=200, command=self._on_bucket_change,
        )
        self._bucket_menu.pack(side="left", padx=(0, 15))

        self._path_label = ctk.CTkLabel(
            bucket_frame, text="/",
            font=ctk.CTkFont(family="Consolas", size=12),
            text_color="#60a5fa",
        )
        self._path_label.pack(side="left", fill="x", expand=True)

    def _build_progress_row(self):
        """Transfer progress widgets; the frame is packed only on demand."""
        self._progress_frame = ctk.CTkFrame(self, fg_color="transparent")
        # Don't pack yet — shown only during transfer

        self._progress_label = ctk.CTkLabel(
            self._progress_frame, text="", font=ctk.CTkFont(size=11),
            text_color="#9ca3af",
        )
        self._progress_label.pack(side="left", padx=(0, 10))

        self._progress_bar = ctk.CTkProgressBar(
            self._progress_frame, width=300, height=14)
        self._progress_bar.set(0)
        self._progress_bar.pack(side="left", fill="x", expand=True)

        self._progress_pct = ctk.CTkLabel(
            self._progress_frame, text="0%",
            font=ctk.CTkFont(size=11, weight="bold"),
            text_color="#60a5fa", width=45,
        )
        self._progress_pct.pack(side="left", padx=(10, 0))

        self._transfer_bytes = 0
        self._transfer_total = 0

    def _build_tree(self):
        """Object list (Treeview + scrollbar), its style and the drop hint."""
        self._tree_frame = ctk.CTkFrame(
            self, fg_color="#1e1e1e", corner_radius=8)
        self._tree_frame.pack(fill="both", expand=True, padx=15, pady=(5, 15))

        columns = ("name", "size", "modified")
        self._tree = ttk.Treeview(
            self._tree_frame, columns=columns, show="headings",
            selectmode="browse", style="Dark.Treeview",
        )
        self._tree.heading("name", text=t("s3_col_name"))
        self._tree.heading("size", text=t("s3_col_size"))
        self._tree.heading("modified", text=t("s3_col_modified"))
        self._tree.column("name", width=400, minwidth=200)
        self._tree.column("size", width=100, minwidth=80, anchor="e")
        self._tree.column("modified", width=180, minwidth=120)

        scrollbar = ttk.Scrollbar(
            self._tree_frame, orient="vertical",
            command=self._tree.yview,
            style="Dark.Vertical.TScrollbar",
        )
        self._tree.configure(yscrollcommand=scrollbar.set)
        self._tree.pack(side="left", fill="both", expand=True)
        scrollbar.pack(side="right", fill="y")

        # Double-click to enter prefix (folder) or download file
        self._tree.bind("<Double-1>", self._on_double_click)

        # Dark treeview style
        style = ttk.Style()
        style.configure("Dark.Treeview",
                        background="#2b2b2b", foreground="#e5e5e5",
                        fieldbackground="#2b2b2b", borderwidth=0,
                        rowheight=26)
        style.configure("Dark.Treeview.Heading",
                        background="#333333", foreground="#e5e5e5",
                        borderwidth=0, relief="flat")
        style.map("Dark.Treeview",
                  background=[("selected", "#1e3a5f")],
                  foreground=[("selected", "#ffffff")])

        # Drop-zone hint; placed over the tree only while the listing is
        # empty (see _display).
        self._drop_hint = ctk.CTkLabel(
            self._tree_frame,
            text=t("s3_drop_hint"),
            font=ctk.CTkFont(size=14),
            text_color="#6b7280",
        )

    # -- drag and drop ----------------------------------------------------

    def _init_dnd(self):
        """Initialize drag-and-drop after widget is mapped."""
        try:
            self._dnd_active = _setup_drop(self._tree, self._on_files_dropped)
            if not self._dnd_active:
                # Try on the frame too
                self._dnd_active = _setup_drop(
                    self._tree_frame, self._on_files_dropped)
        except Exception:
            self._dnd_active = False

    @staticmethod
    def _decode_dropped_path(item) -> str:
        """Turn one dropped entry (``bytes`` or anything else) into ``str``."""
        if not isinstance(item, bytes):
            return str(item)
        try:
            return item.decode("utf-8")
        except UnicodeDecodeError:
            # NOTE(review): windnd presumably delivers paths in the system
            # ANSI code page — confirm. Falling back to the locale encoding
            # keeps non-ASCII names intact instead of replacing characters.
            import locale
            return item.decode(
                locale.getpreferredencoding(False), errors="replace")

    def _on_files_dropped(self, files):
        """Handle files dropped from OS file manager.

        Entries may be ``bytes`` (windnd on Windows) or ``str``; anything
        that is not an existing regular file is ignored.
        """
        if not self._client or not self._current_bucket:
            return
        decoded = (self._decode_dropped_path(f) for f in files)
        paths = [p for p in decoded if os.path.isfile(p)]
        if paths:
            self._upload_files(paths)

    # -- transfer progress ------------------------------------------------

    def _show_progress(self, label: str, total_bytes: int):
        """Show and reset the progress bar.

        Args:
            label: Text shown in front of the byte counter.
            total_bytes: Expected transfer size; a missing or non-positive
                value is clamped to 1 to avoid division by zero.
        """
        self._progress_title = label
        self._transfer_bytes = 0
        self._transfer_total = max(total_bytes or 0, 1)
        self._progress_bar.set(0)
        self._progress_pct.configure(text="0%")
        self._progress_label.configure(text=label)
        self._progress_frame.pack(fill="x", padx=15, pady=(2, 2),
                                  before=self._tree_frame)

    def _hide_progress(self):
        """Hide the progress bar."""
        self._progress_frame.pack_forget()

    def _on_progress(self, chunk_bytes: int):
        """Called from transfer thread — schedule GUI update."""
        self._transfer_bytes += chunk_bytes
        self.after(0, self._update_progress)

    def _update_progress(self):
        """Update progress bar on GUI thread."""
        if self._transfer_total <= 0:
            return
        ratio = min(self._transfer_bytes / self._transfer_total, 1.0)
        self._progress_bar.set(ratio)
        self._progress_pct.configure(text=f"{int(ratio * 100)}%")
        size_str = (f"{_human_size(self._transfer_bytes)} / "
                    f"{_human_size(self._transfer_total)}")
        # Rebuild from the stored title rather than parsing the widget text.
        self._progress_label.configure(
            text=f"{self._progress_title}{self._PROGRESS_SEP}{size_str}")

    def _on_transfer_status(self, message: str):
        """Called from transfer thread with retry/status info."""
        # Reset progress on retry (boto3 restarts the transfer)
        self._transfer_bytes = 0
        self.after(0, lambda: self._status_label.configure(text=message))

    # -- upload -----------------------------------------------------------

    def _upload_files(self, paths: list[str]):
        """Upload multiple files to current prefix.

        Each file is stored under ``<prefix><basename>``. Client, bucket and
        prefix are captured up front so navigating during the upload does
        not redirect the remaining files.
        """
        client = self._client
        bucket = self._current_bucket
        prefix = self._current_prefix
        if not client or not bucket or not paths:
            return

        total_files = len(paths)
        total_bytes = sum(
            os.path.getsize(p) for p in paths if os.path.isfile(p))
        label = (t("s3_uploading_n").format(count=total_files)
                 if total_files > 1 else t("s3_uploading"))
        self._status_label.configure(text=label)
        self._show_progress(label, total_bytes)

        def _do():
            ok_count = 0
            for path in paths:
                key = prefix + os.path.basename(path)
                # A truthy return value is counted as a successful upload.
                if client.upload_file(
                        path, bucket, key,
                        progress_cb=self._on_progress,
                        status_cb=self._on_transfer_status):
                    ok_count += 1
            self.after(0, lambda: self._on_upload_done(ok_count, total_files))

        threading.Thread(target=_do, daemon=True).start()

    def _on_upload_done(self, ok_count: int, total: int):
        """Report the upload result in the status label and reload the list."""
        self._hide_progress()
        if ok_count == total:
            self._status_label.configure(
                text=t("s3_uploaded_n").format(count=ok_count))
        else:
            self._status_label.configure(
                text=t("s3_upload_partial").format(ok=ok_count, total=total))
        self._refresh()

    # -- server switch ----------------------------------------------------

    def _clear_tree(self):
        """Remove all rows and forget their key mapping."""
        self._tree.delete(*self._tree.get_children())
        self._items.clear()

    def set_server(self, alias: str | None):
        """Called when user selects a server in sidebar.

        Args:
            alias: Server alias to connect to, or a falsy value to clear
                the tab.
        """
        if alias == self._current_alias:
            return
        self._current_alias = alias
        self._client = None
        self._current_prefix = ""
        self._nav_stack.clear()
        self._clear_tree()

        if not alias:
            self._status_label.configure(text="")
            return

        self._status_label.configure(text=t("s3_connecting"))
        server = self.store.get_server(alias)
        if not server:
            # Nothing to connect to: don't leave "connecting" on screen.
            self._status_label.configure(text=t("s3_connect_failed"))
            return
        self._current_bucket = server.get("bucket", "")

        def _connect():
            client = S3Client(server)
            connected = client if client.connect() else None
            self.after(0, lambda: self._on_connected(alias, connected))

        threading.Thread(target=_connect, daemon=True).start()

    def _on_connected(self, alias: str, client):
        """GUI-thread completion of ``set_server``'s connection attempt.

        ``client`` is None when the connection failed. Results for an alias
        that is no longer selected are dropped.
        """
        if alias != self._current_alias:
            return
        if client is None:
            self._status_label.configure(text=t("s3_connect_failed"))
            return
        self._client = client
        self._load_buckets()

    def _load_buckets(self):
        """Fetch the bucket names in the background and update the menu."""
        client = self._client
        if not client:
            return

        def _deliver(names):
            # Ignore the answer if the server was switched meanwhile.
            if self._client is client:
                self._update_buckets(names)

        def _fetch():
            names = [b["Name"] for b in client.list_buckets()]
            self.after(0, lambda: _deliver(names))

        threading.Thread(target=_fetch, daemon=True).start()

    def _update_buckets(self, names: list[str]):
        """Fill the bucket menu and select the configured or first bucket."""
        if not names:
            names = [""]
        self._bucket_menu.configure(values=names)
        if self._current_bucket and self._current_bucket in names:
            self._bucket_var.set(self._current_bucket)
        else:
            self._bucket_var.set(names[0])
            self._current_bucket = names[0]
        self._refresh()

    def _on_bucket_change(self, value: str):
        """Bucket menu callback: switch bucket and go to its root."""
        self._current_bucket = value
        self._current_prefix = ""
        self._nav_stack.clear()
        self._refresh()

    # -- navigation -------------------------------------------------------

    def _refresh(self):
        """Reload the listing of the current bucket/prefix in the background."""
        client = self._client
        bucket = self._current_bucket
        prefix = self._current_prefix
        if not client or not bucket:
            return
        self._status_label.configure(text=t("s3_loading"))
        self._path_label.configure(text=f"/{prefix}" if prefix else "/")

        def _deliver(objects, prefixes):
            # Drop listings that belong to a location the user already left;
            # _display slices names relative to the *current* prefix.
            if (self._client is client
                    and self._current_bucket == bucket
                    and self._current_prefix == prefix):
                self._display(objects, prefixes)

        def _fetch():
            objects, prefixes = client.list_objects(bucket, prefix)
            self.after(0, lambda: _deliver(objects, prefixes))

        threading.Thread(target=_fetch, daemon=True).start()

    def _display(self, objects: list[dict], prefixes: list[str]):
        """Replace the tree content with a listing of the current prefix.

        Args:
            objects: Dicts with at least ``"Key"``; ``"Size"`` and
                ``"LastModified"`` (an object with ``strftime``) are optional.
            prefixes: Sub-prefix strings; shown first, as folders.
        """
        base = self._current_prefix
        self._clear_tree()

        # Folders first
        for prefix in sorted(prefixes):
            display_name = prefix[len(base):]
            if display_name.endswith("/"):
                display_name = display_name[:-1]
            iid = self._tree.insert(
                "", "end",
                values=(f"\U0001f4c1 {display_name}/", "", ""),
                tags=("folder",),
            )
            self._items[iid] = ("folder", f"{base}{display_name}/",
                                display_name)

        # Files
        for obj in sorted(objects, key=lambda o: o["Key"]):
            key = obj["Key"]
            name = key[len(base):]
            size = _human_size(obj.get("Size", 0))
            modified = ""
            if obj.get("LastModified"):
                modified = obj["LastModified"].strftime("%Y-%m-%d %H:%M:%S")
            iid = self._tree.insert("", "end", values=(name, size, modified),
                                    tags=("file",))
            # A name ending in "/" is navigated into rather than downloaded
            # or deleted.
            kind = "folder" if name.endswith("/") else "file"
            self._items[iid] = (kind, key, name)

        count = len(objects) + len(prefixes)
        self._status_label.configure(
            text=t("s3_items_count").format(count=count))
        self._back_btn.configure(
            state="normal" if self._current_prefix else "disabled")

        # Show drop hint if empty
        if count == 0:
            self._drop_hint.place(relx=0.5, rely=0.5, anchor="center")
        else:
            self._drop_hint.place_forget()

    def _selected_entry(self):
        """Return ``(kind, key, name)`` for the selected row, or None."""
        selection = self._tree.selection()
        if not selection:
            return None
        return self._items.get(selection[0])

    def _on_double_click(self, event):
        """Enter the double-clicked folder, or download the file."""
        entry = self._selected_entry()
        if entry is None:
            return
        kind, key, _name = entry
        if kind != "folder":
            # Double-click file = download
            self._download()
            return
        self._nav_stack.append(self._current_prefix)
        self._current_prefix = key if key.endswith("/") else key + "/"
        self._refresh()

    def _go_back(self):
        """Return to the previously visited prefix (or the bucket root)."""
        self._current_prefix = self._nav_stack.pop() if self._nav_stack else ""
        self._refresh()

    # -- actions ----------------------------------------------------------

    def _upload(self):
        """Ask for files with a dialog and upload them to the current prefix."""
        if not self._client or not self._current_bucket:
            return
        paths = filedialog.askopenfilenames()
        if not paths:
            return
        self._upload_files(list(paths))

    def _download(self):
        """Download the selected file to a location chosen by the user."""
        entry = self._selected_entry()
        if entry is None:
            return
        kind, key, name = entry
        if kind == "folder":
            return  # Can't download a folder
        client = self._client
        bucket = self._current_bucket
        if not client or not bucket:
            return

        save_path = filedialog.asksaveasfilename(initialfile=name)
        if not save_path:
            return

        # Get file size for progress
        total_bytes = client.get_object_size(bucket, key)
        label = t("s3_downloading")
        self._status_label.configure(text=label)
        self._show_progress(label, total_bytes)

        def _do():
            ok = client.download_file(
                bucket, key, save_path,
                progress_cb=self._on_progress,
                status_cb=self._on_transfer_status)
            self.after(0, lambda: self._on_download_done(bool(ok)))

        threading.Thread(target=_do, daemon=True).start()

    def _on_download_done(self, success: bool):
        """Hide the progress bar and report the download result."""
        self._hide_progress()
        if success:
            self._status_label.configure(text=t("s3_download_ok"))
        else:
            self._status_label.configure(text=t("s3_download_failed"))

    def _delete(self):
        """Delete the selected file (folders are never deleted)."""
        entry = self._selected_entry()
        if entry is None:
            return
        kind, key, _name = entry
        if kind == "folder":
            return  # Don't delete prefixes
        client = self._client
        bucket = self._current_bucket
        if not client or not bucket:
            return

        def _do():
            if client.delete_object(bucket, key):
                self.after(0, self._refresh)
            else:
                self.after(0, lambda: self._status_label.configure(
                    text=t("s3_delete_failed")))

        self._status_label.configure(text=t("s3_deleting"))
        threading.Thread(target=_do, daemon=True).start()