Files
server-manager/gui/tabs/files_tab.py
chrome-storm-c442 8f55b210b3 v1.8.3: session pool + sidebar indicators
- SessionPool: LRU cache for SSH/SFTP sessions across server switches
- Sidebar: green dot indicators for servers with active sessions
- Sidebar: active sessions count label
- Terminal: buffer preservation on server switch via get_current_buffer()
- FilesTab/TerminalTab: pool integration for session reuse

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 03:05:05 -05:00

893 lines
33 KiB
Python

"""
Files tab — dual-pane SFTP file manager.
"""
import os
import platform
import stat
import string
import threading
from datetime import datetime
from tkinter import messagebox, filedialog
import customtkinter as ctk
from core.i18n import t
from core.ssh_client import SFTPSession
from gui.widgets.file_list import FileListWidget
def _format_size(size_bytes: int) -> str:
if size_bytes < 0:
return ""
for unit in ("B", "KB", "MB", "GB"):
if size_bytes < 1024:
return f"{size_bytes:.1f} {unit}" if unit != "B" else f"{size_bytes} B"
size_bytes /= 1024
return f"{size_bytes:.1f} TB"
def _format_date(mtime: float) -> str:
try:
dt = datetime.fromtimestamp(mtime)
return dt.strftime("%b %d %H:%M")
except Exception:
return ""
def _format_perm(mode: int) -> str:
parts = []
for who in (6, 3, 0):
m = (mode >> who) & 7
parts.append(
("r" if m & 4 else "-")
+ ("w" if m & 2 else "-")
+ ("x" if m & 1 else "-")
)
return "".join(parts)
def _get_windows_drives() -> list[str]:
"""Return list of available drive letters on Windows (e.g. ['C:\\', 'D:\\'])."""
drives = []
for letter in string.ascii_uppercase:
drive = f"{letter}:\\"
if os.path.exists(drive):
drives.append(drive)
return drives
class FilesTab(ctk.CTkFrame):
    """Dual-pane file manager tab: local files on one side, remote SFTP on the other.

    ``store`` is queried for server definitions and the SSH key path. When a
    ``session_pool`` is given, SFTP sessions are obtained from it and the
    per-server remote path and sudo flag are saved to / restored from it;
    without a pool a fresh ``SFTPSession`` is created for every connection.
    """
def __init__(self, master, store, session_pool=None):
    """Create the tab, build its widgets and show the initial local listing.

    Args:
        master: Parent widget.
        store: Object providing ``get_server`` and ``get_ssh_key_path``.
        session_pool: Optional pool used to reuse SFTP sessions and to
            remember per-server remote path / sudo state.
    """
    super().__init__(master, fg_color="transparent")

    # Collaborators.
    self.store = store
    self.session_pool = session_pool

    # Remote side: nothing is connected until set_server() is called.
    self._current_alias: str | None = None
    self._sftp: SFTPSession | None = None
    self._remote_path = "/"
    self._remote_history: list[str] = []
    self._sudo_var = ctk.BooleanVar(value=False)

    # Local side starts in the user's home directory.
    self._local_path = os.path.expanduser("~")
    self._local_history: list[str] = []

    # True while an upload/download worker is running.
    self._transferring = False

    self._build_ui()
    self._refresh_local()
def _build_ui(self):
    """Assemble the tab: file panes, toolbar, progress area and log.

    The sections are packed top to bottom in the order they are built here;
    each section lives in its own ``_build_*`` helper.
    """
    # === Panes area ===
    panes = ctk.CTkFrame(self, fg_color="transparent")
    panes.pack(fill="both", expand=True, padx=10, pady=(10, 5))
    self._build_local_pane(panes)
    self._build_remote_pane(panes)

    # Wire drag-and-drop partners
    self._local_list.set_drag_partner(self._remote_list)
    self._remote_list.set_drag_partner(self._local_list)

    self._build_toolbar()
    # Remote actions start disabled; _on_sftp_connected() re-enables them.
    self._set_remote_buttons_state("disabled")

    self._build_transfer_area()

