""" S3 tab — bucket/object browser with upload, download, delete actions. Supports drag-and-drop from OS file manager for upload. """ import os import sys import tempfile import threading import tkinter as tk from tkinter import ttk, filedialog import customtkinter as ctk from core.s3_client import S3Client from core.i18n import t from core.icons import icon_text, make_icon_button from gui.tabs.query_tab import apply_dark_scrollbar_style def _human_size(size_bytes: int) -> str: """Format bytes to human-readable string.""" if size_bytes < 1024: return f"{size_bytes} B" for unit in ("KB", "MB", "GB", "TB"): size_bytes /= 1024 if size_bytes < 1024: return f"{size_bytes:.1f} {unit}" return f"{size_bytes:.1f} PB" def _setup_drop(widget, callback): """Setup OS drag-and-drop onto a widget. Cross-platform with graceful fallback.""" if sys.platform == "win32": try: import windnd windnd.hook_dropfiles(widget, func=callback) return True except Exception: pass # tkinterdnd2 fallback (works on Linux/macOS if installed) try: widget.drop_target_register("DND_Files") widget.dnd_bind("<>", lambda e: callback(_parse_dnd_paths(e.data))) return True except Exception: pass return False def _parse_dnd_paths(data: str) -> list[str]: """Parse tkinterdnd2 drop data into list of file paths.""" paths = [] # tkinterdnd2 wraps paths with spaces in braces: {C:/path with spaces/file.txt} i = 0 while i < len(data): if data[i] == '{': end = data.index('}', i) paths.append(data[i + 1:end]) i = end + 2 # skip } and space elif data[i] == ' ': i += 1 else: end = data.find(' ', i) if end == -1: end = len(data) paths.append(data[i:end]) i = end + 1 return paths class S3Tab(ctk.CTkFrame): def __init__(self, master, store): super().__init__(master, fg_color="transparent") self.store = store self._current_alias: str | None = None self._client: S3Client | None = None self._current_bucket: str = "" self._current_prefix: str = "" self._nav_stack: list[str] = [] self._dnd_active = False self._build_ui() def _build_ui(self): apply_dark_scrollbar_style() # ── Header ── header = ctk.CTkFrame(self, fg_color="transparent") header.pack(fill="x", padx=15, pady=(15, 5)) self._title_label = ctk.CTkLabel( header, text=t("s3_objects"), font=ctk.CTkFont(size=18, weight="bold"), ) self._title_label.pack(side="left") self._status_label = ctk.CTkLabel( header, text="", font=ctk.CTkFont(size=12), text_color="#9ca3af", ) self._status_label.pack(side="left", padx=(15, 0)) # Buttons btn_frame = ctk.CTkFrame(header, fg_color="transparent") btn_frame.pack(side="right") self._back_btn = make_icon_button( btn_frame, "back", t("s3_back"), width=80, command=self._go_back, state="disabled", ) self._back_btn.pack(side="left", padx=(0, 5)) self._refresh_btn = make_icon_button( btn_frame, "refresh", t("s3_refresh"), width=100, command=self._refresh, ) self._refresh_btn.pack(side="left", padx=(0, 5)) self._mkdir_btn = make_icon_button( btn_frame, "add", t("s3_new_folder"), width=120, command=self._create_folder, ) self._mkdir_btn.pack(side="left", padx=(0, 5)) self._upload_btn = make_icon_button( btn_frame, "upload", t("s3_upload"), width=100, command=self._upload, ) self._upload_btn.pack(side="left", padx=(0, 5)) self._download_btn = make_icon_button( btn_frame, "download", t("s3_download"), width=110, command=self._download, ) self._download_btn.pack(side="left", padx=(0, 5)) self._delete_btn = make_icon_button( btn_frame, "delete", t("s3_delete"), width=100, fg_color="#dc2626", hover_color="#b91c1c", command=self._delete, ) self._delete_btn.pack(side="left") # ── Bucket selector row ── bucket_frame = ctk.CTkFrame(self, fg_color="transparent") bucket_frame.pack(fill="x", padx=15, pady=(5, 5)) ctk.CTkLabel(bucket_frame, text=t("s3_bucket"), font=ctk.CTkFont(size=12, weight="bold")).pack(side="left", padx=(0, 5)) self._bucket_var = ctk.StringVar(value="") self._bucket_menu = ctk.CTkOptionMenu( bucket_frame, variable=self._bucket_var, values=[""], width=200, command=self._on_bucket_change, ) self._bucket_menu.pack(side="left", padx=(0, 15)) # Path display self._path_label = ctk.CTkLabel( bucket_frame, text="/", font=ctk.CTkFont(family="Consolas", size=12), text_color="#60a5fa", ) self._path_label.pack(side="left", fill="x", expand=True) # ── Progress bar (hidden by default) ── self._progress_frame = ctk.CTkFrame(self, fg_color="transparent") # Don't pack yet — shown only during transfer self._progress_label = ctk.CTkLabel( self._progress_frame, text="", font=ctk.CTkFont(size=11), text_color="#9ca3af", ) self._progress_label.pack(side="left", padx=(0, 10)) self._progress_bar = ctk.CTkProgressBar(self._progress_frame, width=300, height=14) self._progress_bar.set(0) self._progress_bar.pack(side="left", fill="x", expand=True) self._progress_pct = ctk.CTkLabel( self._progress_frame, text="0%", font=ctk.CTkFont(size=11, weight="bold"), text_color="#60a5fa", width=45, ) self._progress_pct.pack(side="left", padx=(10, 0)) self._transfer_bytes = 0 self._transfer_total = 0 # ── Treeview for objects ── self._tree_frame = ctk.CTkFrame(self, fg_color="#1e1e1e", corner_radius=8) self._tree_frame.pack(fill="both", expand=True, padx=15, pady=(5, 15)) columns = ("name", "size", "modified") self._tree = ttk.Treeview( self._tree_frame, columns=columns, show="headings", selectmode="browse", style="Dark.Treeview", ) self._tree.heading("name", text=t("s3_col_name")) self._tree.heading("size", text=t("s3_col_size")) self._tree.heading("modified", text=t("s3_col_modified")) self._tree.column("name", width=400, minwidth=200) self._tree.column("size", width=100, minwidth=80, anchor="e") self._tree.column("modified", width=180, minwidth=120) scrollbar = ttk.Scrollbar(self._tree_frame, orient="vertical", command=self._tree.yview, style="Dark.Vertical.TScrollbar") self._tree.configure(yscrollcommand=scrollbar.set) self._tree.pack(side="left", fill="both", expand=True) scrollbar.pack(side="right", fill="y") # Double-click to enter prefix (folder) or download file self._tree.bind("", self._on_double_click) # Backspace / Alt+Left to go back self._tree.bind("", lambda e: self._go_back()) self._tree.bind("", lambda e: self._go_back()) # Right-click context menu self._ctx_menu = tk.Menu(self._tree, tearoff=0) self._ctx_menu.add_command( label=icon_text("copy", t("s3_copy_link_48h")), command=self._copy_link_48h, ) self._ctx_menu.add_command( label=icon_text("copy", t("s3_copy_link_permanent")), command=self._copy_link_permanent, ) self._ctx_menu.add_separator() self._ctx_menu.add_command( label=icon_text("download", t("s3_download")), command=self._download, ) self._ctx_menu.add_separator() self._ctx_menu.add_command( label=icon_text("add", t("s3_new_folder")), command=self._create_folder, ) self._ctx_menu.add_separator() self._ctx_menu.add_command( label=icon_text("delete", t("s3_delete")), command=self._delete, ) self._tree.bind("", self._on_right_click) # Empty area context menu (back + new folder) self._bg_menu = tk.Menu(self._tree, tearoff=0) self._bg_menu.add_command( label=icon_text("back", t("s3_back")), command=self._go_back, ) self._bg_menu.add_command( label=icon_text("add", t("s3_new_folder")), command=self._create_folder, ) # Dark treeview style style = ttk.Style() style.configure("Dark.Treeview", background="#2b2b2b", foreground="#e5e5e5", fieldbackground="#2b2b2b", borderwidth=0, rowheight=26) style.configure("Dark.Treeview.Heading", background="#333333", foreground="#e5e5e5", borderwidth=0, relief="flat") style.map("Dark.Treeview", background=[("selected", "#1e3a5f")], foreground=[("selected", "#ffffff")]) # ── Drop zone overlay (shown when no files / as hint) ── self._drop_hint = ctk.CTkLabel( self._tree_frame, text=t("s3_drop_hint"), font=ctk.CTkFont(size=14), text_color="#6b7280", ) # Setup OS drag-and-drop on the treeview area self.after(200, self._init_dnd) def _init_dnd(self): """Initialize drag-and-drop after widget is mapped.""" try: self._dnd_active = _setup_drop(self._tree, self._on_files_dropped) if not self._dnd_active: # Try on the frame too self._dnd_active = _setup_drop(self._tree_frame, self._on_files_dropped) except Exception: self._dnd_active = False def _on_files_dropped(self, files): """Handle files/folders dropped from OS file manager.""" if not self._client or not self._current_bucket: return # windnd gives list of bytes on Windows raw_paths = [] for f in files: if isinstance(f, bytes): raw_paths.append(f.decode("utf-8", errors="replace")) else: raw_paths.append(str(f)) # Collect (local_path, s3_key_suffix) pairs upload_pairs: list[tuple[str, str]] = [] for p in raw_paths: if os.path.isfile(p): upload_pairs.append((p, os.path.basename(p))) elif os.path.isdir(p): base = os.path.basename(p.rstrip("/\\")) for root, _dirs, fnames in os.walk(p): for fn in fnames: full = os.path.join(root, fn) rel = os.path.relpath(full, os.path.dirname(p)) rel = rel.replace("\\", "/") upload_pairs.append((full, rel)) if not upload_pairs: return self._upload_pairs(upload_pairs) def _show_progress(self, label: str, total_bytes: int): """Show and reset the progress bar.""" self._transfer_bytes = 0 self._transfer_total = max(total_bytes, 1) self._progress_bar.set(0) self._progress_pct.configure(text="0%") self._progress_label.configure(text=label) self._progress_frame.pack(fill="x", padx=15, pady=(2, 2), before=self._tree_frame) def _hide_progress(self): """Hide the progress bar.""" self._progress_frame.pack_forget() def _on_progress(self, chunk_bytes: int): """Called from transfer thread — schedule GUI update.""" self._transfer_bytes += chunk_bytes self.after(0, self._update_progress) def _update_progress(self): """Update progress bar on GUI thread.""" if self._transfer_total <= 0: return ratio = min(self._transfer_bytes / self._transfer_total, 1.0) self._progress_bar.set(ratio) pct = int(ratio * 100) self._progress_pct.configure(text=f"{pct}%") size_str = f"{_human_size(self._transfer_bytes)} / {_human_size(self._transfer_total)}" label = self._progress_label.cget("text").split(" — ")[0] self._progress_label.configure(text=f"{label} — {size_str}") def _on_transfer_status(self, message: str): """Called from transfer thread with retry/status info.""" # Note: do NOT reset _transfer_bytes here — resumable download # reports already-downloaded bytes via progress_cb, so resetting # would break the progress bar on resume. self.after(0, lambda: self._status_label.configure(text=message)) def _upload_files(self, paths: list[str]): """Upload multiple files to current prefix (flat — no subdirs).""" pairs = [(p, os.path.basename(p)) for p in paths if os.path.isfile(p)] if pairs: self._upload_pairs(pairs) def _upload_pairs(self, pairs: list[tuple[str, str]]): """Upload (local_path, relative_key) pairs to current prefix.""" if not self._client or not self._current_bucket: return total_files = len(pairs) total_bytes = sum(os.path.getsize(p) for p, _ in pairs if os.path.isfile(p)) label = (t("s3_uploading_n").format(count=total_files) if total_files > 1 else t("s3_uploading")) self._status_label.configure(text=label) self._show_progress(label, total_bytes) def _do(): ok_count = 0 for local_path, rel_key in pairs: key = self._current_prefix + rel_key if self._client.upload_file( local_path, self._current_bucket, key, progress_cb=self._on_progress, status_cb=self._on_transfer_status): ok_count += 1 self.after(0, lambda: self._on_upload_done(ok_count, total_files)) threading.Thread(target=_do, daemon=True).start() def _on_upload_done(self, ok_count: int, total: int): self._hide_progress() if ok_count == total: self._status_label.configure( text=t("s3_uploaded_n").format(count=ok_count)) else: self._status_label.configure( text=t("s3_upload_partial").format(ok=ok_count, total=total)) self._refresh() # -- server switch ---------------------------------------------------- def set_server(self, alias: str | None): """Called when user selects a server in sidebar.""" if alias == self._current_alias: return self._current_alias = alias if not alias: self._client = None self._tree.delete(*self._tree.get_children()) self._status_label.configure(text="") return self._client = None self._current_prefix = "" self._nav_stack.clear() self._tree.delete(*self._tree.get_children()) self._status_label.configure(text=t("s3_connecting")) server = self.store.get_server(alias) if not server: return self._current_bucket = server.get("bucket", "") def _connect(): client = S3Client(server) ok = client.connect() if ok: self._client = client self.after(0, self._load_buckets) else: self.after(0, lambda: self._status_label.configure( text=t("s3_connect_failed"))) threading.Thread(target=_connect, daemon=True).start() def _load_buckets(self): if not self._client: return def _fetch(): buckets = self._client.list_buckets() names = [b["Name"] for b in buckets] self.after(0, lambda: self._update_buckets(names)) threading.Thread(target=_fetch, daemon=True).start() def _update_buckets(self, names: list[str]): if not names: names = [""] self._bucket_menu.configure(values=names) if self._current_bucket and self._current_bucket in names: self._bucket_var.set(self._current_bucket) elif names: self._bucket_var.set(names[0]) self._current_bucket = names[0] self._refresh() def _on_bucket_change(self, value: str): self._current_bucket = value self._current_prefix = "" self._nav_stack.clear() self._refresh() # -- navigation ------------------------------------------------------- def _refresh(self): if not self._client or not self._current_bucket: return self._status_label.configure(text=t("s3_loading")) self._path_label.configure(text=f"/{self._current_prefix}" if self._current_prefix else "/") def _fetch(): objects, prefixes = self._client.list_objects( self._current_bucket, self._current_prefix) self.after(0, lambda: self._display(objects, prefixes)) threading.Thread(target=_fetch, daemon=True).start() def _display(self, objects: list[dict], prefixes: list[str]): self._tree.delete(*self._tree.get_children()) # Folders first for prefix in sorted(prefixes): display_name = prefix[len(self._current_prefix):] if display_name.endswith("/"): display_name = display_name[:-1] self._tree.insert("", "end", values=( f"\U0001f4c1 {display_name}/", "", ""), tags=("folder",), ) # Files for obj in sorted(objects, key=lambda o: o["Key"]): name = obj["Key"][len(self._current_prefix):] size = _human_size(obj.get("Size", 0)) modified = "" if obj.get("LastModified"): modified = obj["LastModified"].strftime("%Y-%m-%d %H:%M:%S") self._tree.insert("", "end", values=(name, size, modified), tags=("file",)) count = len(objects) + len(prefixes) self._status_label.configure(text=t("s3_items_count").format(count=count)) self._back_btn.configure( state="normal" if self._current_prefix else "disabled") # Show drop hint if empty if count == 0: self._drop_hint.place(relx=0.5, rely=0.5, anchor="center") else: self._drop_hint.place_forget() def _on_double_click(self, event): sel = self._tree.selection() if not sel: return item = self._tree.item(sel[0]) name = item["values"][0] if item["values"] else "" if not isinstance(name, str): name = str(name) # Check if it's a folder if name.endswith("/"): # Strip folder icon clean = name.replace("\U0001f4c1 ", "").strip() self._nav_stack.append(self._current_prefix) self._current_prefix = self._current_prefix + clean if not self._current_prefix.endswith("/"): self._current_prefix += "/" self._refresh() else: # Double-click file = download self._download() def _on_right_click(self, event): """Show context menu on right-click.""" item_id = self._tree.identify_row(event.y) if not item_id: # Clicked on empty area — show background menu self._bg_menu.entryconfigure(0, state="normal" if self._current_prefix else "disabled") self._bg_menu.tk_popup(event.x_root, event.y_root) return self._tree.selection_set(item_id) # Check if it's a file (not a folder) item = self._tree.item(item_id) name = item["values"][0] if item["values"] else "" if not isinstance(name, str): name = str(name) is_folder = name.endswith("/") # Enable/disable menu items based on type file_state = "normal" if not is_folder else "disabled" self._ctx_menu.entryconfigure(0, state=file_state) # Link 48h self._ctx_menu.entryconfigure(1, state=file_state) # Link permanent self._ctx_menu.entryconfigure(3, state="normal") # Download (files + folders) # 5 = New Folder — always enabled # 7 = Delete — enabled for both files and folders self._ctx_menu.tk_popup(event.x_root, event.y_root) def _get_selected_key(self) -> str | None: """Return the full S3 key for the selected file, or None.""" sel = self._tree.selection() if not sel or not self._client or not self._current_bucket: return None item = self._tree.item(sel[0]) name = item["values"][0] if item["values"] else "" if not isinstance(name, str): name = str(name) if name.endswith("/"): return None return self._current_prefix + name def _copy_link_48h(self): """Generate presigned URL (48h) and copy to clipboard.""" key = self._get_selected_key() if not key: return self._status_label.configure(text=t("s3_generating_link")) def _do(): url = self._client.generate_presigned_url( self._current_bucket, key, expires_in=48 * 3600) self.after(0, lambda: self._on_link_ready(url, "48h")) threading.Thread(target=_do, daemon=True).start() def _copy_link_permanent(self): """Build direct (permanent) URL and copy to clipboard.""" key = self._get_selected_key() if not key: return url = self._client.get_direct_url(self._current_bucket, key) if url: self.clipboard_clear() self.clipboard_append(url) self._status_label.configure(text=t("s3_link_copied")) else: self._status_label.configure(text=t("s3_link_failed")) def _on_link_ready(self, url: str | None, kind: str = ""): if url: self.clipboard_clear() self.clipboard_append(url) self._status_label.configure(text=t("s3_link_copied")) else: self._status_label.configure(text=t("s3_link_failed")) def _create_folder(self): """Prompt for folder name and create it as an empty prefix in S3.""" if not self._client or not self._current_bucket: return dialog = ctk.CTkInputDialog( text=t("s3_folder_name_prompt"), title=t("s3_new_folder"), ) name = dialog.get_input() if not name or not name.strip(): return name = name.strip().strip("/") key = self._current_prefix + name + "/" self._status_label.configure(text=t("s3_creating_folder")) def _do(): ok = self._client.create_folder(self._current_bucket, key) if ok: self.after(0, self._refresh) else: self.after(0, lambda: self._status_label.configure( text=t("s3_folder_failed"))) threading.Thread(target=_do, daemon=True).start() def _go_back(self): if self._nav_stack: self._current_prefix = self._nav_stack.pop() else: self._current_prefix = "" self._refresh() # -- actions ---------------------------------------------------------- def _upload(self): if not self._client or not self._current_bucket: return paths = filedialog.askopenfilenames() if not paths: return self._upload_files(list(paths)) def _download(self): sel = self._tree.selection() if not sel or not self._client or not self._current_bucket: return item = self._tree.item(sel[0]) name = item["values"][0] if item["values"] else "" if not isinstance(name, str): name = str(name) if name.endswith("/"): self._download_folder(name) else: self._download_file(name) def _download_file(self, name: str): """Download a single file.""" key = self._current_prefix + name save_path = filedialog.asksaveasfilename(initialfile=name) if not save_path: return total_bytes = self._client.get_object_size(self._current_bucket, key) label = t("s3_downloading") self._status_label.configure(text=label) self._show_progress(label, total_bytes) def _do(): ok = self._client.download_file( self._current_bucket, key, save_path, progress_cb=self._on_progress, status_cb=self._on_transfer_status) self.after(0, lambda: self._on_download_done(ok)) threading.Thread(target=_do, daemon=True).start() def _download_folder(self, display_name: str): """Download all objects under a prefix, preserving directory structure.""" clean = display_name.replace("\U0001f4c1 ", "").strip() prefix = self._current_prefix + clean # Ask user to pick a local directory dest_dir = filedialog.askdirectory(title=t("s3_download_folder_title")) if not dest_dir: return self._status_label.configure(text=t("s3_downloading")) def _do(): # List all objects recursively under this prefix objects = self._client.list_all_objects(self._current_bucket, prefix) if not objects: self.after(0, lambda: self._status_label.configure( text=t("s3_download_failed"))) return total_bytes = sum(o.get("Size", 0) for o in objects) self.after(0, lambda: self._show_progress( t("s3_downloading_n").format(count=len(objects)), total_bytes)) ok_count = 0 for obj in objects: obj_key = obj["Key"] # Relative path from the selected folder rel = obj_key[len(self._current_prefix):] local_path = os.path.join(dest_dir, rel.replace("/", os.sep)) os.makedirs(os.path.dirname(local_path), exist_ok=True) if self._client.download_file( self._current_bucket, obj_key, local_path, progress_cb=self._on_progress, status_cb=self._on_transfer_status): ok_count += 1 total = len(objects) self.after(0, lambda: self._on_folder_download_done(ok_count, total)) threading.Thread(target=_do, daemon=True).start() def _on_folder_download_done(self, ok_count: int, total: int): self._hide_progress() if ok_count == total: self._status_label.configure( text=t("s3_downloaded_n").format(count=ok_count)) else: self._status_label.configure( text=t("s3_download_partial").format(ok=ok_count, total=total)) def _on_download_done(self, success: bool): self._hide_progress() if success: self._status_label.configure(text=t("s3_download_ok")) else: self._status_label.configure(text=t("s3_download_failed")) def _delete(self): sel = self._tree.selection() if not sel or not self._client or not self._current_bucket: return item = self._tree.item(sel[0]) name = item["values"][0] if item["values"] else "" if not isinstance(name, str): name = str(name) if name.endswith("/"): # Folder — ask confirmation clean = name.replace("\U0001f4c1 ", "").strip() prefix = self._current_prefix + clean from tkinter import messagebox ok = messagebox.askyesno( t("s3_delete"), t("s3_delete_folder_confirm").format(folder=clean), ) if not ok: return self._status_label.configure(text=t("s3_deleting")) def _do_folder(): count = self._client.delete_prefix(self._current_bucket, prefix) self.after(0, lambda: self._status_label.configure( text=t("s3_deleted_n").format(count=count))) self.after(0, self._refresh) threading.Thread(target=_do_folder, daemon=True).start() else: # Single file key = self._current_prefix + name def _do(): ok = self._client.delete_object(self._current_bucket, key) if ok: self.after(0, self._refresh) else: self.after(0, lambda: self._status_label.configure( text=t("s3_delete_failed"))) self._status_label.configure(text=t("s3_deleting")) threading.Thread(target=_do, daemon=True).start()