""" Files tab — dual-pane SFTP file manager. """ import os import platform import stat import string import threading from datetime import datetime from tkinter import messagebox, filedialog import customtkinter as ctk from core.i18n import t from core.ssh_client import SFTPSession from gui.widgets.file_list import FileListWidget def _format_size(size_bytes: int) -> str: if size_bytes < 0: return "" for unit in ("B", "KB", "MB", "GB"): if size_bytes < 1024: return f"{size_bytes:.1f} {unit}" if unit != "B" else f"{size_bytes} B" size_bytes /= 1024 return f"{size_bytes:.1f} TB" def _format_date(mtime: float) -> str: try: dt = datetime.fromtimestamp(mtime) return dt.strftime("%b %d %H:%M") except Exception: return "" def _format_perm(mode: int) -> str: parts = [] for who in (6, 3, 0): m = (mode >> who) & 7 parts.append( ("r" if m & 4 else "-") + ("w" if m & 2 else "-") + ("x" if m & 1 else "-") ) return "".join(parts) def _get_windows_drives() -> list[str]: """Return list of available drive letters on Windows (e.g. ['C:\\', 'D:\\']).""" drives = [] for letter in string.ascii_uppercase: drive = f"{letter}:\\" if os.path.exists(drive): drives.append(drive) return drives class FilesTab(ctk.CTkFrame): def __init__(self, master, store, session_pool=None): super().__init__(master, fg_color="transparent") self.store = store self.session_pool = session_pool self._current_alias: str | None = None self._sftp: SFTPSession | None = None self._local_path = os.path.expanduser("~") self._remote_path = "/" self._local_history: list[str] = [] self._remote_history: list[str] = [] self._transferring = False self._sudo_var = ctk.BooleanVar(value=False) self._build_ui() self._refresh_local() def _build_ui(self): # === Panes area === panes = ctk.CTkFrame(self, fg_color="transparent") panes.pack(fill="both", expand=True, padx=10, pady=(10, 5)) # Left pane — Local left_pane = ctk.CTkFrame(panes, fg_color="transparent") left_pane.pack(side="left", fill="both", expand=True, padx=(0, 5)) left_header = ctk.CTkFrame(left_pane, fg_color="transparent") left_header.pack(fill="x", pady=(0, 4)) ctk.CTkLabel(left_header, text=t("local_files"), font=ctk.CTkFont(size=13, weight="bold")).pack(side="left") self._local_back_btn = ctk.CTkButton( left_header, text="\u2190", width=30, height=28, command=self._local_go_back, ) self._local_back_btn.pack(side="left", padx=(8, 2)) self._local_up_btn = ctk.CTkButton( left_header, text="\u2191", width=30, height=28, command=self._local_go_up, ) self._local_up_btn.pack(side="left", padx=2) # Local refresh button self._local_refresh_btn = ctk.CTkButton( left_header, text="\u21BB", width=30, height=28, command=self._refresh_local, ) self._local_refresh_btn.pack(side="left", padx=2) # Browse button self._browse_btn = ctk.CTkButton( left_header, text=t("browse"), width=60, height=28, command=self._browse_local, ) self._browse_btn.pack(side="left", padx=2) # Windows drive selector self._drive_menu = None if platform.system() == "Windows": drives = _get_windows_drives() if drives: current_drive = os.path.splitdrive(self._local_path)[0] + "\\" self._drive_var = ctk.StringVar(value=current_drive) self._drive_menu = ctk.CTkOptionMenu( left_header, values=drives, variable=self._drive_var, width=65, height=28, command=self._on_drive_change, ) self._drive_menu.pack(side="left", padx=2) self._local_path_entry = ctk.CTkEntry(left_header, height=28) self._local_path_entry.pack(side="left", fill="x", expand=True, padx=(4, 0)) self._local_path_entry.bind("", lambda e: self._local_go_to_path()) self._local_list = FileListWidget( left_pane, columns=[(t("name_col"), 220), (t("size_col"), 80), (t("date_col"), 110)], on_navigate=self._navigate_local, on_drop=self._on_drop_to_local, ) self._local_list.pack(fill="both", expand=True) self._local_status = ctk.CTkLabel( left_pane, text="", font=ctk.CTkFont(size=11), text_color="#9ca3af", anchor="w", ) self._local_status.pack(fill="x", pady=(2, 0)) # Right pane — Remote right_pane = ctk.CTkFrame(panes, fg_color="transparent") right_pane.pack(side="right", fill="both", expand=True, padx=(5, 0)) right_header = ctk.CTkFrame(right_pane, fg_color="transparent") right_header.pack(fill="x", pady=(0, 4)) ctk.CTkLabel(right_header, text=t("remote_files"), font=ctk.CTkFont(size=13, weight="bold")).pack(side="left") self._remote_back_btn = ctk.CTkButton( right_header, text="\u2190", width=30, height=28, command=self._remote_go_back, ) self._remote_back_btn.pack(side="left", padx=(8, 2)) self._remote_up_btn = ctk.CTkButton( right_header, text="\u2191", width=30, height=28, command=self._remote_go_up, ) self._remote_up_btn.pack(side="left", padx=2) self._remote_refresh_btn = ctk.CTkButton( right_header, text="\u21BB", width=30, height=28, command=self._refresh_remote, ) self._remote_refresh_btn.pack(side="left", padx=2) self._remote_path_entry = ctk.CTkEntry(right_header, height=28) self._remote_path_entry.pack(side="left", fill="x", expand=True, padx=(4, 0)) self._remote_path_entry.bind("", lambda e: self._remote_go_to_path()) self._remote_list = FileListWidget( right_pane, columns=[ (t("name_col"), 200), (t("size_col"), 70), (t("date_col"), 100), (t("perm_col"), 80), ], on_navigate=self._navigate_remote, on_drop=self._on_drop_to_remote, ) self._remote_list.pack(fill="both", expand=True) self._remote_status = ctk.CTkLabel( right_pane, text=t("connect_to_browse"), font=ctk.CTkFont(size=11), text_color="#9ca3af", anchor="w", ) self._remote_status.pack(fill="x", pady=(2, 0)) # Wire drag-and-drop partners self._local_list.set_drag_partner(self._remote_list) self._remote_list.set_drag_partner(self._local_list) # === Toolbar === toolbar = ctk.CTkFrame(self, fg_color="transparent") toolbar.pack(fill="x", padx=10, pady=4) self._upload_btn = ctk.CTkButton( toolbar, text=f"{t('upload')} \u2192", width=110, height=30, command=self._upload_selected, ) self._upload_btn.pack(side="left", padx=(0, 4)) self._download_btn = ctk.CTkButton( toolbar, text=f"\u2190 {t('download')}", width=110, height=30, command=self._download_selected, ) self._download_btn.pack(side="left", padx=4) sep = ctk.CTkFrame(toolbar, width=2, height=24, fg_color="gray40") sep.pack(side="left", padx=8) self._mkdir_btn = ctk.CTkButton( toolbar, text=t("new_folder"), width=100, height=30, command=self._mkdir_remote, ) self._mkdir_btn.pack(side="left", padx=4) self._delete_btn = ctk.CTkButton( toolbar, text=t("delete_files"), width=80, height=30, fg_color="#dc2626", hover_color="#b91c1c", command=self._delete_remote, ) self._delete_btn.pack(side="left", padx=4) self._rename_btn = ctk.CTkButton( toolbar, text=t("rename_file"), width=100, height=30, command=self._rename_remote, ) self._rename_btn.pack(side="left", padx=4) # Sudo toggle sep2 = ctk.CTkFrame(toolbar, width=2, height=24, fg_color="gray40") sep2.pack(side="left", padx=8) self._sudo_switch = ctk.CTkSwitch( toolbar, text=t("sudo_mode"), variable=self._sudo_var, width=50, height=24, command=self._on_sudo_toggle, ) self._sudo_switch.pack(side="left", padx=4) self._set_remote_buttons_state("disabled") # === Transfer / Log area === transfer_frame = ctk.CTkFrame(self, fg_color="transparent") transfer_frame.pack(fill="x", padx=10, pady=(2, 2)) self._progress = ctk.CTkProgressBar(transfer_frame, height=14) self._progress.pack(fill="x") self._progress.set(0) self._transfer_label = ctk.CTkLabel( transfer_frame, text="", font=ctk.CTkFont(size=11), text_color="#9ca3af", anchor="w", ) self._transfer_label.pack(fill="x") self._log = ctk.CTkTextbox( self, height=80, font=ctk.CTkFont(family="Consolas", size=11), state="disabled", ) self._log.pack(fill="x", padx=10, pady=(0, 10)) # ── Server selection ── def set_server(self, alias: str | None): # Store state of current session before switching if self._current_alias and self._sftp and self.session_pool: self.session_pool.store_sftp_state( self._current_alias, self._remote_path, self._sudo_var.get() ) if self._current_alias == alias: return # Clear remote panel immediately to avoid showing stale files self._remote_list.populate([]) self._remote_status.configure(text=t("switching_servers") if alias else t("connect_to_browse")) self._set_remote_buttons_state("disabled") self._disconnect_sftp() self._current_alias = alias if alias: # Restore state from session pool if available if self.session_pool: stored_path, stored_sudo = self.session_pool.get_sftp_state(alias) if stored_path != "/": self._remote_path = stored_path # The stored sudo mode will be applied when the connection is established self._connect_sftp() else: self._remote_list.populate([]) self._remote_status.configure(text=t("connect_to_browse")) self._set_remote_buttons_state("disabled") # ── SFTP connection ── def _connect_sftp(self): server = self.store.get_server(self._current_alias) if not server: self._remote_status.configure(text=t("sftp_server_not_found")) self._set_remote_buttons_state("disabled") return # Capture current alias to prevent race condition when switching servers quickly current_alias_at_call = self._current_alias self._remote_status.configure(text=t("connecting_sftp")) self._set_remote_buttons_state("disabled") def _do(): # Only proceed if we're still on the same server if self._current_alias != current_alias_at_call: return try: # Use session pool if available if self.session_pool: sftp, is_new = self.session_pool.get_or_create_sftp_session( current_alias_at_call, # Use the original alias to avoid race conditions server, self.store.get_ssh_key_path() ) # Get stored state stored_path, stored_sudo = self.session_pool.get_sftp_state(current_alias_at_call) # Set sudo mode before connecting if it was stored sftp.sudo_mode = stored_sudo if is_new: sftp.connect() # Normalize path after potential reconnection home = sftp.normalize(".") if stored_path != "/": # Validate the stored path still exists, fall back to home if not try: sftp.listdir_attr(stored_path) home = stored_path # Use stored path as home if accessible except: pass # Fall back to normalized home path # Final check before calling callback if self._current_alias == current_alias_at_call: self.after(0, lambda a=current_alias_at_call: self._on_sftp_connected(sftp, home, a)) else: # Legacy behavior without session pool sftp = SFTPSession(server, self.store.get_ssh_key_path()) sftp.connect() home = sftp.normalize(".") # Final check before calling callback if self._current_alias == current_alias_at_call: self.after(0, lambda a=current_alias_at_call: self._on_sftp_connected(sftp, home, a)) except Exception as e: # Only show error if still on the same server if self._current_alias == current_alias_at_call: self.after(0, lambda: self._on_sftp_error(str(e))) threading.Thread(target=_do, daemon=True).start() def _on_sftp_connected(self, sftp: SFTPSession, home: str, expected_alias: str): # Only update UI if we're still on the same server that requested this connection if self._current_alias != expected_alias: # This connection result is from a server we've already switched away from # If we're not using session pooling, disconnect this session if not self.session_pool: try: sftp.disconnect() except: pass return self._sftp = sftp # Update sudo var to match the restored session state self._sudo_var.set(self._sftp.sudo_mode) # Update remote path to match stored path if available self._remote_path = home self._remote_history.clear() self._remote_status.configure( text=t("connected_sftp").format(alias=self._current_alias) ) self._set_remote_buttons_state("normal") self._refresh_remote() def _on_sftp_error(self, error: str): self._remote_status.configure(text=t("sftp_error").format(e=error)) self._log_msg(t("sftp_error").format(e=error)) def _disconnect_sftp(self): # Clear the remote display immediately self._remote_list.populate([]) self._remote_status.configure(text=t("disconnected")) self._set_remote_buttons_state("disabled") # Only disconnect if not using session pool (otherwise session stays alive) if self._sftp and not self.session_pool: try: self._sftp.disconnect() except Exception: pass self._sftp = None # If using session pool, just clear our reference to prevent interaction with old session elif self._sftp and self.session_pool: self._sftp = None # ── Browse / Drive ── def _browse_local(self): path = filedialog.askdirectory(initialdir=self._local_path) if path: self._local_history.append(self._local_path) self._local_path = os.path.normpath(path) self._refresh_local() def _on_drive_change(self, drive: str): self._local_history.append(self._local_path) self._local_path = drive self._refresh_local() def _on_sudo_toggle(self): if self._sftp: self._sftp.sudo_mode = self._sudo_var.get() self._refresh_remote() # ── Local navigation ── def _refresh_local(self): self._local_path_entry.delete(0, "end") self._local_path_entry.insert(0, self._local_path) # Sync drive selector if self._drive_menu and platform.system() == "Windows": current_drive = os.path.splitdrive(self._local_path)[0] + "\\" self._drive_var.set(current_drive) try: entries = os.listdir(self._local_path) except PermissionError: self._log_msg(t("permission_denied").format(path=self._local_path)) entries = [] items = [] # Parent dir parent = os.path.dirname(self._local_path) if parent != self._local_path: items.append({"name": "..", "size": "", "date": "", "is_dir": True}) for name in entries: full = os.path.join(self._local_path, name) try: st = os.stat(full) is_dir = os.path.isdir(full) items.append({ "name": name, "size": "" if is_dir else _format_size(st.st_size), "date": _format_date(st.st_mtime), "is_dir": is_dir, }) except (PermissionError, OSError): items.append({ "name": name, "size": "?", "date": "?", "is_dir": False, }) self._local_list.populate(items) count = len([i for i in items if i["name"] != ".."]) self._local_status.configure(text=t("items_count").format(count=count)) def _navigate_local(self, name: str): if name == "..": self._local_go_up() return target = os.path.join(self._local_path, name) if os.path.isdir(target): self._local_history.append(self._local_path) self._local_path = os.path.normpath(target) self._refresh_local() def _local_go_up(self): parent = os.path.dirname(self._local_path) if parent != self._local_path: self._local_history.append(self._local_path) self._local_path = parent self._refresh_local() def _local_go_back(self): if self._local_history: self._local_path = self._local_history.pop() self._refresh_local() def _local_go_to_path(self): path = self._local_path_entry.get().strip() if path and os.path.isdir(path): self._local_history.append(self._local_path) self._local_path = os.path.normpath(path) self._refresh_local() # ── Remote navigation ── def _refresh_remote(self): if not self._sftp: return self._remote_path_entry.delete(0, "end") self._remote_path_entry.insert(0, self._remote_path) # Capture current server alias to prevent race condition when switching servers current_alias_at_call = self._current_alias def _list_remote(): """Fetch remote listing, returns items list.""" if self._sftp.sudo_mode: try: attrs = self._sftp.listdir_attr_sudo(self._remote_path) except Exception: attrs = self._sftp.listdir_attr(self._remote_path) else: attrs = self._sftp.listdir_attr(self._remote_path) items = [] if self._remote_path != "/": items.append({ "name": "..", "size": "", "date": "", "perm": "", "is_dir": True, }) for a in attrs: is_dir = stat.S_ISDIR(a.st_mode) if a.st_mode else False items.append({ "name": a.filename, "size": "" if is_dir else _format_size(a.st_size or 0), "date": _format_date(a.st_mtime or 0), "perm": _format_perm(a.st_mode & 0o777) if a.st_mode else "", "is_dir": is_dir, }) return items def _do(): # Check if we're still on the same server after async operations if self._current_alias != current_alias_at_call: return # Cancel if server has changed # Ensure connection is alive, reconnect if needed if not self._sftp.connected: try: self._sftp.reconnect() except Exception as e: self.after(0, lambda: self._on_sftp_error(str(e))) return if not self._sftp.connected: self.after(0, lambda: self._on_sftp_error("Reconnect failed")) return # Final check if server changed during connection attempt if self._current_alias != current_alias_at_call: return # Cancel if server has changed try: items = _list_remote() # Final check before updating UI if self._current_alias == current_alias_at_call: self.after(0, lambda: self._populate_remote(items)) except PermissionError as e: if self._current_alias == current_alias_at_call: hint = f"\n{t('try_sudo_hint')}" if not self._sftp.sudo_mode else "" self.after(0, lambda: self._on_sftp_error(str(e) + hint)) except Exception: # Final check before attempting reconnect if self._current_alias != current_alias_at_call: return # Cancel if server has changed # Operation failed — one reconnect attempt try: self._sftp.reconnect() # Final check after reconnection attempt if self._current_alias != current_alias_at_call: return # Cancel if server has changed items = _list_remote() self.after(0, lambda: self._populate_remote(items)) except Exception as e2: if self._current_alias == current_alias_at_call: self.after(0, lambda: self._on_sftp_error(str(e2))) threading.Thread(target=_do, daemon=True).start() def _populate_remote(self, items: list[dict]): self._remote_list.populate(items) count = len([i for i in items if i["name"] != ".."]) self._remote_status.configure(text=t("items_count").format(count=count)) def _navigate_remote(self, name: str): if not self._sftp: return if name == "..": self._remote_go_up() return if self._remote_path == "/": target = "/" + name else: target = self._remote_path + "/" + name self._remote_history.append(self._remote_path) self._remote_path = target self._refresh_remote() def _remote_go_up(self): if self._remote_path == "/": return if not self._sftp: return parent = "/".join(self._remote_path.rstrip("/").split("/")[:-1]) or "/" self._remote_history.append(self._remote_path) self._remote_path = parent self._refresh_remote() def _remote_go_back(self): if not self._sftp: return if self._remote_history: self._remote_path = self._remote_history.pop() self._refresh_remote() def _remote_go_to_path(self): if not self._sftp: return path = self._remote_path_entry.get().strip() if path: self._remote_history.append(self._remote_path) self._remote_path = path self._refresh_remote() # ── Remote path helper ── def _remote_join(self, name: str) -> str: if self._remote_path == "/": return "/" + name return self._remote_path + "/" + name # ── Upload / Download (shared logic) ── def _do_upload(self, items: list[dict]): """Upload files and folders from local to remote. Runs in background thread.""" if not self._sftp or self._transferring: return items = [s for s in items if s["name"] != ".."] if not items: return self._transferring = True self._upload_btn.configure(state="disabled") self._download_btn.configure(state="disabled") def _do(): if not self._sftp.connected: try: self._sftp.reconnect() except Exception as e: self.after(0, lambda: self._on_sftp_error(str(e))) self.after(0, self._on_transfer_done) return for item in items: name = item["name"] local = os.path.join(self._local_path, name) remote = self._remote_join(name) try: if item.get("is_dir"): self.after(0, lambda n=name: self._transfer_label.configure( text=t("uploading_dir").format(name=n) )) def _file_cb(cur, total, fname, n=name): self.after(0, lambda c=cur, tt=total, fn=fname: self._transfer_label.configure( text=t("transfer_file_progress").format(cur=c, total=tt, name=fn) )) def _progress(transferred, total): if total > 0: self.after(0, lambda f=transferred/total: self._progress.set(f)) self._sftp.upload_dir(local, remote, progress_cb=_progress, file_cb=_file_cb) else: file_size = os.path.getsize(local) self.after(0, lambda n=name: self._transfer_label.configure( text=t("uploading").format(name=n) )) def _progress(transferred, total, n=name): if total > 0: frac = transferred / total size_str = f"{_format_size(transferred)}/{_format_size(total)}" self.after(0, lambda f=frac, s=size_str, nn=n: self._update_progress(f, t("uploading").format(name=nn) + f" ({s})")) if self._sftp.sudo_mode: try: self._sftp.upload_sudo(local, remote, progress_cb=_progress) except Exception: self._sftp.upload(local, remote, progress_cb=_progress) else: self._sftp.upload(local, remote, progress_cb=_progress) self.after(0, lambda n=name: self._log_msg( t("transfer_done").format(name=n) )) except PermissionError as e: hint = f" - {t('try_sudo_hint')}" if not self._sftp.sudo_mode else "" self.after(0, lambda e=e, h=hint: self._log_msg( t("transfer_failed").format(e=str(e)) + h )) except Exception as e: self.after(0, lambda e=e: self._log_msg( t("transfer_failed").format(e=str(e)) )) self.after(0, self._on_transfer_done) threading.Thread(target=_do, daemon=True).start() def _do_download(self, items: list[dict]): """Download files and folders from remote to local. Runs in background thread.""" if not self._sftp or self._transferring: return items = [s for s in items if s["name"] != ".."] if not items: return self._transferring = True self._upload_btn.configure(state="disabled") self._download_btn.configure(state="disabled") def _do(): if not self._sftp.connected: try: self._sftp.reconnect() except Exception as e: self.after(0, lambda: self._on_sftp_error(str(e))) self.after(0, self._on_transfer_done) return for item in items: name = item["name"] remote = self._remote_join(name) local = os.path.join(self._local_path, name) try: if item.get("is_dir"): self.after(0, lambda n=name: self._transfer_label.configure( text=t("downloading_dir").format(name=n) )) os.makedirs(local, exist_ok=True) def _file_cb(cur, total, fname, n=name): self.after(0, lambda c=cur, tt=total, fn=fname: self._transfer_label.configure( text=t("transfer_file_progress").format(cur=c, total=tt, name=fn) )) def _progress(transferred, total): if total > 0: self.after(0, lambda f=transferred/total: self._progress.set(f)) self._sftp.download_dir(remote, local, progress_cb=_progress, file_cb=_file_cb) else: self.after(0, lambda n=name: self._transfer_label.configure( text=t("downloading").format(name=n) )) def _progress(transferred, total, n=name): if total > 0: frac = transferred / total size_str = f"{_format_size(transferred)}/{_format_size(total)}" self.after(0, lambda f=frac, s=size_str, nn=n: self._update_progress(f, t("downloading").format(name=nn) + f" ({s})")) if self._sftp.sudo_mode: try: self._sftp.download_sudo(remote, local, progress_cb=_progress) except Exception: self._sftp.download(remote, local, progress_cb=_progress) else: self._sftp.download(remote, local, progress_cb=_progress) self.after(0, lambda n=name: self._log_msg( t("transfer_done").format(name=n) )) except PermissionError as e: hint = f" - {t('try_sudo_hint')}" if not self._sftp.sudo_mode else "" self.after(0, lambda e=e, h=hint: self._log_msg( t("transfer_failed").format(e=str(e)) + h )) except Exception as e: self.after(0, lambda e=e: self._log_msg( t("transfer_failed").format(e=str(e)) )) self.after(0, self._on_transfer_done) threading.Thread(target=_do, daemon=True).start() # ── Button handlers ── def _upload_selected(self): if not self._sftp or not self._sftp.connected or self._transferring: return selected = self._local_list.get_selected() items = [s for s in selected if s["name"] != ".."] if not items: self._log_msg(t("no_server_selected")) return self._do_upload(items) def _download_selected(self): if not self._sftp or not self._sftp.connected or self._transferring: return selected = self._remote_list.get_selected() items = [s for s in selected if s["name"] != ".."] if not items: return self._do_download(items) # ── Drag-and-drop callbacks ── def _on_drop_to_remote(self, items: list[dict], source): """Called when items are dropped onto remote panel (upload).""" self._do_upload(items) def _on_drop_to_local(self, items: list[dict], source): """Called when items are dropped onto local panel (download).""" self._do_download(items) def _on_transfer_done(self): self._transferring = False self._progress.set(0) self._transfer_label.configure(text="") self._upload_btn.configure(state="normal") self._download_btn.configure(state="normal") self._refresh_local() self._refresh_remote() def _update_progress(self, fraction: float, text: str): self._progress.set(fraction) self._transfer_label.configure(text=text) def _mkdir_remote(self): if not self._sftp or not self._sftp.connected: return dialog = ctk.CTkInputDialog( text=t("new_folder_name"), title=t("new_folder"), ) name = dialog.get_input() if not name or not name.strip(): return name = name.strip() path = self._remote_join(name) def _do(): try: self._sftp.mkdir(path) self.after(0, self._refresh_remote) except Exception as e: self.after(0, lambda: self._log_msg(t("sftp_error").format(e=str(e)))) threading.Thread(target=_do, daemon=True).start() def _delete_remote(self): if not self._sftp or not self._sftp.connected: return selected = self._remote_list.get_selected() items = [s for s in selected if s["name"] != ".."] if not items: return # Check for directories — use recursive delete confirm has_dirs = any(s.get("is_dir") for s in items) if has_dirs and len(items) == 1 and items[0].get("is_dir"): if not messagebox.askyesno( t("delete_files"), t("recursive_delete_confirm").format(name=items[0]["name"]), ): return else: if not messagebox.askyesno( t("delete_files"), t("delete_files_confirm").format(count=len(items)), ): return def _do(): for item in items: name = item["name"] path = self._remote_join(name) try: if item.get("is_dir"): self._sftp.rmdir_recursive(path) else: self._sftp.remove(path) except Exception as e: self.after(0, lambda: self._log_msg( t("sftp_error").format(e=str(e)) )) self.after(0, self._refresh_remote) threading.Thread(target=_do, daemon=True).start() def _rename_remote(self): if not self._sftp or not self._sftp.connected: return selected = self._remote_list.get_selected() if not selected or selected[0]["name"] == "..": return old_name = selected[0]["name"] dialog = ctk.CTkInputDialog( text=t("rename_prompt"), title=t("rename_file"), ) new_name = dialog.get_input() if not new_name or not new_name.strip() or new_name.strip() == old_name: return new_name = new_name.strip() old_path = self._remote_join(old_name) new_path = self._remote_join(new_name) def _do(): try: self._sftp.rename(old_path, new_path) self.after(0, self._refresh_remote) except Exception as e: self.after(0, lambda: self._log_msg(t("sftp_error").format(e=str(e)))) threading.Thread(target=_do, daemon=True).start() # ── Helpers ── def _set_remote_buttons_state(self, state: str): for btn in ( self._upload_btn, self._download_btn, self._mkdir_btn, self._delete_btn, self._rename_btn, self._remote_refresh_btn, self._remote_back_btn, self._remote_up_btn, ): btn.configure(state=state) def _log_msg(self, text: str): self._log.configure(state="normal") self._log.insert("end", text + "\n") self._log.configure(state="disabled") self._log.see("end")