def _build_local_pane(self, panes):
    """Create the left (local) pane inside *panes*: header, list and status."""
    left_pane = ctk.CTkFrame(panes, fg_color="transparent")
    left_pane.pack(side="left", fill="both", expand=True, padx=(0, 5))

    left_header = ctk.CTkFrame(left_pane, fg_color="transparent")
    left_header.pack(fill="x", pady=(0, 4))
    ctk.CTkLabel(left_header, text=t("local_files"),
                 font=ctk.CTkFont(size=13, weight="bold")).pack(side="left")

    self._local_back_btn = ctk.CTkButton(
        left_header, text="\u2190", width=30, height=28,
        command=self._local_go_back,
    )
    self._local_back_btn.pack(side="left", padx=(8, 2))
    self._local_up_btn = ctk.CTkButton(
        left_header, text="\u2191", width=30, height=28,
        command=self._local_go_up,
    )
    self._local_up_btn.pack(side="left", padx=2)
    self._local_refresh_btn = ctk.CTkButton(
        left_header, text="\u21BB", width=30, height=28,
        command=self._refresh_local,
    )
    self._local_refresh_btn.pack(side="left", padx=2)
    self._browse_btn = ctk.CTkButton(
        left_header, text=t("browse"), width=60, height=28,
        command=self._browse_local,
    )
    self._browse_btn.pack(side="left", padx=2)

    # Windows drive selector; stays None on other platforms or when no
    # drive root is found.
    self._drive_menu = None
    if platform.system() == "Windows":
        drives = _get_windows_drives()
        if drives:
            current_drive = os.path.splitdrive(self._local_path)[0] + "\\"
            self._drive_var = ctk.StringVar(value=current_drive)
            self._drive_menu = ctk.CTkOptionMenu(
                left_header, values=drives, variable=self._drive_var,
                width=65, height=28, command=self._on_drive_change,
            )
            self._drive_menu.pack(side="left", padx=2)

    self._local_path_entry = ctk.CTkEntry(left_header, height=28)
    self._local_path_entry.pack(side="left", fill="x", expand=True, padx=(4, 0))
    self._local_path_entry.bind("<Return>", lambda e: self._local_go_to_path())

    self._local_list = FileListWidget(
        left_pane,
        columns=[(t("name_col"), 220), (t("size_col"), 80), (t("date_col"), 110)],
        on_navigate=self._navigate_local,
        on_drop=self._on_drop_to_local,
    )
    self._local_list.pack(fill="both", expand=True)

    self._local_status = ctk.CTkLabel(
        left_pane, text="", font=ctk.CTkFont(size=11),
        text_color="#9ca3af", anchor="w",
    )
    self._local_status.pack(fill="x", pady=(2, 0))

def _build_remote_pane(self, panes):
    """Create the right (remote) pane inside *panes*: header, list and status."""
    right_pane = ctk.CTkFrame(panes, fg_color="transparent")
    right_pane.pack(side="right", fill="both", expand=True, padx=(5, 0))

    right_header = ctk.CTkFrame(right_pane, fg_color="transparent")
    right_header.pack(fill="x", pady=(0, 4))
    ctk.CTkLabel(right_header, text=t("remote_files"),
                 font=ctk.CTkFont(size=13, weight="bold")).pack(side="left")

    self._remote_back_btn = ctk.CTkButton(
        right_header, text="\u2190", width=30, height=28,
        command=self._remote_go_back,
    )
    self._remote_back_btn.pack(side="left", padx=(8, 2))
    self._remote_up_btn = ctk.CTkButton(
        right_header, text="\u2191", width=30, height=28,
        command=self._remote_go_up,
    )
    self._remote_up_btn.pack(side="left", padx=2)
    self._remote_refresh_btn = ctk.CTkButton(
        right_header, text="\u21BB", width=30, height=28,
        command=self._refresh_remote,
    )
    self._remote_refresh_btn.pack(side="left", padx=2)

    self._remote_path_entry = ctk.CTkEntry(right_header, height=28)
    self._remote_path_entry.pack(side="left", fill="x", expand=True, padx=(4, 0))
    self._remote_path_entry.bind("<Return>", lambda e: self._remote_go_to_path())

    self._remote_list = FileListWidget(
        right_pane,
        columns=[
            (t("name_col"), 200), (t("size_col"), 70),
            (t("date_col"), 100), (t("perm_col"), 80),
        ],
        on_navigate=self._navigate_remote,
        on_drop=self._on_drop_to_remote,
    )
    self._remote_list.pack(fill="both", expand=True)

    self._remote_status = ctk.CTkLabel(
        right_pane, text=t("connect_to_browse"),
        font=ctk.CTkFont(size=11), text_color="#9ca3af", anchor="w",
    )
    self._remote_status.pack(fill="x", pady=(2, 0))

