""" S3 tab — bucket/object browser with upload, download, delete actions. Supports drag-and-drop from OS file manager for upload. """ import os import sys import tempfile import threading import tkinter as tk from tkinter import ttk, filedialog import customtkinter as ctk from core.s3_client import S3Client from core.i18n import t from core.icons import icon_text, make_icon_button from gui.tabs.query_tab import apply_dark_scrollbar_style def _human_size(size_bytes: int) -> str: """Format bytes to human-readable string.""" if size_bytes < 1024: return f"{size_bytes} B" for unit in ("KB", "MB", "GB", "TB"): size_bytes /= 1024 if size_bytes < 1024: return f"{size_bytes:.1f} {unit}" return f"{size_bytes:.1f} PB" def _setup_drop(widget, callback): """Setup OS drag-and-drop onto a widget. Cross-platform with graceful fallback.""" if sys.platform == "win32": try: import windnd windnd.hook_dropfiles(widget, func=callback) return True except Exception: pass # tkinterdnd2 fallback (works on Linux/macOS if installed) try: widget.drop_target_register("DND_Files") widget.dnd_bind("<>", lambda e: callback(_parse_dnd_paths(e.data))) return True except Exception: pass return False def _parse_dnd_paths(data: str) -> list[str]: """Parse tkinterdnd2 drop data into list of file paths.""" paths = [] # tkinterdnd2 wraps paths with spaces in braces: {C:/path with spaces/file.txt} i = 0 while i < len(data): if data[i] == '{': end = data.index('}', i) paths.append(data[i + 1:end]) i = end + 2 # skip } and space elif data[i] == ' ': i += 1 else: end = data.find(' ', i) if end == -1: end = len(data) paths.append(data[i:end]) i = end + 1 return paths class S3Tab(ctk.CTkFrame): def __init__(self, master, store): super().__init__(master, fg_color="transparent") self.store = store self._current_alias: str | None = None self._client: S3Client | None = None self._current_bucket: str = "" self._current_prefix: str = "" self._nav_stack: list[str] = [] self._dnd_active = False self._build_ui() def _build_ui(self): apply_dark_scrollbar_style() # ── Header ── header = ctk.CTkFrame(self, fg_color="transparent") header.pack(fill="x", padx=15, pady=(15, 5)) self._title_label = ctk.CTkLabel( header, text=t("s3_objects"), font=ctk.CTkFont(size=18, weight="bold"), ) self._title_label.pack(side="left") self._status_label = ctk.CTkLabel( header, text="", font=ctk.CTkFont(size=12), text_color="#9ca3af", ) self._status_label.pack(side="left", padx=(15, 0)) # Buttons btn_frame = ctk.CTkFrame(header, fg_color="transparent") btn_frame.pack(side="right") self._back_btn = make_icon_button( btn_frame, "back", t("s3_back"), width=80, command=self._go_back, state="disabled", ) self._back_btn.pack(side="left", padx=(0, 5)) self._refresh_btn = make_icon_button( btn_frame, "refresh", t("s3_refresh"), width=100, command=self._refresh, ) self._refresh_btn.pack(side="left", padx=(0, 5)) self._upload_btn = make_icon_button( btn_frame, "upload", t("s3_upload"), width=100, command=self._upload, ) self._upload_btn.pack(side="left", padx=(0, 5)) self._download_btn = make_icon_button( btn_frame, "download", t("s3_download"), width=110, command=self._download, ) self._download_btn.pack(side="left", padx=(0, 5)) self._delete_btn = make_icon_button( btn_frame, "delete", t("s3_delete"), width=100, fg_color="#dc2626", hover_color="#b91c1c", command=self._delete, ) self._delete_btn.pack(side="left") # ── Bucket selector row ── bucket_frame = ctk.CTkFrame(self, fg_color="transparent") bucket_frame.pack(fill="x", padx=15, pady=(5, 5)) ctk.CTkLabel(bucket_frame, text=t("s3_bucket"), font=ctk.CTkFont(size=12, weight="bold")).pack(side="left", padx=(0, 5)) self._bucket_var = ctk.StringVar(value="") self._bucket_menu = ctk.CTkOptionMenu( bucket_frame, variable=self._bucket_var, values=[""], width=200, command=self._on_bucket_change, ) self._bucket_menu.pack(side="left", padx=(0, 15)) # Path display self._path_label = ctk.CTkLabel( bucket_frame, text="/", font=ctk.CTkFont(family="Consolas", size=12), text_color="#60a5fa", ) self._path_label.pack(side="left", fill="x", expand=True) # ── Progress bar (hidden by default) ── self._progress_frame = ctk.CTkFrame(self, fg_color="transparent") # Don't pack yet — shown only during transfer self._progress_label = ctk.CTkLabel( self._progress_frame, text="", font=ctk.CTkFont(size=11), text_color="#9ca3af", ) self._progress_label.pack(side="left", padx=(0, 10)) self._progress_bar = ctk.CTkProgressBar(self._progress_frame, width=300, height=14) self._progress_bar.set(0) self._progress_bar.pack(side="left", fill="x", expand=True) self._progress_pct = ctk.CTkLabel( self._progress_frame, text="0%", font=ctk.CTkFont(size=11, weight="bold"), text_color="#60a5fa", width=45, ) self._progress_pct.pack(side="left", padx=(10, 0)) self._transfer_bytes = 0 self._transfer_total = 0 # ── Treeview for objects ── self._tree_frame = ctk.CTkFrame(self, fg_color="#1e1e1e", corner_radius=8) self._tree_frame.pack(fill="both", expand=True, padx=15, pady=(5, 15)) columns = ("name", "size", "modified") self._tree = ttk.Treeview( self._tree_frame, columns=columns, show="headings", selectmode="browse", style="Dark.Treeview", ) self._tree.heading("name", text=t("s3_col_name")) self._tree.heading("size", text=t("s3_col_size")) self._tree.heading("modified", text=t("s3_col_modified")) self._tree.column("name", width=400, minwidth=200) self._tree.column("size", width=100, minwidth=80, anchor="e") self._tree.column("modified", width=180, minwidth=120) scrollbar = ttk.Scrollbar(self._tree_frame, orient="vertical", command=self._tree.yview, style="Dark.Vertical.TScrollbar") self._tree.configure(yscrollcommand=scrollbar.set) self._tree.pack(side="left", fill="both", expand=True) scrollbar.pack(side="right", fill="y") # Double-click to enter prefix (folder) or download file self._tree.bind("", self._on_double_click) # Right-click context menu self._ctx_menu = tk.Menu(self._tree, tearoff=0) self._ctx_menu.add_command( label=icon_text("copy", t("s3_copy_link")), command=self._copy_link, ) self._ctx_menu.add_command( label=icon_text("download", t("s3_download")), command=self._download, ) self._ctx_menu.add_separator() self._ctx_menu.add_command( label=icon_text("delete", t("s3_delete")), command=self._delete, ) self._tree.bind("", self._on_right_click) # Dark treeview style style = ttk.Style() style.configure("Dark.Treeview", background="#2b2b2b", foreground="#e5e5e5", fieldbackground="#2b2b2b", borderwidth=0, rowheight=26) style.configure("Dark.Treeview.Heading", background="#333333", foreground="#e5e5e5", borderwidth=0, relief="flat") style.map("Dark.Treeview", background=[("selected", "#1e3a5f")], foreground=[("selected", "#ffffff")]) # ── Drop zone overlay (shown when no files / as hint) ── self._drop_hint = ctk.CTkLabel( self._tree_frame, text=t("s3_drop_hint"), font=ctk.CTkFont(size=14), text_color="#6b7280", ) # Setup OS drag-and-drop on the treeview area self.after(200, self._init_dnd) def _init_dnd(self): """Initialize drag-and-drop after widget is mapped.""" try: self._dnd_active = _setup_drop(self._tree, self._on_files_dropped) if not self._dnd_active: # Try on the frame too self._dnd_active = _setup_drop(self._tree_frame, self._on_files_dropped) except Exception: self._dnd_active = False def _on_files_dropped(self, files): """Handle files dropped from OS file manager.""" if not self._client or not self._current_bucket: return # windnd gives list of bytes on Windows paths = [] for f in files: if isinstance(f, bytes): paths.append(f.decode("utf-8", errors="replace")) else: paths.append(str(f)) paths = [p for p in paths if os.path.isfile(p)] if not paths: return self._upload_files(paths) def _show_progress(self, label: str, total_bytes: int): """Show and reset the progress bar.""" self._transfer_bytes = 0 self._transfer_total = max(total_bytes, 1) self._progress_bar.set(0) self._progress_pct.configure(text="0%") self._progress_label.configure(text=label) self._progress_frame.pack(fill="x", padx=15, pady=(2, 2), before=self._tree_frame) def _hide_progress(self): """Hide the progress bar.""" self._progress_frame.pack_forget() def _on_progress(self, chunk_bytes: int): """Called from transfer thread — schedule GUI update.""" self._transfer_bytes += chunk_bytes self.after(0, self._update_progress) def _update_progress(self): """Update progress bar on GUI thread.""" if self._transfer_total <= 0: return ratio = min(self._transfer_bytes / self._transfer_total, 1.0) self._progress_bar.set(ratio) pct = int(ratio * 100) self._progress_pct.configure(text=f"{pct}%") size_str = f"{_human_size(self._transfer_bytes)} / {_human_size(self._transfer_total)}" label = self._progress_label.cget("text").split(" — ")[0] self._progress_label.configure(text=f"{label} — {size_str}") def _on_transfer_status(self, message: str): """Called from transfer thread with retry/status info.""" # Reset progress on retry (boto3 restarts the transfer) self._transfer_bytes = 0 self.after(0, lambda: self._status_label.configure(text=message)) def _upload_files(self, paths: list[str]): """Upload multiple files to current prefix.""" if not self._client or not self._current_bucket: return total_files = len(paths) total_bytes = sum(os.path.getsize(p) for p in paths if os.path.isfile(p)) label = (t("s3_uploading_n").format(count=total_files) if total_files > 1 else t("s3_uploading")) self._status_label.configure(text=label) self._show_progress(label, total_bytes) def _do(): ok_count = 0 for path in paths: filename = os.path.basename(path) key = self._current_prefix + filename if self._client.upload_file( path, self._current_bucket, key, progress_cb=self._on_progress, status_cb=self._on_transfer_status): ok_count += 1 self.after(0, lambda: self._on_upload_done(ok_count, total_files)) threading.Thread(target=_do, daemon=True).start() def _on_upload_done(self, ok_count: int, total: int): self._hide_progress() if ok_count == total: self._status_label.configure( text=t("s3_uploaded_n").format(count=ok_count)) else: self._status_label.configure( text=t("s3_upload_partial").format(ok=ok_count, total=total)) self._refresh() # -- server switch ---------------------------------------------------- def set_server(self, alias: str | None): """Called when user selects a server in sidebar.""" if alias == self._current_alias: return self._current_alias = alias if not alias: self._client = None self._tree.delete(*self._tree.get_children()) self._status_label.configure(text="") return self._client = None self._current_prefix = "" self._nav_stack.clear() self._tree.delete(*self._tree.get_children()) self._status_label.configure(text=t("s3_connecting")) server = self.store.get_server(alias) if not server: return self._current_bucket = server.get("bucket", "") def _connect(): client = S3Client(server) ok = client.connect() if ok: self._client = client self.after(0, self._load_buckets) else: self.after(0, lambda: self._status_label.configure( text=t("s3_connect_failed"))) threading.Thread(target=_connect, daemon=True).start() def _load_buckets(self): if not self._client: return def _fetch(): buckets = self._client.list_buckets() names = [b["Name"] for b in buckets] self.after(0, lambda: self._update_buckets(names)) threading.Thread(target=_fetch, daemon=True).start() def _update_buckets(self, names: list[str]): if not names: names = [""] self._bucket_menu.configure(values=names) if self._current_bucket and self._current_bucket in names: self._bucket_var.set(self._current_bucket) elif names: self._bucket_var.set(names[0]) self._current_bucket = names[0] self._refresh() def _on_bucket_change(self, value: str): self._current_bucket = value self._current_prefix = "" self._nav_stack.clear() self._refresh() # -- navigation ------------------------------------------------------- def _refresh(self): if not self._client or not self._current_bucket: return self._status_label.configure(text=t("s3_loading")) self._path_label.configure(text=f"/{self._current_prefix}" if self._current_prefix else "/") def _fetch(): objects, prefixes = self._client.list_objects( self._current_bucket, self._current_prefix) self.after(0, lambda: self._display(objects, prefixes)) threading.Thread(target=_fetch, daemon=True).start() def _display(self, objects: list[dict], prefixes: list[str]): self._tree.delete(*self._tree.get_children()) # Folders first for prefix in sorted(prefixes): display_name = prefix[len(self._current_prefix):] if display_name.endswith("/"): display_name = display_name[:-1] self._tree.insert("", "end", values=( f"\U0001f4c1 {display_name}/", "", ""), tags=("folder",), ) # Files for obj in sorted(objects, key=lambda o: o["Key"]): name = obj["Key"][len(self._current_prefix):] size = _human_size(obj.get("Size", 0)) modified = "" if obj.get("LastModified"): modified = obj["LastModified"].strftime("%Y-%m-%d %H:%M:%S") self._tree.insert("", "end", values=(name, size, modified), tags=("file",)) count = len(objects) + len(prefixes) self._status_label.configure(text=t("s3_items_count").format(count=count)) self._back_btn.configure( state="normal" if self._current_prefix else "disabled") # Show drop hint if empty if count == 0: self._drop_hint.place(relx=0.5, rely=0.5, anchor="center") else: self._drop_hint.place_forget() def _on_double_click(self, event): sel = self._tree.selection() if not sel: return item = self._tree.item(sel[0]) name = item["values"][0] if item["values"] else "" if not isinstance(name, str): name = str(name) # Check if it's a folder if name.endswith("/"): # Strip folder icon clean = name.replace("\U0001f4c1 ", "").strip() self._nav_stack.append(self._current_prefix) self._current_prefix = self._current_prefix + clean if not self._current_prefix.endswith("/"): self._current_prefix += "/" self._refresh() else: # Double-click file = download self._download() def _on_right_click(self, event): """Show context menu on right-click.""" item_id = self._tree.identify_row(event.y) if not item_id: return self._tree.selection_set(item_id) # Check if it's a file (not a folder) item = self._tree.item(item_id) name = item["values"][0] if item["values"] else "" if not isinstance(name, str): name = str(name) is_folder = name.endswith("/") # Enable/disable menu items based on type self._ctx_menu.entryconfigure(0, state="normal" if not is_folder else "disabled") # Copy link self._ctx_menu.entryconfigure(1, state="normal" if not is_folder else "disabled") # Download self._ctx_menu.entryconfigure(3, state="normal" if not is_folder else "disabled") # Delete self._ctx_menu.tk_popup(event.x_root, event.y_root) def _copy_link(self): """Generate presigned URL and copy to clipboard.""" sel = self._tree.selection() if not sel or not self._client or not self._current_bucket: return item = self._tree.item(sel[0]) name = item["values"][0] if item["values"] else "" if not isinstance(name, str): name = str(name) if name.endswith("/"): return key = self._current_prefix + name self._status_label.configure(text=t("s3_generating_link")) def _do(): url = self._client.generate_presigned_url(self._current_bucket, key) self.after(0, lambda: self._on_link_ready(url)) threading.Thread(target=_do, daemon=True).start() def _on_link_ready(self, url: str | None): if url: self.clipboard_clear() self.clipboard_append(url) self._status_label.configure(text=t("s3_link_copied")) else: self._status_label.configure(text=t("s3_link_failed")) def _go_back(self): if self._nav_stack: self._current_prefix = self._nav_stack.pop() else: self._current_prefix = "" self._refresh() # -- actions ---------------------------------------------------------- def _upload(self): if not self._client or not self._current_bucket: return paths = filedialog.askopenfilenames() if not paths: return self._upload_files(list(paths)) def _download(self): sel = self._tree.selection() if not sel: return item = self._tree.item(sel[0]) name = item["values"][0] if item["values"] else "" if not isinstance(name, str): name = str(name) if name.endswith("/"): return # Can't download a folder key = self._current_prefix + name save_path = filedialog.asksaveasfilename(initialfile=name) if not save_path: return # Get file size for progress total_bytes = self._client.get_object_size(self._current_bucket, key) label = t("s3_downloading") self._status_label.configure(text=label) self._show_progress(label, total_bytes) def _do(): ok = self._client.download_file( self._current_bucket, key, save_path, progress_cb=self._on_progress, status_cb=self._on_transfer_status) if ok: self.after(0, lambda: self._on_download_done(True)) else: self.after(0, lambda: self._on_download_done(False)) threading.Thread(target=_do, daemon=True).start() def _on_download_done(self, success: bool): self._hide_progress() if success: self._status_label.configure(text=t("s3_download_ok")) else: self._status_label.configure(text=t("s3_download_failed")) def _delete(self): sel = self._tree.selection() if not sel: return item = self._tree.item(sel[0]) name = item["values"][0] if item["values"] else "" if not isinstance(name, str): name = str(name) if name.endswith("/"): return # Don't delete prefixes key = self._current_prefix + name def _do(): ok = self._client.delete_object(self._current_bucket, key) if ok: self.after(0, self._refresh) else: self.after(0, lambda: self._status_label.configure( text=t("s3_delete_failed"))) self._status_label.configure(text=t("s3_deleting")) threading.Thread(target=_do, daemon=True).start()