782 lines
29 KiB
Python
782 lines
29 KiB
Python
"""
|
|
S3 tab — bucket/object browser with upload, download, delete actions.
|
|
Supports drag-and-drop from OS file manager for upload.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import tkinter as tk
|
|
from tkinter import ttk, filedialog
|
|
|
|
import customtkinter as ctk
|
|
from core.s3_client import S3Client
|
|
from core.i18n import t
|
|
from core.icons import icon_text, make_icon_button
|
|
from gui.tabs.query_tab import apply_dark_scrollbar_style
|
|
|
|
|
|
def _human_size(size_bytes: int) -> str:
|
|
"""Format bytes to human-readable string."""
|
|
if size_bytes < 1024:
|
|
return f"{size_bytes} B"
|
|
for unit in ("KB", "MB", "GB", "TB"):
|
|
size_bytes /= 1024
|
|
if size_bytes < 1024:
|
|
return f"{size_bytes:.1f} {unit}"
|
|
return f"{size_bytes:.1f} PB"
|
|
|
|
|
|
def _setup_drop(widget, callback):
|
|
"""Setup OS drag-and-drop onto a widget. Cross-platform with graceful fallback."""
|
|
if sys.platform == "win32":
|
|
try:
|
|
import windnd
|
|
windnd.hook_dropfiles(widget, func=callback)
|
|
return True
|
|
except Exception:
|
|
pass
|
|
# tkinterdnd2 fallback (works on Linux/macOS if installed)
|
|
try:
|
|
widget.drop_target_register("DND_Files")
|
|
widget.dnd_bind("<<Drop>>", lambda e: callback(_parse_dnd_paths(e.data)))
|
|
return True
|
|
except Exception:
|
|
pass
|
|
return False
|
|
|
|
|
|
def _parse_dnd_paths(data: str) -> list[str]:
|
|
"""Parse tkinterdnd2 drop data into list of file paths."""
|
|
paths = []
|
|
# tkinterdnd2 wraps paths with spaces in braces: {C:/path with spaces/file.txt}
|
|
i = 0
|
|
while i < len(data):
|
|
if data[i] == '{':
|
|
end = data.index('}', i)
|
|
paths.append(data[i + 1:end])
|
|
i = end + 2 # skip } and space
|
|
elif data[i] == ' ':
|
|
i += 1
|
|
else:
|
|
end = data.find(' ', i)
|
|
if end == -1:
|
|
end = len(data)
|
|
paths.append(data[i:end])
|
|
i = end + 1
|
|
return paths
|
|
|
|
|
|
class S3Tab(ctk.CTkFrame):
|
|
def __init__(self, master, store):
|
|
super().__init__(master, fg_color="transparent")
|
|
self.store = store
|
|
self._current_alias: str | None = None
|
|
self._client: S3Client | None = None
|
|
self._current_bucket: str = ""
|
|
self._current_prefix: str = ""
|
|
self._nav_stack: list[str] = []
|
|
self._dnd_active = False
|
|
|
|
self._build_ui()
|
|
|
|
def _build_ui(self):
|
|
apply_dark_scrollbar_style()
|
|
|
|
# ── Header ──
|
|
header = ctk.CTkFrame(self, fg_color="transparent")
|
|
header.pack(fill="x", padx=15, pady=(15, 5))
|
|
|
|
self._title_label = ctk.CTkLabel(
|
|
header, text=t("s3_objects"),
|
|
font=ctk.CTkFont(size=18, weight="bold"),
|
|
)
|
|
self._title_label.pack(side="left")
|
|
|
|
self._status_label = ctk.CTkLabel(
|
|
header, text="", font=ctk.CTkFont(size=12),
|
|
text_color="#9ca3af",
|
|
)
|
|
self._status_label.pack(side="left", padx=(15, 0))
|
|
|
|
# Buttons
|
|
btn_frame = ctk.CTkFrame(header, fg_color="transparent")
|
|
btn_frame.pack(side="right")
|
|
|
|
self._back_btn = make_icon_button(
|
|
btn_frame, "back", t("s3_back"), width=80,
|
|
command=self._go_back, state="disabled",
|
|
)
|
|
self._back_btn.pack(side="left", padx=(0, 5))
|
|
|
|
self._refresh_btn = make_icon_button(
|
|
btn_frame, "refresh", t("s3_refresh"), width=100,
|
|
command=self._refresh,
|
|
)
|
|
self._refresh_btn.pack(side="left", padx=(0, 5))
|
|
|
|
self._mkdir_btn = make_icon_button(
|
|
btn_frame, "add", t("s3_new_folder"), width=120,
|
|
command=self._create_folder,
|
|
)
|
|
self._mkdir_btn.pack(side="left", padx=(0, 5))
|
|
|
|
self._upload_btn = make_icon_button(
|
|
btn_frame, "upload", t("s3_upload"), width=100,
|
|
command=self._upload,
|
|
)
|
|
self._upload_btn.pack(side="left", padx=(0, 5))
|
|
|
|
self._download_btn = make_icon_button(
|
|
btn_frame, "download", t("s3_download"), width=110,
|
|
command=self._download,
|
|
)
|
|
self._download_btn.pack(side="left", padx=(0, 5))
|
|
|
|
self._delete_btn = make_icon_button(
|
|
btn_frame, "delete", t("s3_delete"), width=100,
|
|
fg_color="#dc2626", hover_color="#b91c1c",
|
|
command=self._delete,
|
|
)
|
|
self._delete_btn.pack(side="left")
|
|
|
|
# ── Bucket selector row ──
|
|
bucket_frame = ctk.CTkFrame(self, fg_color="transparent")
|
|
bucket_frame.pack(fill="x", padx=15, pady=(5, 5))
|
|
|
|
ctk.CTkLabel(bucket_frame, text=t("s3_bucket"),
|
|
font=ctk.CTkFont(size=12, weight="bold")).pack(side="left", padx=(0, 5))
|
|
|
|
self._bucket_var = ctk.StringVar(value="")
|
|
self._bucket_menu = ctk.CTkOptionMenu(
|
|
bucket_frame, variable=self._bucket_var, values=[""],
|
|
width=200, command=self._on_bucket_change,
|
|
)
|
|
self._bucket_menu.pack(side="left", padx=(0, 15))
|
|
|
|
# Path display
|
|
self._path_label = ctk.CTkLabel(
|
|
bucket_frame, text="/", font=ctk.CTkFont(family="Consolas", size=12),
|
|
text_color="#60a5fa",
|
|
)
|
|
self._path_label.pack(side="left", fill="x", expand=True)
|
|
|
|
# ── Progress bar (hidden by default) ──
|
|
self._progress_frame = ctk.CTkFrame(self, fg_color="transparent")
|
|
# Don't pack yet — shown only during transfer
|
|
|
|
self._progress_label = ctk.CTkLabel(
|
|
self._progress_frame, text="", font=ctk.CTkFont(size=11),
|
|
text_color="#9ca3af",
|
|
)
|
|
self._progress_label.pack(side="left", padx=(0, 10))
|
|
|
|
self._progress_bar = ctk.CTkProgressBar(self._progress_frame, width=300, height=14)
|
|
self._progress_bar.set(0)
|
|
self._progress_bar.pack(side="left", fill="x", expand=True)
|
|
|
|
self._progress_pct = ctk.CTkLabel(
|
|
self._progress_frame, text="0%", font=ctk.CTkFont(size=11, weight="bold"),
|
|
text_color="#60a5fa", width=45,
|
|
)
|
|
self._progress_pct.pack(side="left", padx=(10, 0))
|
|
|
|
self._transfer_bytes = 0
|
|
self._transfer_total = 0
|
|
|
|
# ── Treeview for objects ──
|
|
self._tree_frame = ctk.CTkFrame(self, fg_color="#1e1e1e", corner_radius=8)
|
|
self._tree_frame.pack(fill="both", expand=True, padx=15, pady=(5, 15))
|
|
|
|
columns = ("name", "size", "modified")
|
|
self._tree = ttk.Treeview(
|
|
self._tree_frame, columns=columns, show="headings",
|
|
selectmode="browse", style="Dark.Treeview",
|
|
)
|
|
self._tree.heading("name", text=t("s3_col_name"))
|
|
self._tree.heading("size", text=t("s3_col_size"))
|
|
self._tree.heading("modified", text=t("s3_col_modified"))
|
|
self._tree.column("name", width=400, minwidth=200)
|
|
self._tree.column("size", width=100, minwidth=80, anchor="e")
|
|
self._tree.column("modified", width=180, minwidth=120)
|
|
|
|
scrollbar = ttk.Scrollbar(self._tree_frame, orient="vertical",
|
|
command=self._tree.yview,
|
|
style="Dark.Vertical.TScrollbar")
|
|
self._tree.configure(yscrollcommand=scrollbar.set)
|
|
|
|
self._tree.pack(side="left", fill="both", expand=True)
|
|
scrollbar.pack(side="right", fill="y")
|
|
|
|
# Double-click to enter prefix (folder) or download file
|
|
self._tree.bind("<Double-1>", self._on_double_click)
|
|
# Backspace / Alt+Left to go back
|
|
self._tree.bind("<BackSpace>", lambda e: self._go_back())
|
|
self._tree.bind("<Alt-Left>", lambda e: self._go_back())
|
|
|
|
# Right-click context menu
|
|
self._ctx_menu = tk.Menu(self._tree, tearoff=0)
|
|
self._ctx_menu.add_command(
|
|
label=icon_text("copy", t("s3_copy_link_48h")),
|
|
command=self._copy_link_48h,
|
|
)
|
|
self._ctx_menu.add_command(
|
|
label=icon_text("copy", t("s3_copy_link_permanent")),
|
|
command=self._copy_link_permanent,
|
|
)
|
|
self._ctx_menu.add_separator()
|
|
self._ctx_menu.add_command(
|
|
label=icon_text("download", t("s3_download")),
|
|
command=self._download,
|
|
)
|
|
self._ctx_menu.add_separator()
|
|
self._ctx_menu.add_command(
|
|
label=icon_text("add", t("s3_new_folder")),
|
|
command=self._create_folder,
|
|
)
|
|
self._ctx_menu.add_separator()
|
|
self._ctx_menu.add_command(
|
|
label=icon_text("delete", t("s3_delete")),
|
|
command=self._delete,
|
|
)
|
|
self._tree.bind("<Button-3>", self._on_right_click)
|
|
|
|
# Empty area context menu (back + new folder)
|
|
self._bg_menu = tk.Menu(self._tree, tearoff=0)
|
|
self._bg_menu.add_command(
|
|
label=icon_text("back", t("s3_back")),
|
|
command=self._go_back,
|
|
)
|
|
self._bg_menu.add_command(
|
|
label=icon_text("add", t("s3_new_folder")),
|
|
command=self._create_folder,
|
|
)
|
|
|
|
# Dark treeview style
|
|
style = ttk.Style()
|
|
style.configure("Dark.Treeview",
|
|
background="#2b2b2b", foreground="#e5e5e5",
|
|
fieldbackground="#2b2b2b", borderwidth=0, rowheight=26)
|
|
style.configure("Dark.Treeview.Heading",
|
|
background="#333333", foreground="#e5e5e5",
|
|
borderwidth=0, relief="flat")
|
|
style.map("Dark.Treeview",
|
|
background=[("selected", "#1e3a5f")],
|
|
foreground=[("selected", "#ffffff")])
|
|
|
|
# ── Drop zone overlay (shown when no files / as hint) ──
|
|
self._drop_hint = ctk.CTkLabel(
|
|
self._tree_frame,
|
|
text=t("s3_drop_hint"),
|
|
font=ctk.CTkFont(size=14),
|
|
text_color="#6b7280",
|
|
)
|
|
|
|
# Setup OS drag-and-drop on the treeview area
|
|
self.after(200, self._init_dnd)
|
|
|
|
def _init_dnd(self):
|
|
"""Initialize drag-and-drop after widget is mapped."""
|
|
try:
|
|
self._dnd_active = _setup_drop(self._tree, self._on_files_dropped)
|
|
if not self._dnd_active:
|
|
# Try on the frame too
|
|
self._dnd_active = _setup_drop(self._tree_frame, self._on_files_dropped)
|
|
except Exception:
|
|
self._dnd_active = False
|
|
|
|
def _on_files_dropped(self, files):
|
|
"""Handle files/folders dropped from OS file manager."""
|
|
if not self._client or not self._current_bucket:
|
|
return
|
|
# windnd gives list of bytes on Windows
|
|
raw_paths = []
|
|
for f in files:
|
|
if isinstance(f, bytes):
|
|
raw_paths.append(f.decode("utf-8", errors="replace"))
|
|
else:
|
|
raw_paths.append(str(f))
|
|
|
|
# Collect (local_path, s3_key_suffix) pairs
|
|
upload_pairs: list[tuple[str, str]] = []
|
|
for p in raw_paths:
|
|
if os.path.isfile(p):
|
|
upload_pairs.append((p, os.path.basename(p)))
|
|
elif os.path.isdir(p):
|
|
base = os.path.basename(p.rstrip("/\\"))
|
|
for root, _dirs, fnames in os.walk(p):
|
|
for fn in fnames:
|
|
full = os.path.join(root, fn)
|
|
rel = os.path.relpath(full, os.path.dirname(p))
|
|
rel = rel.replace("\\", "/")
|
|
upload_pairs.append((full, rel))
|
|
|
|
if not upload_pairs:
|
|
return
|
|
self._upload_pairs(upload_pairs)
|
|
|
|
def _show_progress(self, label: str, total_bytes: int):
|
|
"""Show and reset the progress bar."""
|
|
self._transfer_bytes = 0
|
|
self._transfer_total = max(total_bytes, 1)
|
|
self._progress_bar.set(0)
|
|
self._progress_pct.configure(text="0%")
|
|
self._progress_label.configure(text=label)
|
|
self._progress_frame.pack(fill="x", padx=15, pady=(2, 2),
|
|
before=self._tree_frame)
|
|
|
|
def _hide_progress(self):
|
|
"""Hide the progress bar."""
|
|
self._progress_frame.pack_forget()
|
|
|
|
def _on_progress(self, chunk_bytes: int):
|
|
"""Called from transfer thread — schedule GUI update."""
|
|
self._transfer_bytes += chunk_bytes
|
|
self.after(0, self._update_progress)
|
|
|
|
def _update_progress(self):
|
|
"""Update progress bar on GUI thread."""
|
|
if self._transfer_total <= 0:
|
|
return
|
|
ratio = min(self._transfer_bytes / self._transfer_total, 1.0)
|
|
self._progress_bar.set(ratio)
|
|
pct = int(ratio * 100)
|
|
self._progress_pct.configure(text=f"{pct}%")
|
|
size_str = f"{_human_size(self._transfer_bytes)} / {_human_size(self._transfer_total)}"
|
|
label = self._progress_label.cget("text").split(" — ")[0]
|
|
self._progress_label.configure(text=f"{label} — {size_str}")
|
|
|
|
def _on_transfer_status(self, message: str):
|
|
"""Called from transfer thread with retry/status info."""
|
|
# Note: do NOT reset _transfer_bytes here — resumable download
|
|
# reports already-downloaded bytes via progress_cb, so resetting
|
|
# would break the progress bar on resume.
|
|
self.after(0, lambda: self._status_label.configure(text=message))
|
|
|
|
def _upload_files(self, paths: list[str]):
|
|
"""Upload multiple files to current prefix (flat — no subdirs)."""
|
|
pairs = [(p, os.path.basename(p)) for p in paths if os.path.isfile(p)]
|
|
if pairs:
|
|
self._upload_pairs(pairs)
|
|
|
|
def _upload_pairs(self, pairs: list[tuple[str, str]]):
|
|
"""Upload (local_path, relative_key) pairs to current prefix."""
|
|
if not self._client or not self._current_bucket:
|
|
return
|
|
total_files = len(pairs)
|
|
total_bytes = sum(os.path.getsize(p) for p, _ in pairs if os.path.isfile(p))
|
|
label = (t("s3_uploading_n").format(count=total_files) if total_files > 1
|
|
else t("s3_uploading"))
|
|
self._status_label.configure(text=label)
|
|
self._show_progress(label, total_bytes)
|
|
|
|
def _do():
|
|
ok_count = 0
|
|
for local_path, rel_key in pairs:
|
|
key = self._current_prefix + rel_key
|
|
if self._client.upload_file(
|
|
local_path, self._current_bucket, key,
|
|
progress_cb=self._on_progress,
|
|
status_cb=self._on_transfer_status):
|
|
ok_count += 1
|
|
self.after(0, lambda: self._on_upload_done(ok_count, total_files))
|
|
|
|
threading.Thread(target=_do, daemon=True).start()
|
|
|
|
def _on_upload_done(self, ok_count: int, total: int):
|
|
self._hide_progress()
|
|
if ok_count == total:
|
|
self._status_label.configure(
|
|
text=t("s3_uploaded_n").format(count=ok_count))
|
|
else:
|
|
self._status_label.configure(
|
|
text=t("s3_upload_partial").format(ok=ok_count, total=total))
|
|
self._refresh()
|
|
|
|
# -- server switch ----------------------------------------------------
|
|
|
|
def set_server(self, alias: str | None):
|
|
"""Called when user selects a server in sidebar."""
|
|
if alias == self._current_alias:
|
|
return
|
|
self._current_alias = alias
|
|
if not alias:
|
|
self._client = None
|
|
self._tree.delete(*self._tree.get_children())
|
|
self._status_label.configure(text="")
|
|
return
|
|
self._client = None
|
|
self._current_prefix = ""
|
|
self._nav_stack.clear()
|
|
self._tree.delete(*self._tree.get_children())
|
|
self._status_label.configure(text=t("s3_connecting"))
|
|
|
|
server = self.store.get_server(alias)
|
|
if not server:
|
|
return
|
|
|
|
self._current_bucket = server.get("bucket", "")
|
|
|
|
def _connect():
|
|
client = S3Client(server)
|
|
ok = client.connect()
|
|
if ok:
|
|
self._client = client
|
|
self.after(0, self._load_buckets)
|
|
else:
|
|
self.after(0, lambda: self._status_label.configure(
|
|
text=t("s3_connect_failed")))
|
|
|
|
threading.Thread(target=_connect, daemon=True).start()
|
|
|
|
def _load_buckets(self):
|
|
if not self._client:
|
|
return
|
|
|
|
def _fetch():
|
|
buckets = self._client.list_buckets()
|
|
names = [b["Name"] for b in buckets]
|
|
self.after(0, lambda: self._update_buckets(names))
|
|
|
|
threading.Thread(target=_fetch, daemon=True).start()
|
|
|
|
def _update_buckets(self, names: list[str]):
|
|
if not names:
|
|
names = [""]
|
|
self._bucket_menu.configure(values=names)
|
|
if self._current_bucket and self._current_bucket in names:
|
|
self._bucket_var.set(self._current_bucket)
|
|
elif names:
|
|
self._bucket_var.set(names[0])
|
|
self._current_bucket = names[0]
|
|
self._refresh()
|
|
|
|
def _on_bucket_change(self, value: str):
|
|
self._current_bucket = value
|
|
self._current_prefix = ""
|
|
self._nav_stack.clear()
|
|
self._refresh()
|
|
|
|
# -- navigation -------------------------------------------------------
|
|
|
|
def _refresh(self):
|
|
if not self._client or not self._current_bucket:
|
|
return
|
|
self._status_label.configure(text=t("s3_loading"))
|
|
self._path_label.configure(text=f"/{self._current_prefix}" if self._current_prefix else "/")
|
|
|
|
def _fetch():
|
|
objects, prefixes = self._client.list_objects(
|
|
self._current_bucket, self._current_prefix)
|
|
self.after(0, lambda: self._display(objects, prefixes))
|
|
|
|
threading.Thread(target=_fetch, daemon=True).start()
|
|
|
|
def _display(self, objects: list[dict], prefixes: list[str]):
|
|
self._tree.delete(*self._tree.get_children())
|
|
|
|
# Folders first
|
|
for prefix in sorted(prefixes):
|
|
display_name = prefix[len(self._current_prefix):]
|
|
if display_name.endswith("/"):
|
|
display_name = display_name[:-1]
|
|
self._tree.insert("", "end", values=(
|
|
f"\U0001f4c1 {display_name}/", "", ""),
|
|
tags=("folder",),
|
|
)
|
|
|
|
# Files
|
|
for obj in sorted(objects, key=lambda o: o["Key"]):
|
|
name = obj["Key"][len(self._current_prefix):]
|
|
size = _human_size(obj.get("Size", 0))
|
|
modified = ""
|
|
if obj.get("LastModified"):
|
|
modified = obj["LastModified"].strftime("%Y-%m-%d %H:%M:%S")
|
|
self._tree.insert("", "end", values=(name, size, modified),
|
|
tags=("file",))
|
|
|
|
count = len(objects) + len(prefixes)
|
|
self._status_label.configure(text=t("s3_items_count").format(count=count))
|
|
self._back_btn.configure(
|
|
state="normal" if self._current_prefix else "disabled")
|
|
|
|
# Show drop hint if empty
|
|
if count == 0:
|
|
self._drop_hint.place(relx=0.5, rely=0.5, anchor="center")
|
|
else:
|
|
self._drop_hint.place_forget()
|
|
|
|
def _on_double_click(self, event):
|
|
sel = self._tree.selection()
|
|
if not sel:
|
|
return
|
|
item = self._tree.item(sel[0])
|
|
name = item["values"][0] if item["values"] else ""
|
|
if not isinstance(name, str):
|
|
name = str(name)
|
|
# Check if it's a folder
|
|
if name.endswith("/"):
|
|
# Strip folder icon
|
|
clean = name.replace("\U0001f4c1 ", "").strip()
|
|
self._nav_stack.append(self._current_prefix)
|
|
self._current_prefix = self._current_prefix + clean
|
|
if not self._current_prefix.endswith("/"):
|
|
self._current_prefix += "/"
|
|
self._refresh()
|
|
else:
|
|
# Double-click file = download
|
|
self._download()
|
|
|
|
def _on_right_click(self, event):
|
|
"""Show context menu on right-click."""
|
|
item_id = self._tree.identify_row(event.y)
|
|
if not item_id:
|
|
# Clicked on empty area — show background menu
|
|
self._bg_menu.entryconfigure(0,
|
|
state="normal" if self._current_prefix else "disabled")
|
|
self._bg_menu.tk_popup(event.x_root, event.y_root)
|
|
return
|
|
self._tree.selection_set(item_id)
|
|
# Check if it's a file (not a folder)
|
|
item = self._tree.item(item_id)
|
|
name = item["values"][0] if item["values"] else ""
|
|
if not isinstance(name, str):
|
|
name = str(name)
|
|
is_folder = name.endswith("/")
|
|
# Enable/disable menu items based on type
|
|
file_state = "normal" if not is_folder else "disabled"
|
|
self._ctx_menu.entryconfigure(0, state=file_state) # Link 48h
|
|
self._ctx_menu.entryconfigure(1, state=file_state) # Link permanent
|
|
self._ctx_menu.entryconfigure(3, state="normal") # Download (files + folders)
|
|
# 5 = New Folder — always enabled
|
|
# 7 = Delete — enabled for both files and folders
|
|
self._ctx_menu.tk_popup(event.x_root, event.y_root)
|
|
|
|
def _get_selected_key(self) -> str | None:
|
|
"""Return the full S3 key for the selected file, or None."""
|
|
sel = self._tree.selection()
|
|
if not sel or not self._client or not self._current_bucket:
|
|
return None
|
|
item = self._tree.item(sel[0])
|
|
name = item["values"][0] if item["values"] else ""
|
|
if not isinstance(name, str):
|
|
name = str(name)
|
|
if name.endswith("/"):
|
|
return None
|
|
return self._current_prefix + name
|
|
|
|
def _copy_link_48h(self):
|
|
"""Generate presigned URL (48h) and copy to clipboard."""
|
|
key = self._get_selected_key()
|
|
if not key:
|
|
return
|
|
self._status_label.configure(text=t("s3_generating_link"))
|
|
|
|
def _do():
|
|
url = self._client.generate_presigned_url(
|
|
self._current_bucket, key, expires_in=48 * 3600)
|
|
self.after(0, lambda: self._on_link_ready(url, "48h"))
|
|
|
|
threading.Thread(target=_do, daemon=True).start()
|
|
|
|
def _copy_link_permanent(self):
|
|
"""Build direct (permanent) URL and copy to clipboard."""
|
|
key = self._get_selected_key()
|
|
if not key:
|
|
return
|
|
url = self._client.get_direct_url(self._current_bucket, key)
|
|
if url:
|
|
self.clipboard_clear()
|
|
self.clipboard_append(url)
|
|
self._status_label.configure(text=t("s3_link_copied"))
|
|
else:
|
|
self._status_label.configure(text=t("s3_link_failed"))
|
|
|
|
def _on_link_ready(self, url: str | None, kind: str = ""):
|
|
if url:
|
|
self.clipboard_clear()
|
|
self.clipboard_append(url)
|
|
self._status_label.configure(text=t("s3_link_copied"))
|
|
else:
|
|
self._status_label.configure(text=t("s3_link_failed"))
|
|
|
|
def _create_folder(self):
|
|
"""Prompt for folder name and create it as an empty prefix in S3."""
|
|
if not self._client or not self._current_bucket:
|
|
return
|
|
dialog = ctk.CTkInputDialog(
|
|
text=t("s3_folder_name_prompt"),
|
|
title=t("s3_new_folder"),
|
|
)
|
|
name = dialog.get_input()
|
|
if not name or not name.strip():
|
|
return
|
|
name = name.strip().strip("/")
|
|
key = self._current_prefix + name + "/"
|
|
self._status_label.configure(text=t("s3_creating_folder"))
|
|
|
|
def _do():
|
|
ok = self._client.create_folder(self._current_bucket, key)
|
|
if ok:
|
|
self.after(0, self._refresh)
|
|
else:
|
|
self.after(0, lambda: self._status_label.configure(
|
|
text=t("s3_folder_failed")))
|
|
|
|
threading.Thread(target=_do, daemon=True).start()
|
|
|
|
def _go_back(self):
|
|
if self._nav_stack:
|
|
self._current_prefix = self._nav_stack.pop()
|
|
else:
|
|
self._current_prefix = ""
|
|
self._refresh()
|
|
|
|
# -- actions ----------------------------------------------------------
|
|
|
|
def _upload(self):
|
|
if not self._client or not self._current_bucket:
|
|
return
|
|
paths = filedialog.askopenfilenames()
|
|
if not paths:
|
|
return
|
|
self._upload_files(list(paths))
|
|
|
|
def _download(self):
|
|
sel = self._tree.selection()
|
|
if not sel or not self._client or not self._current_bucket:
|
|
return
|
|
item = self._tree.item(sel[0])
|
|
name = item["values"][0] if item["values"] else ""
|
|
if not isinstance(name, str):
|
|
name = str(name)
|
|
|
|
if name.endswith("/"):
|
|
self._download_folder(name)
|
|
else:
|
|
self._download_file(name)
|
|
|
|
def _download_file(self, name: str):
|
|
"""Download a single file."""
|
|
key = self._current_prefix + name
|
|
save_path = filedialog.asksaveasfilename(initialfile=name)
|
|
if not save_path:
|
|
return
|
|
|
|
total_bytes = self._client.get_object_size(self._current_bucket, key)
|
|
label = t("s3_downloading")
|
|
self._status_label.configure(text=label)
|
|
self._show_progress(label, total_bytes)
|
|
|
|
def _do():
|
|
ok = self._client.download_file(
|
|
self._current_bucket, key, save_path,
|
|
progress_cb=self._on_progress,
|
|
status_cb=self._on_transfer_status)
|
|
self.after(0, lambda: self._on_download_done(ok))
|
|
|
|
threading.Thread(target=_do, daemon=True).start()
|
|
|
|
def _download_folder(self, display_name: str):
|
|
"""Download all objects under a prefix, preserving directory structure."""
|
|
clean = display_name.replace("\U0001f4c1 ", "").strip()
|
|
prefix = self._current_prefix + clean
|
|
|
|
# Ask user to pick a local directory
|
|
dest_dir = filedialog.askdirectory(title=t("s3_download_folder_title"))
|
|
if not dest_dir:
|
|
return
|
|
|
|
self._status_label.configure(text=t("s3_downloading"))
|
|
|
|
def _do():
|
|
# List all objects recursively under this prefix
|
|
objects = self._client.list_all_objects(self._current_bucket, prefix)
|
|
if not objects:
|
|
self.after(0, lambda: self._status_label.configure(
|
|
text=t("s3_download_failed")))
|
|
return
|
|
|
|
total_bytes = sum(o.get("Size", 0) for o in objects)
|
|
self.after(0, lambda: self._show_progress(
|
|
t("s3_downloading_n").format(count=len(objects)), total_bytes))
|
|
|
|
ok_count = 0
|
|
for obj in objects:
|
|
obj_key = obj["Key"]
|
|
# Relative path from the selected folder
|
|
rel = obj_key[len(self._current_prefix):]
|
|
local_path = os.path.join(dest_dir, rel.replace("/", os.sep))
|
|
os.makedirs(os.path.dirname(local_path), exist_ok=True)
|
|
if self._client.download_file(
|
|
self._current_bucket, obj_key, local_path,
|
|
progress_cb=self._on_progress,
|
|
status_cb=self._on_transfer_status):
|
|
ok_count += 1
|
|
|
|
total = len(objects)
|
|
self.after(0, lambda: self._on_folder_download_done(ok_count, total))
|
|
|
|
threading.Thread(target=_do, daemon=True).start()
|
|
|
|
def _on_folder_download_done(self, ok_count: int, total: int):
|
|
self._hide_progress()
|
|
if ok_count == total:
|
|
self._status_label.configure(
|
|
text=t("s3_downloaded_n").format(count=ok_count))
|
|
else:
|
|
self._status_label.configure(
|
|
text=t("s3_download_partial").format(ok=ok_count, total=total))
|
|
|
|
def _on_download_done(self, success: bool):
|
|
self._hide_progress()
|
|
if success:
|
|
self._status_label.configure(text=t("s3_download_ok"))
|
|
else:
|
|
self._status_label.configure(text=t("s3_download_failed"))
|
|
|
|
def _delete(self):
|
|
sel = self._tree.selection()
|
|
if not sel or not self._client or not self._current_bucket:
|
|
return
|
|
item = self._tree.item(sel[0])
|
|
name = item["values"][0] if item["values"] else ""
|
|
if not isinstance(name, str):
|
|
name = str(name)
|
|
|
|
if name.endswith("/"):
|
|
# Folder — ask confirmation
|
|
clean = name.replace("\U0001f4c1 ", "").strip()
|
|
prefix = self._current_prefix + clean
|
|
from tkinter import messagebox
|
|
ok = messagebox.askyesno(
|
|
t("s3_delete"),
|
|
t("s3_delete_folder_confirm").format(folder=clean),
|
|
)
|
|
if not ok:
|
|
return
|
|
self._status_label.configure(text=t("s3_deleting"))
|
|
|
|
def _do_folder():
|
|
count = self._client.delete_prefix(self._current_bucket, prefix)
|
|
self.after(0, lambda: self._status_label.configure(
|
|
text=t("s3_deleted_n").format(count=count)))
|
|
self.after(0, self._refresh)
|
|
|
|
threading.Thread(target=_do_folder, daemon=True).start()
|
|
else:
|
|
# Single file
|
|
key = self._current_prefix + name
|
|
|
|
def _do():
|
|
ok = self._client.delete_object(self._current_bucket, key)
|
|
if ok:
|
|
self.after(0, self._refresh)
|
|
else:
|
|
self.after(0, lambda: self._status_label.configure(
|
|
text=t("s3_delete_failed")))
|
|
|
|
self._status_label.configure(text=t("s3_deleting"))
|
|
threading.Thread(target=_do, daemon=True).start()
|