def _build_toolbar(self):
    """Create the action toolbar: transfer buttons, remote file actions, sudo switch."""
    toolbar = ctk.CTkFrame(self, fg_color="transparent")
    toolbar.pack(fill="x", padx=10, pady=4)

    self._upload_btn = ctk.CTkButton(
        toolbar, text=f"{t('upload')} \u2192", width=110, height=30,
        command=self._upload_selected,
    )
    self._upload_btn.pack(side="left", padx=(0, 4))
    self._download_btn = ctk.CTkButton(
        toolbar, text=f"\u2190 {t('download')}", width=110, height=30,
        command=self._download_selected,
    )
    self._download_btn.pack(side="left", padx=4)

    sep = ctk.CTkFrame(toolbar, width=2, height=24, fg_color="gray40")
    sep.pack(side="left", padx=8)

    self._mkdir_btn = ctk.CTkButton(
        toolbar, text=t("new_folder"), width=100, height=30,
        command=self._mkdir_remote,
    )
    self._mkdir_btn.pack(side="left", padx=4)
    self._delete_btn = ctk.CTkButton(
        toolbar, text=t("delete_files"), width=80, height=30,
        fg_color="#dc2626", hover_color="#b91c1c",
        command=self._delete_remote,
    )
    self._delete_btn.pack(side="left", padx=4)
    self._rename_btn = ctk.CTkButton(
        toolbar, text=t("rename_file"), width=100, height=30,
        command=self._rename_remote,
    )
    self._rename_btn.pack(side="left", padx=4)

    # Sudo toggle
    sep2 = ctk.CTkFrame(toolbar, width=2, height=24, fg_color="gray40")
    sep2.pack(side="left", padx=8)
    self._sudo_switch = ctk.CTkSwitch(
        toolbar, text=t("sudo_mode"), variable=self._sudo_var,
        width=50, height=24, command=self._on_sudo_toggle,
    )
    self._sudo_switch.pack(side="left", padx=4)

def _build_transfer_area(self):
    """Create the progress bar, the transfer status label and the log box."""
    transfer_frame = ctk.CTkFrame(self, fg_color="transparent")
    transfer_frame.pack(fill="x", padx=10, pady=(2, 2))

    self._progress = ctk.CTkProgressBar(transfer_frame, height=14)
    self._progress.pack(fill="x")
    self._progress.set(0)

    self._transfer_label = ctk.CTkLabel(
        transfer_frame, text="", font=ctk.CTkFont(size=11),
        text_color="#9ca3af", anchor="w",
    )
    self._transfer_label.pack(fill="x")

    # Kept read-only; _log_msg() unlocks it briefly to append a line.
    self._log = ctk.CTkTextbox(
        self, height=80, font=ctk.CTkFont(family="Consolas", size=11),
        state="disabled",
    )
    self._log.pack(fill="x", padx=10, pady=(0, 10))
# ── Server selection ──
def set_server(self, alias: str | None):
    """Switch the remote pane to the server named *alias* (``None`` clears it).

    If a session is active and a pool is configured, the current remote path
    and sudo flag are saved to the pool first. Selecting the alias that is
    already current does nothing further.
    """
    pool = self.session_pool
    previous = self._current_alias

    # Store state of current session before switching
    if previous and self._sftp and pool:
        pool.store_sftp_state(previous, self._remote_path, self._sudo_var.get())

    if previous == alias:
        return

    self._disconnect_sftp()
    self._current_alias = alias

    if not alias:
        self._remote_list.populate([])
        self._remote_status.configure(text=t("connect_to_browse"))
        self._set_remote_buttons_state("disabled")
        return

    if pool:
        # Only the path is restored here; the stored sudo flag is applied
        # when the connection is established.
        remembered_path, _remembered_sudo = pool.get_sftp_state(alias)
        if remembered_path != "/":
            self._remote_path = remembered_path
    self._connect_sftp()
