Files
server-manager/gui/tabs/files_tab.py
2026-03-06 05:09:54 -05:00

967 lines
37 KiB
Python

"""
Files tab — dual-pane SFTP file manager.
"""
import os
import platform
import stat
import string
import threading
from datetime import datetime
from tkinter import messagebox, filedialog
import customtkinter as ctk
from core.i18n import t
from core.icons import icon_text, ctk_icon, make_icon_button
from core.ssh_client import SFTPSession
from gui.widgets.file_list import FileListWidget
def _format_size(size_bytes: int) -> str:
if size_bytes < 0:
return ""
for unit in ("B", "KB", "MB", "GB"):
if size_bytes < 1024:
return f"{size_bytes:.1f} {unit}" if unit != "B" else f"{size_bytes} B"
size_bytes /= 1024
return f"{size_bytes:.1f} TB"
def _format_date(mtime: float) -> str:
try:
dt = datetime.fromtimestamp(mtime)
return dt.strftime("%b %d %H:%M")
except Exception:
return ""
def _format_perm(mode: int) -> str:
parts = []
for who in (6, 3, 0):
m = (mode >> who) & 7
parts.append(
("r" if m & 4 else "-")
+ ("w" if m & 2 else "-")
+ ("x" if m & 1 else "-")
)
return "".join(parts)
def _get_windows_drives() -> list[str]:
"""Return list of available drive letters on Windows (e.g. ['C:\\', 'D:\\'])."""
drives = []
for letter in string.ascii_uppercase:
drive = f"{letter}:\\"
if os.path.exists(drive):
drives.append(drive)
return drives
class FilesTab(ctk.CTkFrame):
def __init__(self, master, store, session_pool=None):
super().__init__(master, fg_color="transparent")
self.store = store
self.session_pool = session_pool
self._current_alias: str | None = None
self._sftp: SFTPSession | None = None
self._local_path = os.path.expanduser("~")
self._remote_path = "/"
self._local_history: list[str] = []
self._remote_history: list[str] = []
self._transferring = False
self._sudo_var = ctk.BooleanVar(value=False)
self._build_ui()
self._refresh_local()
def _build_ui(self):
# === Panes area ===
panes = ctk.CTkFrame(self, fg_color="transparent")
panes.pack(fill="both", expand=True, padx=10, pady=(10, 5))
# Left pane — Local
left_pane = ctk.CTkFrame(panes, fg_color="transparent")
left_pane.pack(side="left", fill="both", expand=True, padx=(0, 5))
left_header = ctk.CTkFrame(left_pane, fg_color="transparent")
left_header.pack(fill="x", pady=(0, 4))
ctk.CTkLabel(left_header, text=t("local_files"),
font=ctk.CTkFont(size=13, weight="bold")).pack(side="left")
_back_img = ctk_icon("back", 16)
self._local_back_btn = ctk.CTkButton(
left_header, text="" if _back_img else "\u2190",
image=_back_img, width=30, height=28,
command=self._local_go_back,
)
self._local_back_btn.pack(side="left", padx=(8, 2))
_up_img = ctk_icon("up", 16)
self._local_up_btn = ctk.CTkButton(
left_header, text="" if _up_img else "\u2191",
image=_up_img, width=30, height=28,
command=self._local_go_up,
)
self._local_up_btn.pack(side="left", padx=2)
# Local refresh button
_ref_img = ctk_icon("refresh", 16)
self._local_refresh_btn = ctk.CTkButton(
left_header, text="" if _ref_img else "\u21BB",
image=_ref_img, width=30, height=28,
command=self._refresh_local,
)
self._local_refresh_btn.pack(side="left", padx=2)
# Browse button
self._browse_btn = make_icon_button(
left_header, "folder_open", t("browse"), width=75, height=28,
command=self._browse_local,
)
self._browse_btn.pack(side="left", padx=2)
# Windows drive selector
self._drive_menu = None
if platform.system() == "Windows":
drives = _get_windows_drives()
if drives:
current_drive = os.path.splitdrive(self._local_path)[0] + "\\"
self._drive_var = ctk.StringVar(value=current_drive)
self._drive_menu = ctk.CTkOptionMenu(
left_header, values=drives, variable=self._drive_var,
width=65, height=28, command=self._on_drive_change,
)
self._drive_menu.pack(side="left", padx=2)
self._local_path_entry = ctk.CTkEntry(left_header, height=28)
self._local_path_entry.pack(side="left", fill="x", expand=True, padx=(4, 0))
self._local_path_entry.bind("<Return>", lambda e: self._local_go_to_path())
self._local_list = FileListWidget(
left_pane,
columns=[(t("name_col"), 220), (t("size_col"), 80), (t("date_col"), 110)],
on_navigate=self._navigate_local,
on_drop=self._on_drop_to_local,
)
self._local_list.pack(fill="both", expand=True)
self._local_status = ctk.CTkLabel(
left_pane, text="", font=ctk.CTkFont(size=11),
text_color="#9ca3af", anchor="w",
)
self._local_status.pack(fill="x", pady=(2, 0))
# Right pane — Remote
right_pane = ctk.CTkFrame(panes, fg_color="transparent")
right_pane.pack(side="right", fill="both", expand=True, padx=(5, 0))
right_header = ctk.CTkFrame(right_pane, fg_color="transparent")
right_header.pack(fill="x", pady=(0, 4))
ctk.CTkLabel(right_header, text=t("remote_files"),
font=ctk.CTkFont(size=13, weight="bold")).pack(side="left")
self._remote_back_btn = ctk.CTkButton(
right_header, text="" if _back_img else "\u2190",
image=_back_img, width=30, height=28,
command=self._remote_go_back,
)
self._remote_back_btn.pack(side="left", padx=(8, 2))
self._remote_up_btn = ctk.CTkButton(
right_header, text="" if _up_img else "\u2191",
image=_up_img, width=30, height=28,
command=self._remote_go_up,
)
self._remote_up_btn.pack(side="left", padx=2)
self._remote_refresh_btn = ctk.CTkButton(
right_header, text="" if _ref_img else "\u21BB",
image=_ref_img, width=30, height=28,
command=self._refresh_remote,
)
self._remote_refresh_btn.pack(side="left", padx=2)
self._remote_path_entry = ctk.CTkEntry(right_header, height=28)
self._remote_path_entry.pack(side="left", fill="x", expand=True, padx=(4, 0))
self._remote_path_entry.bind("<Return>", lambda e: self._remote_go_to_path())
self._remote_list = FileListWidget(
right_pane,
columns=[
(t("name_col"), 200), (t("size_col"), 70),
(t("date_col"), 100), (t("perm_col"), 80),
],
on_navigate=self._navigate_remote,
on_drop=self._on_drop_to_remote,
)
self._remote_list.pack(fill="both", expand=True)
self._remote_status = ctk.CTkLabel(
right_pane, text=t("connect_to_browse"),
font=ctk.CTkFont(size=11), text_color="#9ca3af", anchor="w",
)
self._remote_status.pack(fill="x", pady=(2, 0))
# Wire drag-and-drop partners
self._local_list.set_drag_partner(self._remote_list)
self._remote_list.set_drag_partner(self._local_list)
# === Toolbar ===
toolbar = ctk.CTkFrame(self, fg_color="transparent")
toolbar.pack(fill="x", padx=10, pady=4)
self._upload_btn = make_icon_button(
toolbar, "upload", t("upload"), width=110, height=30,
command=self._upload_selected,
)
self._upload_btn.pack(side="left", padx=(0, 4))
self._download_btn = make_icon_button(
toolbar, "download", t("download"), width=110, height=30,
command=self._download_selected,
)
self._download_btn.pack(side="left", padx=4)
sep = ctk.CTkFrame(toolbar, width=2, height=24, fg_color="gray40")
sep.pack(side="left", padx=8)
self._mkdir_btn = make_icon_button(
toolbar, "folder", t("new_folder"), width=110, height=30,
command=self._mkdir_remote,
)
self._mkdir_btn.pack(side="left", padx=4)
self._delete_btn = make_icon_button(
toolbar, "delete", t("delete_files"), width=90, height=30,
fg_color="#dc2626", hover_color="#b91c1c",
command=self._delete_remote,
)
self._delete_btn.pack(side="left", padx=4)
self._rename_btn = make_icon_button(
toolbar, "edit", t("rename_file"), width=110, height=30,
command=self._rename_remote,
)
self._rename_btn.pack(side="left", padx=4)
# Sudo toggle
sep2 = ctk.CTkFrame(toolbar, width=2, height=24, fg_color="gray40")
sep2.pack(side="left", padx=8)
self._sudo_switch = ctk.CTkSwitch(
toolbar, text=t("sudo_mode"), variable=self._sudo_var,
width=50, height=24, command=self._on_sudo_toggle,
)
self._sudo_switch.pack(side="left", padx=4)
self._set_remote_buttons_state("disabled")
# === Transfer / Log area ===
transfer_frame = ctk.CTkFrame(self, fg_color="transparent")
transfer_frame.pack(fill="x", padx=10, pady=(2, 2))
self._progress = ctk.CTkProgressBar(transfer_frame, height=14)
self._progress.pack(fill="x")
self._progress.set(0)
self._transfer_label = ctk.CTkLabel(
transfer_frame, text="", font=ctk.CTkFont(size=11),
text_color="#9ca3af", anchor="w",
)
self._transfer_label.pack(fill="x")
self._log = ctk.CTkTextbox(
self, height=80, font=ctk.CTkFont(family="Consolas", size=11),
state="disabled",
)
self._log.pack(fill="x", padx=10, pady=(0, 10))
# ── Server selection ──
def set_server(self, alias: str | None):
# Store state of current session before switching
if self._current_alias and self._sftp and self.session_pool:
self.session_pool.store_sftp_state(
self._current_alias,
self._remote_path,
self._sudo_var.get()
)
if self._current_alias == alias:
return
# Clear remote panel immediately to avoid showing stale files
self._remote_list.populate([])
self._remote_status.configure(text=t("switching_servers") if alias else t("connect_to_browse"))
self._set_remote_buttons_state("disabled")
self._disconnect_sftp()
self._current_alias = alias
if alias:
# Restore state from session pool if available
if self.session_pool:
stored_path, stored_sudo = self.session_pool.get_sftp_state(alias)
if stored_path != "/":
self._remote_path = stored_path
self._remote_status.configure(text=t("sftp_click_to_connect"))
else:
self._remote_list.populate([])
self._remote_status.configure(text=t("connect_to_browse"))
self._set_remote_buttons_state("disabled")
def connect(self):
"""Explicitly connect SFTP (double-click or context menu)."""
if self._current_alias and not self._sftp:
self._connect_sftp()
# ── SFTP connection ──
def _connect_sftp(self):
server = self.store.get_server(self._current_alias)
if not server:
self._remote_status.configure(text=t("sftp_server_not_found"))
self._set_remote_buttons_state("disabled")
return
# Capture current alias to prevent race condition when switching servers quickly
current_alias_at_call = self._current_alias
self._remote_status.configure(text=t("connecting_sftp"))
self._set_remote_buttons_state("disabled")
def _do():
# Only proceed if we're still on the same server
if self._current_alias != current_alias_at_call:
return
try:
# Use session pool if available
if self.session_pool:
sftp, is_new = self.session_pool.get_or_create_sftp_session(
current_alias_at_call, # Use the original alias to avoid race conditions
server,
self.store.get_ssh_key_path()
)
# Get stored state
stored_path, stored_sudo = self.session_pool.get_sftp_state(current_alias_at_call)
# Set sudo mode before connecting if it was stored
sftp.sudo_mode = stored_sudo
if is_new:
sftp.connect()
# Normalize path after potential reconnection
home = sftp.normalize(".")
if stored_path != "/":
# Validate the stored path still exists, fall back to home if not
try:
sftp.listdir_attr(stored_path)
home = stored_path # Use stored path as home if accessible
except:
pass # Fall back to normalized home path
# Final check before calling callback
if self._current_alias == current_alias_at_call:
self.after(0, lambda a=current_alias_at_call: self._on_sftp_connected(sftp, home, a))
else:
# Legacy behavior without session pool
sftp = SFTPSession(server, self.store.get_ssh_key_path())
sftp.connect()
home = sftp.normalize(".")
# Final check before calling callback
if self._current_alias == current_alias_at_call:
self.after(0, lambda a=current_alias_at_call: self._on_sftp_connected(sftp, home, a))
except Exception as e:
# Only show error if still on the same server
if self._current_alias == current_alias_at_call:
self.after(0, lambda: self._on_sftp_error(str(e)))
threading.Thread(target=_do, daemon=True).start()
def _on_sftp_connected(self, sftp: SFTPSession, home: str, expected_alias: str):
# Only update UI if we're still on the same server that requested this connection
if self._current_alias != expected_alias:
# This connection result is from a server we've already switched away from
# If we're not using session pooling, disconnect this session
if not self.session_pool:
try:
sftp.disconnect()
except:
pass
return
self._sftp = sftp
# Update sudo var to match the restored session state
self._sudo_var.set(self._sftp.sudo_mode)
# Update remote path to match stored path if available
self._remote_path = home
self._remote_history.clear()
self._remote_status.configure(
text=t("connected_sftp").format(alias=self._current_alias)
)
self._set_remote_buttons_state("normal")
self._refresh_remote()
def _on_sftp_error(self, error: str):
self._remote_status.configure(text=t("sftp_error").format(e=error))
self._log_msg(t("sftp_error").format(e=error))
def _disconnect_sftp(self):
# Clear the remote display immediately
self._remote_list.populate([])
self._remote_status.configure(text=t("disconnected"))
self._set_remote_buttons_state("disabled")
# Only disconnect if not using session pool (otherwise session stays alive)
if self._sftp and not self.session_pool:
try:
self._sftp.disconnect()
except Exception:
pass
self._sftp = None
# If using session pool, just clear our reference to prevent interaction with old session
elif self._sftp and self.session_pool:
self._sftp = None
# ── Browse / Drive ──
def _browse_local(self):
path = filedialog.askdirectory(initialdir=self._local_path)
if path:
self._local_history.append(self._local_path)
self._local_path = os.path.normpath(path)
self._refresh_local()
def _on_drive_change(self, drive: str):
self._local_history.append(self._local_path)
self._local_path = drive
self._refresh_local()
def _on_sudo_toggle(self):
if self._sftp:
self._sftp.sudo_mode = self._sudo_var.get()
self._refresh_remote()
# ── Local navigation ──
def _refresh_local(self):
self._local_path_entry.delete(0, "end")
self._local_path_entry.insert(0, self._local_path)
# Sync drive selector
if self._drive_menu and platform.system() == "Windows":
current_drive = os.path.splitdrive(self._local_path)[0] + "\\"
self._drive_var.set(current_drive)
try:
entries = os.listdir(self._local_path)
except PermissionError:
self._log_msg(t("permission_denied").format(path=self._local_path))
entries = []
items = []
# Parent dir
parent = os.path.dirname(self._local_path)
if parent != self._local_path:
items.append({"name": "..", "size": "", "date": "", "is_dir": True})
for name in entries:
full = os.path.join(self._local_path, name)
try:
st = os.stat(full)
is_dir = os.path.isdir(full)
items.append({
"name": name,
"size": "" if is_dir else _format_size(st.st_size),
"date": _format_date(st.st_mtime),
"is_dir": is_dir,
})
except (PermissionError, OSError):
items.append({
"name": name, "size": "?", "date": "?", "is_dir": False,
})
self._local_list.populate(items)
count = len([i for i in items if i["name"] != ".."])
self._local_status.configure(text=t("items_count").format(count=count))
def _navigate_local(self, name: str):
if name == "..":
self._local_go_up()
return
target = os.path.join(self._local_path, name)
if os.path.isdir(target):
self._local_history.append(self._local_path)
self._local_path = os.path.normpath(target)
self._refresh_local()
def _local_go_up(self):
parent = os.path.dirname(self._local_path)
if parent != self._local_path:
self._local_history.append(self._local_path)
self._local_path = parent
self._refresh_local()
def _local_go_back(self):
if self._local_history:
self._local_path = self._local_history.pop()
self._refresh_local()
def _local_go_to_path(self):
path = self._local_path_entry.get().strip()
if path and os.path.isdir(path):
self._local_history.append(self._local_path)
self._local_path = os.path.normpath(path)
self._refresh_local()
# ── Remote navigation ──
def _refresh_remote(self):
if not self._sftp:
return
self._remote_path_entry.delete(0, "end")
self._remote_path_entry.insert(0, self._remote_path)
# Capture current server alias to prevent race condition when switching servers
current_alias_at_call = self._current_alias
def _list_remote():
"""Fetch remote listing, returns items list."""
if self._sftp.sudo_mode:
try:
attrs = self._sftp.listdir_attr_sudo(self._remote_path)
except Exception:
attrs = self._sftp.listdir_attr(self._remote_path)
else:
attrs = self._sftp.listdir_attr(self._remote_path)
items = []
if self._remote_path != "/":
items.append({
"name": "..", "size": "", "date": "", "perm": "", "is_dir": True,
})
for a in attrs:
is_dir = stat.S_ISDIR(a.st_mode) if a.st_mode else False
items.append({
"name": a.filename,
"size": "" if is_dir else _format_size(a.st_size or 0),
"date": _format_date(a.st_mtime or 0),
"perm": _format_perm(a.st_mode & 0o777) if a.st_mode else "",
"is_dir": is_dir,
})
return items
def _do():
# Check if we're still on the same server after async operations
if self._current_alias != current_alias_at_call:
return # Cancel if server has changed
# Ensure connection is alive, reconnect if needed
if not self._sftp.connected:
try:
self._sftp.reconnect()
except Exception as e:
self.after(0, lambda: self._on_sftp_error(str(e)))
return
if not self._sftp.connected:
self.after(0, lambda: self._on_sftp_error("Reconnect failed"))
return
# Final check if server changed during connection attempt
if self._current_alias != current_alias_at_call:
return # Cancel if server has changed
try:
items = _list_remote()
# Final check before updating UI
if self._current_alias == current_alias_at_call:
self.after(0, lambda: self._populate_remote(items))
except PermissionError as e:
if self._current_alias == current_alias_at_call:
hint = f"\n{t('try_sudo_hint')}" if not self._sftp.sudo_mode else ""
self.after(0, lambda: self._on_sftp_error(str(e) + hint))
except Exception:
# Final check before attempting reconnect
if self._current_alias != current_alias_at_call:
return # Cancel if server has changed
# Operation failed — one reconnect attempt
try:
self._sftp.reconnect()
# Final check after reconnection attempt
if self._current_alias != current_alias_at_call:
return # Cancel if server has changed
items = _list_remote()
self.after(0, lambda: self._populate_remote(items))
except Exception as e2:
if self._current_alias == current_alias_at_call:
self.after(0, lambda: self._on_sftp_error(str(e2)))
threading.Thread(target=_do, daemon=True).start()
def _populate_remote(self, items: list[dict]):
self._remote_list.populate(items)
count = len([i for i in items if i["name"] != ".."])
self._remote_status.configure(text=t("items_count").format(count=count))
def _navigate_remote(self, name: str):
if not self._sftp:
return
if name == "..":
self._remote_go_up()
return
if self._remote_path == "/":
target = "/" + name
else:
target = self._remote_path + "/" + name
self._remote_history.append(self._remote_path)
self._remote_path = target
self._refresh_remote()
def _remote_go_up(self):
if self._remote_path == "/":
return
if not self._sftp:
return
parent = "/".join(self._remote_path.rstrip("/").split("/")[:-1]) or "/"
self._remote_history.append(self._remote_path)
self._remote_path = parent
self._refresh_remote()
def _remote_go_back(self):
if not self._sftp:
return
if self._remote_history:
self._remote_path = self._remote_history.pop()
self._refresh_remote()
def _remote_go_to_path(self):
if not self._sftp:
return
path = self._remote_path_entry.get().strip()
if path:
self._remote_history.append(self._remote_path)
self._remote_path = path
self._refresh_remote()
# ── Remote path helper ──
def _remote_join(self, name: str) -> str:
if self._remote_path == "/":
return "/" + name
return self._remote_path + "/" + name
# ── Upload / Download (shared logic) ──
def _do_upload(self, items: list[dict]):
"""Upload files and folders from local to remote. Runs in background thread."""
if not self._sftp or self._transferring:
return
items = [s for s in items if s["name"] != ".."]
if not items:
return
self._transferring = True
self._upload_btn.configure(state="disabled")
self._download_btn.configure(state="disabled")
def _do():
if not self._sftp.connected:
try:
self._sftp.reconnect()
except Exception as e:
self.after(0, lambda: self._on_sftp_error(str(e)))
self.after(0, self._on_transfer_done)
return
for item in items:
name = item["name"]
local = os.path.join(self._local_path, name)
remote = self._remote_join(name)
try:
if item.get("is_dir"):
self.after(0, lambda n=name: self._transfer_label.configure(
text=t("uploading_dir").format(name=n)
))
def _file_cb(cur, total, fname, n=name):
self.after(0, lambda c=cur, tt=total, fn=fname:
self._transfer_label.configure(
text=t("transfer_file_progress").format(cur=c, total=tt, name=fn)
))
def _progress(transferred, total):
if total > 0:
self.after(0, lambda f=transferred/total: self._progress.set(f))
self._sftp.upload_dir(local, remote, progress_cb=_progress, file_cb=_file_cb)
else:
file_size = os.path.getsize(local)
self.after(0, lambda n=name: self._transfer_label.configure(
text=t("uploading").format(name=n)
))
def _progress(transferred, total, n=name):
if total > 0:
frac = transferred / total
size_str = f"{_format_size(transferred)}/{_format_size(total)}"
self.after(0, lambda f=frac, s=size_str, nn=n:
self._update_progress(f, t("uploading").format(name=nn) + f" ({s})"))
if self._sftp.sudo_mode:
try:
self._sftp.upload_sudo(local, remote, progress_cb=_progress)
except Exception:
self._sftp.upload(local, remote, progress_cb=_progress)
else:
self._sftp.upload(local, remote, progress_cb=_progress)
self.after(0, lambda n=name: self._log_msg(
t("transfer_done").format(name=n)
))
except PermissionError as e:
hint = f" - {t('try_sudo_hint')}" if not self._sftp.sudo_mode else ""
self.after(0, lambda e=e, h=hint: self._log_msg(
t("transfer_failed").format(e=str(e)) + h
))
except Exception as e:
self.after(0, lambda e=e: self._log_msg(
t("transfer_failed").format(e=str(e))
))
self.after(0, self._on_transfer_done)
threading.Thread(target=_do, daemon=True).start()
def _do_download(self, items: list[dict]):
"""Download files and folders from remote to local. Runs in background thread."""
if not self._sftp or self._transferring:
return
items = [s for s in items if s["name"] != ".."]
if not items:
return
self._transferring = True
self._upload_btn.configure(state="disabled")
self._download_btn.configure(state="disabled")
def _do():
if not self._sftp.connected:
try:
self._sftp.reconnect()
except Exception as e:
self.after(0, lambda: self._on_sftp_error(str(e)))
self.after(0, self._on_transfer_done)
return
for item in items:
name = item["name"]
remote = self._remote_join(name)
local = os.path.join(self._local_path, name)
try:
if item.get("is_dir"):
self.after(0, lambda n=name: self._transfer_label.configure(
text=t("downloading_dir").format(name=n)
))
os.makedirs(local, exist_ok=True)
def _file_cb(cur, total, fname, n=name):
self.after(0, lambda c=cur, tt=total, fn=fname:
self._transfer_label.configure(
text=t("transfer_file_progress").format(cur=c, total=tt, name=fn)
))
def _progress(transferred, total):
if total > 0:
self.after(0, lambda f=transferred/total: self._progress.set(f))
self._sftp.download_dir(remote, local, progress_cb=_progress, file_cb=_file_cb)
else:
self.after(0, lambda n=name: self._transfer_label.configure(
text=t("downloading").format(name=n)
))
def _progress(transferred, total, n=name):
if total > 0:
frac = transferred / total
size_str = f"{_format_size(transferred)}/{_format_size(total)}"
self.after(0, lambda f=frac, s=size_str, nn=n:
self._update_progress(f, t("downloading").format(name=nn) + f" ({s})"))
if self._sftp.sudo_mode:
try:
self._sftp.download_sudo(remote, local, progress_cb=_progress)
except Exception:
self._sftp.download(remote, local, progress_cb=_progress)
else:
self._sftp.download(remote, local, progress_cb=_progress)
self.after(0, lambda n=name: self._log_msg(
t("transfer_done").format(name=n)
))
except PermissionError as e:
hint = f" - {t('try_sudo_hint')}" if not self._sftp.sudo_mode else ""
self.after(0, lambda e=e, h=hint: self._log_msg(
t("transfer_failed").format(e=str(e)) + h
))
except Exception as e:
self.after(0, lambda e=e: self._log_msg(
t("transfer_failed").format(e=str(e))
))
self.after(0, self._on_transfer_done)
threading.Thread(target=_do, daemon=True).start()
# ── Button handlers ──
def _upload_selected(self):
if not self._sftp or not self._sftp.connected or self._transferring:
return
selected = self._local_list.get_selected()
items = [s for s in selected if s["name"] != ".."]
if not items:
self._log_msg(t("no_server_selected"))
return
self._do_upload(items)
def _download_selected(self):
if not self._sftp or not self._sftp.connected or self._transferring:
return
selected = self._remote_list.get_selected()
items = [s for s in selected if s["name"] != ".."]
if not items:
return
self._do_download(items)
# ── Drag-and-drop callbacks ──
def _on_drop_to_remote(self, items: list[dict], source):
"""Called when items are dropped onto remote panel (upload)."""
self._do_upload(items)
def _on_drop_to_local(self, items: list[dict], source):
"""Called when items are dropped onto local panel (download)."""
self._do_download(items)
def _on_transfer_done(self):
self._transferring = False
self._progress.set(0)
self._transfer_label.configure(text="")
self._upload_btn.configure(state="normal")
self._download_btn.configure(state="normal")
self._refresh_local()
self._refresh_remote()
def _update_progress(self, fraction: float, text: str):
self._progress.set(fraction)
self._transfer_label.configure(text=text)
def _mkdir_remote(self):
if not self._sftp or not self._sftp.connected:
return
dialog = ctk.CTkInputDialog(
text=t("new_folder_name"), title=t("new_folder"),
)
name = dialog.get_input()
if not name or not name.strip():
return
name = name.strip()
path = self._remote_join(name)
def _do():
try:
self._sftp.mkdir(path)
self.after(0, self._refresh_remote)
except Exception as e:
self.after(0, lambda: self._log_msg(t("sftp_error").format(e=str(e))))
threading.Thread(target=_do, daemon=True).start()
def _delete_remote(self):
if not self._sftp or not self._sftp.connected:
return
selected = self._remote_list.get_selected()
items = [s for s in selected if s["name"] != ".."]
if not items:
return
# Check for directories — use recursive delete confirm
has_dirs = any(s.get("is_dir") for s in items)
if has_dirs and len(items) == 1 and items[0].get("is_dir"):
if not messagebox.askyesno(
t("delete_files"),
t("recursive_delete_confirm").format(name=items[0]["name"]),
):
return
else:
if not messagebox.askyesno(
t("delete_files"),
t("delete_files_confirm").format(count=len(items)),
):
return
def _do():
for item in items:
name = item["name"]
path = self._remote_join(name)
try:
if item.get("is_dir"):
self._sftp.rmdir_recursive(path)
else:
self._sftp.remove(path)
except Exception as e:
self.after(0, lambda: self._log_msg(
t("sftp_error").format(e=str(e))
))
self.after(0, self._refresh_remote)
threading.Thread(target=_do, daemon=True).start()
def _rename_remote(self):
if not self._sftp or not self._sftp.connected:
return
selected = self._remote_list.get_selected()
if not selected or selected[0]["name"] == "..":
return
old_name = selected[0]["name"]
dialog = ctk.CTkInputDialog(
text=t("rename_prompt"), title=t("rename_file"),
)
new_name = dialog.get_input()
if not new_name or not new_name.strip() or new_name.strip() == old_name:
return
new_name = new_name.strip()
old_path = self._remote_join(old_name)
new_path = self._remote_join(new_name)
def _do():
try:
self._sftp.rename(old_path, new_path)
self.after(0, self._refresh_remote)
except Exception as e:
self.after(0, lambda: self._log_msg(t("sftp_error").format(e=str(e))))
threading.Thread(target=_do, daemon=True).start()
# ── Helpers ──
def _set_remote_buttons_state(self, state: str):
for btn in (
self._upload_btn, self._download_btn,
self._mkdir_btn, self._delete_btn, self._rename_btn,
self._remote_refresh_btn, self._remote_back_btn, self._remote_up_btn,
):
btn.configure(state=state)
def _log_msg(self, text: str):
self._log.configure(state="normal")
self._log.insert("end", text + "\n")
self._log.configure(state="disabled")
self._log.see("end")