# ── SFTP connection ──
def _connect_sftp(self):
    """Open (or fetch from the pool) the SFTP session for the current server.

    The blocking work runs in a daemon thread; the outcome is handed back to
    the Tk event loop via ``after``. The alias is captured once up front, and
    a result that arrives after the user has selected a different server is
    discarded instead of being installed as the active session.
    """
    alias = self._current_alias
    server = self.store.get_server(alias)
    if not server:
        return
    self._remote_status.configure(text=t("connecting_sftp"))
    self._set_remote_buttons_state("disabled")
    pool = self.session_pool

    def _finish(sftp, start_path):
        # Runs on the Tk thread.
        if self._current_alias != alias:
            # Selection changed while connecting. A pooled session stays in
            # the pool; an unpooled one has no other owner, so close it.
            if not pool:
                try:
                    sftp.disconnect()
                except Exception:
                    pass
            return
        self._on_sftp_connected(sftp, start_path)

    def _fail(message):
        # Runs on the Tk thread.
        if self._current_alias == alias:
            self._on_sftp_error(message)

    def _do():
        try:
            if pool:
                sftp, is_new = pool.get_or_create_sftp_session(
                    alias, server, self.store.get_ssh_key_path()
                )
                stored_path, stored_sudo = pool.get_sftp_state(alias)
                # Set sudo mode before connecting if it was stored
                sftp.sudo_mode = stored_sudo
                if is_new:
                    sftp.connect()
                start_path = sftp.normalize(".")
                if stored_path != "/":
                    # Use the stored path only if it can still be listed;
                    # otherwise keep the normalized "." path.
                    try:
                        sftp.listdir_attr(stored_path)
                    except Exception:
                        pass
                    else:
                        start_path = stored_path
            else:
                # Legacy behavior without session pool
                sftp = SFTPSession(server, self.store.get_ssh_key_path())
                sftp.connect()
                start_path = sftp.normalize(".")
        except Exception as e:
            # Convert to text now: the name ``e`` is unbound once this
            # except clause ends, so a deferred callback must not read it.
            self.after(0, _fail, str(e))
        else:
            self.after(0, _finish, sftp, start_path)

    threading.Thread(target=_do, daemon=True).start()
def _on_sftp_connected(self, sftp: SFTPSession, home: str):
    """Install *sftp* as the active session and show *home* in the remote pane.

    Also syncs the sudo switch with the session's ``sudo_mode``, clears the
    remote back-history and enables the remote buttons.
    """
    self._sftp = sftp
    self._sudo_var.set(sftp.sudo_mode)
    self._remote_path = home
    self._remote_history.clear()

    status_text = t("connected_sftp").format(alias=self._current_alias)
    self._remote_status.configure(text=status_text)
    self._set_remote_buttons_state("normal")
    self._refresh_remote()
def _on_sftp_error(self, error: str):
    """Show an SFTP failure in the remote status line and append it to the log."""
    message = t("sftp_error").format(e=error)
    self._remote_status.configure(text=message)
    self._log_msg(message)
def _disconnect_sftp(self):
# Only disconnect if not using session pool (otherwise session stays alive)
if self._sftp and not self.session_pool:
try:
self._sftp.disconnect()
except Exception:
pass
self._sftp = None
# If using session pool, remove callbacks to prevent processing data after switch
elif self._sftp and self.session_pool:
# Don't actually disconnect, just remove the reference to avoid further interaction
self._sftp = None
# ── Browse / Drive ──
def _browse_local(self):
    """Let the user pick a local directory in a dialog and navigate to it."""
    chosen = filedialog.askdirectory(initialdir=self._local_path)
    if not chosen:
        # Dialog returned nothing: stay where we are.
        return
    self._local_history.append(self._local_path)
    self._local_path = os.path.normpath(chosen)
    self._refresh_local()
def _on_drive_change(self, drive: str):
self._local_history.append(self._local_path)
self._local_path = drive
self._refresh_local()
def _on_sudo_toggle(self):
    """Sudo-switch callback: copy the switch value to the session and re-list."""
    session = self._sftp
    if not session:
        return
    session.sudo_mode = self._sudo_var.get()
    self._refresh_remote()
# ── Local navigation ──
def _refresh_local(self):
    """Re-read the current local directory and repopulate the local pane.

    A directory that cannot be listed (permission denied, removed in the
    meantime, drive not ready, ...) is logged and shown as empty rather than
    raising out of the UI callback. Entries that cannot be stat'ed are still
    listed, with ``?`` placeholders.
    """
    self._local_path_entry.delete(0, "end")
    self._local_path_entry.insert(0, self._local_path)

    # Sync drive selector
    if self._drive_menu and platform.system() == "Windows":
        current_drive = os.path.splitdrive(self._local_path)[0] + "\\"
        self._drive_var.set(current_drive)

    try:
        entries = os.listdir(self._local_path)
    except PermissionError:
        self._log_msg(t("permission_denied").format(path=self._local_path))
        entries = []
    except OSError as exc:
        # FileNotFoundError, NotADirectoryError, device-not-ready, ...
        self._log_msg(str(exc))
        entries = []

    items = []
    # Offer ".." unless we are already at a filesystem root.
    if os.path.dirname(self._local_path) != self._local_path:
        items.append({"name": "..", "size": "", "date": "", "is_dir": True})

    for name in entries:
        full = os.path.join(self._local_path, name)
        try:
            st = os.stat(full)
        except OSError:
            items.append({
                "name": name, "size": "?", "date": "?", "is_dir": False,
            })
            continue
        # os.stat follows symlinks, so this matches os.path.isdir(full)
        # without a second stat call.
        is_dir = stat.S_ISDIR(st.st_mode)
        items.append({
            "name": name,
            "size": "" if is_dir else _format_size(st.st_size),
            "date": _format_date(st.st_mtime),
            "is_dir": is_dir,
        })

    self._local_list.populate(items)
    count = sum(1 for item in items if item["name"] != "..")
    self._local_status.configure(text=t("items_count").format(count=count))
def _navigate_local(self, name: str):
if name == "..":
self._local_go_up()
return
target = os.path.join(self._local_path, name)
if os.path.isdir(target):
self._local_history.append(self._local_path)
self._local_path = os.path.normpath(target)
self._refresh_local()
def _local_go_up(self):
parent = os.path.dirname(self._local_path)
if parent != self._local_path:
self._local_history.append(self._local_path)
self._local_path = parent
self._refresh_local()
def _local_go_back(self):
if self._local_history:
self._local_path = self._local_history.pop()
self._refresh_local()
def _local_go_to_path(self):
    """Navigate to the path typed in the local entry, if it is a directory."""
    typed = self._local_path_entry.get().strip()
    if not typed or not os.path.isdir(typed):
        return
    self._local_history.append(self._local_path)
    self._local_path = os.path.normpath(typed)
    self._refresh_local()
# ── Remote navigation ──
def _refresh_remote(self):
    """List the current remote directory in a worker thread and show it.

    The session and path are captured when the refresh starts, so the worker
    never dereferences ``self._sftp`` after it may have been cleared. A
    listing is dropped if the active session or the remote path has changed
    by the time it arrives. If the listing fails with anything other than
    ``PermissionError``, one reconnect-and-retry is attempted.
    """
    sftp = self._sftp
    if not sftp:
        return
    path = self._remote_path
    self._remote_path_entry.delete(0, "end")
    self._remote_path_entry.insert(0, path)

    def _list_remote():
        """Fetch remote listing, returns items list."""
        if sftp.sudo_mode:
            try:
                attrs = sftp.listdir_attr_sudo(path)
            except Exception:
                # Sudo listing failed: fall back to the plain listing.
                attrs = sftp.listdir_attr(path)
        else:
            attrs = sftp.listdir_attr(path)

        items = []
        if path != "/":
            items.append({
                "name": "..", "size": "", "date": "", "perm": "", "is_dir": True,
            })
        for a in attrs:
            is_dir = stat.S_ISDIR(a.st_mode) if a.st_mode else False
            items.append({
                "name": a.filename,
                "size": "" if is_dir else _format_size(a.st_size or 0),
                "date": _format_date(a.st_mtime or 0),
                "perm": _format_perm(a.st_mode & 0o777) if a.st_mode else "",
                "is_dir": is_dir,
            })
        return items

    def _show(items):
        # Runs on the Tk thread; ignore results that are no longer current.
        if self._sftp is sftp and self._remote_path == path:
            self._populate_remote(items)

    def _fail(message):
        # Runs on the Tk thread. The message is built in the worker because
        # the exception name is unbound once its except clause ends.
        if self._sftp is sftp:
            self._on_sftp_error(message)

    def _do():
        # Ensure connection is alive, reconnect if needed
        if not sftp.connected:
            try:
                sftp.reconnect()
            except Exception as e:
                self.after(0, _fail, str(e))
                return
            if not sftp.connected:
                self.after(0, _fail, "Reconnect failed")
                return

        try:
            items = _list_remote()
        except PermissionError as e:
            hint = f"\n{t('try_sudo_hint')}" if not sftp.sudo_mode else ""
            self.after(0, _fail, str(e) + hint)
            return
        except Exception:
            # Operation failed — one reconnect attempt
            try:
                sftp.reconnect()
                items = _list_remote()
            except Exception as e2:
                self.after(0, _fail, str(e2))
                return
        self.after(0, _show, items)

    threading.Thread(target=_do, daemon=True).start()
def _populate_remote(self, items: list[dict]):
    """Show *items* in the remote list and update the item count (``..`` excluded)."""
    self._remote_list.populate(items)
    real_entries = [entry for entry in items if entry["name"] != ".."]
    status_text = t("items_count").format(count=len(real_entries))
    self._remote_status.configure(text=status_text)
def _navigate_remote(self, name: str):
if not self._sftp:
return
if name == "..":
self._remote_go_up()
return
if self._remote_path == "/":
target = "/" + name
else:
target = self._remote_path + "/" + name
self._remote_history.append(self._remote_path)
self._remote_path = target
self._refresh_remote()
def _remote_go_up(self):
if self._remote_path == "/":
return
if not self._sftp:
return
parent = "/".join(self._remote_path.rstrip("/").split("/")[:-1]) or "/"
self._remote_history.append(self._remote_path)
self._remote_path = parent
self._refresh_remote()
def _remote_go_back(self):
if not self._sftp:
return
if self._remote_history:
self._remote_path = self._remote_history.pop()
self._refresh_remote()
def _remote_go_to_path(self):
    """Navigate to the path typed in the remote entry (taken as-is once stripped)."""
    if not self._sftp:
        return
    typed = self._remote_path_entry.get().strip()
    if not typed:
        return
    self._remote_history.append(self._remote_path)
    self._remote_path = typed
    self._refresh_remote()
# ── Remote path helper ──
def _remote_join(self, name: str) -> str:
if self._remote_path == "/":
return "/" + name
return self._remote_path + "/" + name
# ── Upload / Download (shared logic) ──
def _do_upload(self, items: list[dict]):
    """Upload *items* (files and folders) from the local to the remote directory.

    The transfer runs in a background thread. The session and both base
    directories are captured before the thread starts, so navigating or
    switching servers while it runs cannot redirect the remaining items.
    Each item's failure is logged and the loop continues; completion handling
    is always scheduled, even if the worker exits unexpectedly.
    """
    sftp = self._sftp
    if not sftp or self._transferring:
        return
    items = [s for s in items if s["name"] != ".."]
    if not items:
        return

    local_dir = self._local_path
    remote_dir = self._remote_path

    self._transferring = True
    self._upload_btn.configure(state="disabled")
    self._download_btn.configure(state="disabled")

    def _remote_for(name):
        # Same joining rule as _remote_join, applied to the captured base.
        return "/" + name if remote_dir == "/" else remote_dir + "/" + name

    def _do():
        try:
            if not sftp.connected:
                try:
                    sftp.reconnect()
                except Exception as e:
                    # Pass the text, not ``e``: the name is unbound when the
                    # except clause ends, before the callback would run.
                    self.after(0, self._on_sftp_error, str(e))
                    return

            for item in items:
                name = item["name"]
                local = os.path.join(local_dir, name)
                remote = _remote_for(name)
                try:
                    if item.get("is_dir"):
                        self.after(0, lambda n=name: self._transfer_label.configure(
                            text=t("uploading_dir").format(name=n)
                        ))

                        def _file_cb(cur, total, fname, n=name):
                            self.after(0, lambda c=cur, tt=total, fn=fname:
                                       self._transfer_label.configure(
                                           text=t("transfer_file_progress").format(
                                               cur=c, total=tt, name=fn)
                                       ))

                        def _progress(transferred, total):
                            if total > 0:
                                self.after(0, self._progress.set, transferred / total)

                        sftp.upload_dir(local, remote, progress_cb=_progress, file_cb=_file_cb)
                    else:
                        self.after(0, lambda n=name: self._transfer_label.configure(
                            text=t("uploading").format(name=n)
                        ))

                        def _progress(transferred, total, n=name):
                            if total > 0:
                                frac = transferred / total
                                size_str = f"{_format_size(transferred)}/{_format_size(total)}"
                                self.after(0, lambda f=frac, s=size_str, nn=n:
                                           self._update_progress(
                                               f, t("uploading").format(name=nn) + f" ({s})"))

                        if sftp.sudo_mode:
                            try:
                                sftp.upload_sudo(local, remote, progress_cb=_progress)
                            except Exception:
                                # Sudo upload failed: retry as a plain upload.
                                sftp.upload(local, remote, progress_cb=_progress)
                        else:
                            sftp.upload(local, remote, progress_cb=_progress)

                    self.after(0, lambda n=name: self._log_msg(
                        t("transfer_done").format(name=n)
                    ))
                except PermissionError as e:
                    hint = f" - {t('try_sudo_hint')}" if not sftp.sudo_mode else ""
                    self.after(0, lambda e=e, h=hint: self._log_msg(
                        t("transfer_failed").format(e=str(e)) + h
                    ))
                except Exception as e:
                    self.after(0, lambda e=e: self._log_msg(
                        t("transfer_failed").format(e=str(e))
                    ))
        finally:
            self.after(0, self._on_transfer_done)

    threading.Thread(target=_do, daemon=True).start()
def _do_download(self, items: list[dict]):
    """Download *items* (files and folders) from the remote to the local directory.

    The transfer runs in a background thread. The session and both base
    directories are captured before the thread starts, so navigating or
    switching servers while it runs cannot redirect the remaining items.
    Each item's failure is logged and the loop continues; completion handling
    is always scheduled, even if the worker exits unexpectedly.
    """
    sftp = self._sftp
    if not sftp or self._transferring:
        return
    items = [s for s in items if s["name"] != ".."]
    if not items:
        return

    local_dir = self._local_path
    remote_dir = self._remote_path

    self._transferring = True
    self._upload_btn.configure(state="disabled")
    self._download_btn.configure(state="disabled")

    def _remote_for(name):
        # Same joining rule as _remote_join, applied to the captured base.
        return "/" + name if remote_dir == "/" else remote_dir + "/" + name

    def _do():
        try:
            if not sftp.connected:
                try:
                    sftp.reconnect()
                except Exception as e:
                    # Pass the text, not ``e``: the name is unbound when the
                    # except clause ends, before the callback would run.
                    self.after(0, self._on_sftp_error, str(e))
                    return

            for item in items:
                name = item["name"]
                remote = _remote_for(name)
                local = os.path.join(local_dir, name)
                try:
                    if item.get("is_dir"):
                        self.after(0, lambda n=name: self._transfer_label.configure(
                            text=t("downloading_dir").format(name=n)
                        ))
                        os.makedirs(local, exist_ok=True)

                        def _file_cb(cur, total, fname, n=name):
                            self.after(0, lambda c=cur, tt=total, fn=fname:
                                       self._transfer_label.configure(
                                           text=t("transfer_file_progress").format(
                                               cur=c, total=tt, name=fn)
                                       ))

                        def _progress(transferred, total):
                            if total > 0:
                                self.after(0, self._progress.set, transferred / total)

                        sftp.download_dir(remote, local, progress_cb=_progress, file_cb=_file_cb)
                    else:
                        self.after(0, lambda n=name: self._transfer_label.configure(
                            text=t("downloading").format(name=n)
                        ))

                        def _progress(transferred, total, n=name):
                            if total > 0:
                                frac = transferred / total
                                size_str = f"{_format_size(transferred)}/{_format_size(total)}"
                                self.after(0, lambda f=frac, s=size_str, nn=n:
                                           self._update_progress(
                                               f, t("downloading").format(name=nn) + f" ({s})"))

                        if sftp.sudo_mode:
                            try:
                                sftp.download_sudo(remote, local, progress_cb=_progress)
                            except Exception:
                                # Sudo download failed: retry as a plain download.
                                sftp.download(remote, local, progress_cb=_progress)
                        else:
                            sftp.download(remote, local, progress_cb=_progress)

                    self.after(0, lambda n=name: self._log_msg(
                        t("transfer_done").format(name=n)
                    ))
                except PermissionError as e:
                    hint = f" - {t('try_sudo_hint')}" if not sftp.sudo_mode else ""
                    self.after(0, lambda e=e, h=hint: self._log_msg(
                        t("transfer_failed").format(e=str(e)) + h
                    ))
                except Exception as e:
                    self.after(0, lambda e=e: self._log_msg(
                        t("transfer_failed").format(e=str(e))
                    ))
        finally:
            self.after(0, self._on_transfer_done)

    threading.Thread(target=_do, daemon=True).start()
# ── Button handlers ──
def _upload_selected(self):
    """Upload-button handler: send the local selection to the remote directory."""
    session = self._sftp
    ready = session and session.connected and not self._transferring
    if not ready:
        return
    chosen = [entry for entry in self._local_list.get_selected() if entry["name"] != ".."]
    if chosen:
        self._do_upload(chosen)
        return
    # NOTE(review): an empty selection logs the "no_server_selected" text —
    # confirm this is the intended message key.
    self._log_msg(t("no_server_selected"))
def _download_selected(self):
    """Download-button handler: fetch the remote selection into the local directory."""
    session = self._sftp
    ready = session and session.connected and not self._transferring
    if not ready:
        return
    chosen = [entry for entry in self._remote_list.get_selected() if entry["name"] != ".."]
    if chosen:
        self._do_download(chosen)
# ── Drag-and-drop callbacks ──
def _on_drop_to_remote(self, items: list[dict], source):
"""Called when items are dropped onto remote panel (upload)."""
self._do_upload(items)
def _on_drop_to_local(self, items: list[dict], source):
"""Called when items are dropped onto local panel (download)."""
self._do_download(items)
def _on_transfer_done(self):
    """Reset transfer state and widgets after a transfer, then refresh both panes."""
    self._transferring = False
    self._update_progress(0, "")
    for button in (self._upload_btn, self._download_btn):
        button.configure(state="normal")
    self._refresh_local()
    self._refresh_remote()
def _update_progress(self, fraction: float, text: str):
    """Set the progress bar to *fraction* and the transfer label to *text*."""
    bar, label = self._progress, self._transfer_label
    bar.set(fraction)
    label.configure(text=text)
def _mkdir_remote(self):
    """Ask for a folder name and create it in the current remote directory.

    The ``mkdir`` call runs in a background thread on the session captured
    when the button was pressed; failures are written to the log.
    """
    sftp = self._sftp
    if not sftp or not sftp.connected:
        return
    dialog = ctk.CTkInputDialog(
        text=t("new_folder_name"), title=t("new_folder"),
    )
    name = (dialog.get_input() or "").strip()
    if not name:
        return
    path = self._remote_join(name)

    def _do():
        try:
            sftp.mkdir(path)
        except Exception as e:
            # Build the message here: ``e`` is unbound once the except
            # clause ends, so a deferred lambda reading it would fail.
            self.after(0, self._log_msg, t("sftp_error").format(e=str(e)))
        else:
            self.after(0, self._refresh_remote)

    threading.Thread(target=_do, daemon=True).start()
def _delete_remote(self):
    """Delete the selected remote entries after confirmation.

    A single selected directory gets the recursive-delete confirmation; any
    other selection gets the generic count-based one. Deletion runs in a
    background thread on the session captured when the button was pressed;
    each failure is logged and the remaining items are still attempted.
    """
    sftp = self._sftp
    if not sftp or not sftp.connected:
        return
    items = [s for s in self._remote_list.get_selected() if s["name"] != ".."]
    if not items:
        return

    if len(items) == 1 and items[0].get("is_dir"):
        question = t("recursive_delete_confirm").format(name=items[0]["name"])
    else:
        question = t("delete_files_confirm").format(count=len(items))
    if not messagebox.askyesno(t("delete_files"), question):
        return

    # Resolve the targets now so later navigation cannot change them.
    targets = [(self._remote_join(item["name"]), item.get("is_dir")) for item in items]

    def _do():
        for path, is_dir in targets:
            try:
                if is_dir:
                    sftp.rmdir_recursive(path)
                else:
                    sftp.remove(path)
            except Exception as e:
                # Build the message here: ``e`` is unbound once the except
                # clause ends, so a deferred lambda reading it would fail.
                self.after(0, self._log_msg, t("sftp_error").format(e=str(e)))
        self.after(0, self._refresh_remote)

    threading.Thread(target=_do, daemon=True).start()
def _rename_remote(self):
    """Ask for a new name and rename the first selected remote entry.

    Nothing happens if the selection is empty, starts with ``..``, or the
    entered name is blank or unchanged. The rename runs in a background
    thread on the session captured when the button was pressed; failures are
    written to the log.
    """
    sftp = self._sftp
    if not sftp or not sftp.connected:
        return
    selected = self._remote_list.get_selected()
    if not selected or selected[0]["name"] == "..":
        return
    old_name = selected[0]["name"]

    dialog = ctk.CTkInputDialog(
        text=t("rename_prompt"), title=t("rename_file"),
    )
    new_name = (dialog.get_input() or "").strip()
    if not new_name or new_name == old_name:
        return

    old_path = self._remote_join(old_name)
    new_path = self._remote_join(new_name)

    def _do():
        try:
            sftp.rename(old_path, new_path)
        except Exception as e:
            # Build the message here: ``e`` is unbound once the except
            # clause ends, so a deferred lambda reading it would fail.
            self.after(0, self._log_msg, t("sftp_error").format(e=str(e)))
        else:
            self.after(0, self._refresh_remote)

    threading.Thread(target=_do, daemon=True).start()
# ── Helpers ──
def _set_remote_buttons_state(self, state: str):
for btn in (
self._upload_btn, self._download_btn,
self._mkdir_btn, self._delete_btn, self._rename_btn,
self._remote_refresh_btn, self._remote_back_btn, self._remote_up_btn,
):
btn.configure(state=state)
def _log_msg(self, text: str):
self._log.configure(state="normal")
self._log.insert("end", text + "\n")
self._log.configure(state="disabled")
self._log.see("end")