Fully reverted sidebar, files_tab, totp_tab, server_dialog to v1.8.10 base. Removed all entry_undo bindings that broke Ctrl+V paste globally. Only alias editing feature preserved (editable alias + rename support). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
953 lines
36 KiB
Python
953 lines
36 KiB
Python
"""
|
|
Files tab — dual-pane SFTP file manager.
|
|
"""
|
|
|
|
import os
|
|
import platform
|
|
import stat
|
|
import string
|
|
import threading
|
|
from datetime import datetime
|
|
from tkinter import messagebox, filedialog
|
|
|
|
import customtkinter as ctk
|
|
|
|
from core.i18n import t
|
|
from core.ssh_client import SFTPSession
|
|
from gui.widgets.file_list import FileListWidget
|
|
|
|
|
|
def _format_size(size_bytes: int) -> str:
    """Return a short human-readable size string for *size_bytes*.

    Negative sizes give an empty string. Sizes below 1024 are shown as whole
    bytes (``"512 B"``); larger sizes are divided by 1024 per step and shown
    with one decimal in KB, MB, GB or, beyond that, TB.
    """
    if size_bytes < 0:
        return ""
    if size_bytes < 1024:
        return f"{size_bytes} B"

    scaled = size_bytes / 1024
    for suffix in ("KB", "MB", "GB"):
        if scaled < 1024:
            return f"{scaled:.1f} {suffix}"
        scaled /= 1024
    return f"{scaled:.1f} TB"
|
|
|
|
|
|
def _format_date(mtime: float) -> str:
    """Format a POSIX timestamp as local time, e.g. ``"Jan 05 14:30"``.

    Returns an empty string when the timestamp cannot be converted
    (wrong type, out of range, ...), so callers can display it directly.
    """
    try:
        return datetime.fromtimestamp(mtime).strftime("%b %d %H:%M")
    except Exception:
        # Listing must never fail because of one odd timestamp.
        return ""
|
|
|
|
|
|
def _format_perm(mode: int) -> str:
    """Render the low nine permission bits of *mode* as an ``rwxrwxrwx`` string.

    Each position shows its letter when the corresponding bit is set and
    ``-`` otherwise; bits above the lowest nine are ignored.
    """
    template = "rwxrwxrwx"
    # Position 0 corresponds to bit 8 (owner read), position 8 to bit 0 (other execute).
    return "".join(
        letter if mode & (1 << (8 - pos)) else "-"
        for pos, letter in enumerate(template)
    )
|
|
|
|
|
|
def _get_windows_drives() -> list[str]:
    """Return the drive roots that currently exist, e.g. ``['C:\\\\', 'D:\\\\']``.

    Every letter A-Z is probed as ``"<letter>:\\\\"`` with ``os.path.exists``;
    the result keeps alphabetical order.
    """
    candidates = (f"{letter}:\\" for letter in string.ascii_uppercase)
    return [root for root in candidates if os.path.exists(root)]
|
|
|
|
|
|
class FilesTab(ctk.CTkFrame):
    """Dual-pane file manager tab.

    The left pane lists a local directory, the right pane lists a directory
    on the currently selected server over SFTP. A toolbar offers upload,
    download, new folder, delete, rename and a sudo switch; a progress bar
    and a read-only log sit underneath.

    Network operations run in daemon threads. Results are handed back to the
    Tk event loop with ``self.after(0, func, *args)``; arguments are always
    evaluated *before* scheduling so callbacks never depend on variables
    (such as an ``except ... as e`` name) that no longer exist when they run.
    """

    def __init__(self, master, store, session_pool=None):
        """Create the tab.

        Args:
            master: Parent widget.
            store: Server store; this class calls ``get_server(alias)`` and
                ``get_ssh_key_path()`` on it.
            session_pool: Optional pool that keeps SFTP sessions alive across
                server switches and remembers per-server path/sudo state.
                When ``None`` a fresh ``SFTPSession`` is created per server.
        """
        super().__init__(master, fg_color="transparent")
        self.store = store
        self.session_pool = session_pool
        self._current_alias: str | None = None
        self._sftp: SFTPSession | None = None
        self._local_path = os.path.expanduser("~")
        self._remote_path = "/"
        # Back-button stacks: previously visited directories, newest last.
        self._local_history: list[str] = []
        self._remote_history: list[str] = []
        # True while an upload/download worker is running.
        self._transferring = False
        self._sudo_var = ctk.BooleanVar(value=False)

        self._build_ui()
        self._refresh_local()

    def _build_ui(self):
        """Build both panes, the toolbar, the progress area and the log box."""
        # === Panes area ===
        panes = ctk.CTkFrame(self, fg_color="transparent")
        panes.pack(fill="both", expand=True, padx=10, pady=(10, 5))

        # Left pane — Local
        left_pane = ctk.CTkFrame(panes, fg_color="transparent")
        left_pane.pack(side="left", fill="both", expand=True, padx=(0, 5))

        left_header = ctk.CTkFrame(left_pane, fg_color="transparent")
        left_header.pack(fill="x", pady=(0, 4))

        ctk.CTkLabel(left_header, text=t("local_files"),
                     font=ctk.CTkFont(size=13, weight="bold")).pack(side="left")

        self._local_back_btn = ctk.CTkButton(
            left_header, text="\u2190", width=30, height=28,
            command=self._local_go_back,
        )
        self._local_back_btn.pack(side="left", padx=(8, 2))

        self._local_up_btn = ctk.CTkButton(
            left_header, text="\u2191", width=30, height=28,
            command=self._local_go_up,
        )
        self._local_up_btn.pack(side="left", padx=2)

        # Local refresh button
        self._local_refresh_btn = ctk.CTkButton(
            left_header, text="\u21BB", width=30, height=28,
            command=self._refresh_local,
        )
        self._local_refresh_btn.pack(side="left", padx=2)

        # Browse button
        self._browse_btn = ctk.CTkButton(
            left_header, text=t("browse"), width=60, height=28,
            command=self._browse_local,
        )
        self._browse_btn.pack(side="left", padx=2)

        # Windows drive selector (only created on Windows when drives exist;
        # self._drive_var is defined only together with self._drive_menu).
        self._drive_menu = None
        if platform.system() == "Windows":
            drives = _get_windows_drives()
            if drives:
                current_drive = os.path.splitdrive(self._local_path)[0] + "\\"
                self._drive_var = ctk.StringVar(value=current_drive)
                self._drive_menu = ctk.CTkOptionMenu(
                    left_header, values=drives, variable=self._drive_var,
                    width=65, height=28, command=self._on_drive_change,
                )
                self._drive_menu.pack(side="left", padx=2)

        self._local_path_entry = ctk.CTkEntry(left_header, height=28)
        self._local_path_entry.pack(side="left", fill="x", expand=True, padx=(4, 0))
        self._local_path_entry.bind("<Return>", lambda e: self._local_go_to_path())

        self._local_list = FileListWidget(
            left_pane,
            columns=[(t("name_col"), 220), (t("size_col"), 80), (t("date_col"), 110)],
            on_navigate=self._navigate_local,
            on_drop=self._on_drop_to_local,
        )
        self._local_list.pack(fill="both", expand=True)

        self._local_status = ctk.CTkLabel(
            left_pane, text="", font=ctk.CTkFont(size=11),
            text_color="#9ca3af", anchor="w",
        )
        self._local_status.pack(fill="x", pady=(2, 0))

        # Right pane — Remote
        right_pane = ctk.CTkFrame(panes, fg_color="transparent")
        right_pane.pack(side="right", fill="both", expand=True, padx=(5, 0))

        right_header = ctk.CTkFrame(right_pane, fg_color="transparent")
        right_header.pack(fill="x", pady=(0, 4))

        ctk.CTkLabel(right_header, text=t("remote_files"),
                     font=ctk.CTkFont(size=13, weight="bold")).pack(side="left")

        self._remote_back_btn = ctk.CTkButton(
            right_header, text="\u2190", width=30, height=28,
            command=self._remote_go_back,
        )
        self._remote_back_btn.pack(side="left", padx=(8, 2))

        self._remote_up_btn = ctk.CTkButton(
            right_header, text="\u2191", width=30, height=28,
            command=self._remote_go_up,
        )
        self._remote_up_btn.pack(side="left", padx=2)

        self._remote_refresh_btn = ctk.CTkButton(
            right_header, text="\u21BB", width=30, height=28,
            command=self._refresh_remote,
        )
        self._remote_refresh_btn.pack(side="left", padx=2)

        self._remote_path_entry = ctk.CTkEntry(right_header, height=28)
        self._remote_path_entry.pack(side="left", fill="x", expand=True, padx=(4, 0))
        self._remote_path_entry.bind("<Return>", lambda e: self._remote_go_to_path())

        self._remote_list = FileListWidget(
            right_pane,
            columns=[
                (t("name_col"), 200), (t("size_col"), 70),
                (t("date_col"), 100), (t("perm_col"), 80),
            ],
            on_navigate=self._navigate_remote,
            on_drop=self._on_drop_to_remote,
        )
        self._remote_list.pack(fill="both", expand=True)

        self._remote_status = ctk.CTkLabel(
            right_pane, text=t("connect_to_browse"),
            font=ctk.CTkFont(size=11), text_color="#9ca3af", anchor="w",
        )
        self._remote_status.pack(fill="x", pady=(2, 0))

        # Wire drag-and-drop partners
        self._local_list.set_drag_partner(self._remote_list)
        self._remote_list.set_drag_partner(self._local_list)

        # === Toolbar ===
        toolbar = ctk.CTkFrame(self, fg_color="transparent")
        toolbar.pack(fill="x", padx=10, pady=4)

        self._upload_btn = ctk.CTkButton(
            toolbar, text=f"{t('upload')} \u2192", width=110, height=30,
            command=self._upload_selected,
        )
        self._upload_btn.pack(side="left", padx=(0, 4))

        self._download_btn = ctk.CTkButton(
            toolbar, text=f"\u2190 {t('download')}", width=110, height=30,
            command=self._download_selected,
        )
        self._download_btn.pack(side="left", padx=4)

        sep = ctk.CTkFrame(toolbar, width=2, height=24, fg_color="gray40")
        sep.pack(side="left", padx=8)

        self._mkdir_btn = ctk.CTkButton(
            toolbar, text=t("new_folder"), width=100, height=30,
            command=self._mkdir_remote,
        )
        self._mkdir_btn.pack(side="left", padx=4)

        self._delete_btn = ctk.CTkButton(
            toolbar, text=t("delete_files"), width=80, height=30,
            fg_color="#dc2626", hover_color="#b91c1c",
            command=self._delete_remote,
        )
        self._delete_btn.pack(side="left", padx=4)

        self._rename_btn = ctk.CTkButton(
            toolbar, text=t("rename_file"), width=100, height=30,
            command=self._rename_remote,
        )
        self._rename_btn.pack(side="left", padx=4)

        # Sudo toggle
        sep2 = ctk.CTkFrame(toolbar, width=2, height=24, fg_color="gray40")
        sep2.pack(side="left", padx=8)

        self._sudo_switch = ctk.CTkSwitch(
            toolbar, text=t("sudo_mode"), variable=self._sudo_var,
            width=50, height=24, command=self._on_sudo_toggle,
        )
        self._sudo_switch.pack(side="left", padx=4)

        # Nothing remote is usable until a session is connected.
        self._set_remote_buttons_state("disabled")

        # === Transfer / Log area ===
        transfer_frame = ctk.CTkFrame(self, fg_color="transparent")
        transfer_frame.pack(fill="x", padx=10, pady=(2, 2))

        self._progress = ctk.CTkProgressBar(transfer_frame, height=14)
        self._progress.pack(fill="x")
        self._progress.set(0)

        self._transfer_label = ctk.CTkLabel(
            transfer_frame, text="", font=ctk.CTkFont(size=11),
            text_color="#9ca3af", anchor="w",
        )
        self._transfer_label.pack(fill="x")

        self._log = ctk.CTkTextbox(
            self, height=80, font=ctk.CTkFont(family="Consolas", size=11),
            state="disabled",
        )
        self._log.pack(fill="x", padx=10, pady=(0, 10))

    # ── Server selection ──

    def set_server(self, alias: str | None):
        """Switch the remote pane to server *alias* (``None`` = no server).

        The current remote path and sudo flag are saved to the session pool
        first (when a pool and a live session exist). Selecting the alias
        that is already active only saves state and returns.
        """
        # Store state of current session before switching
        if self._current_alias and self._sftp and self.session_pool:
            self.session_pool.store_sftp_state(
                self._current_alias,
                self._remote_path,
                self._sudo_var.get(),
            )

        if self._current_alias == alias:
            return

        # Clear remote panel immediately to avoid showing stale files
        self._remote_list.populate([])
        self._remote_status.configure(
            text=t("switching_servers") if alias else t("connect_to_browse")
        )
        self._set_remote_buttons_state("disabled")

        self._disconnect_sftp()
        self._current_alias = alias

        if alias:
            # Restore the remembered path; the stored sudo flag is applied
            # in _connect_sftp once the session is available.
            if self.session_pool:
                stored_path, _ = self.session_pool.get_sftp_state(alias)
                if stored_path != "/":
                    self._remote_path = stored_path
            self._connect_sftp()
        else:
            self._remote_list.populate([])
            self._remote_status.configure(text=t("connect_to_browse"))
            self._set_remote_buttons_state("disabled")

    # ── SFTP connection ──

    def _connect_sftp(self):
        """Open (or fetch from the pool) an SFTP session for the current alias.

        The connection is made in a daemon thread. The result is delivered to
        ``_on_sftp_connected`` / ``_on_sftp_error`` on the Tk thread, and only
        if the user has not switched to another server in the meantime.
        """
        server = self.store.get_server(self._current_alias)
        if not server:
            self._remote_status.configure(text=t("sftp_server_not_found"))
            self._set_remote_buttons_state("disabled")
            return

        # Capture the alias: self._current_alias may change while we connect.
        alias = self._current_alias
        pool = self.session_pool

        self._remote_status.configure(text=t("connecting_sftp"))
        self._set_remote_buttons_state("disabled")

        def _do():
            # Only proceed if we're still on the same server
            if self._current_alias != alias:
                return

            try:
                if pool:
                    sftp, is_new = pool.get_or_create_sftp_session(
                        alias, server, self.store.get_ssh_key_path()
                    )
                    stored_path, stored_sudo = pool.get_sftp_state(alias)

                    # Apply the remembered sudo mode before connecting
                    sftp.sudo_mode = stored_sudo
                    if is_new:
                        sftp.connect()

                    home = sftp.normalize(".")
                    if stored_path != "/":
                        # Use the stored path if it can still be listed,
                        # otherwise keep the normalized home directory.
                        try:
                            sftp.listdir_attr(stored_path)
                            home = stored_path
                        except Exception:
                            pass
                else:
                    # Legacy behavior without session pool
                    sftp = SFTPSession(server, self.store.get_ssh_key_path())
                    sftp.connect()
                    home = sftp.normalize(".")
            except Exception as e:
                # Only show error if still on the same server. str(e) is
                # evaluated here because `e` is unbound after this clause.
                if self._current_alias == alias:
                    self.after(0, self._on_sftp_error, str(e))
                return

            if self._current_alias == alias:
                self.after(0, self._on_sftp_connected, sftp, home, alias)
            elif not pool:
                # The user moved on and nobody owns this unpooled session:
                # close it instead of leaking the connection.
                try:
                    sftp.disconnect()
                except Exception:
                    pass

        threading.Thread(target=_do, daemon=True).start()

    def _on_sftp_connected(self, sftp: SFTPSession, home: str, expected_alias: str):
        """Adopt a freshly connected session and show *home* in the remote pane.

        If the active alias is no longer *expected_alias* the result is
        discarded; an unpooled session is disconnected in that case.
        """
        if self._current_alias != expected_alias:
            if not self.session_pool:
                try:
                    sftp.disconnect()
                except Exception:
                    pass
            return

        self._sftp = sftp
        # Update sudo var to match the restored session state
        self._sudo_var.set(self._sftp.sudo_mode)
        self._remote_path = home
        self._remote_history.clear()
        self._remote_status.configure(
            text=t("connected_sftp").format(alias=self._current_alias)
        )
        self._set_remote_buttons_state("normal")
        self._refresh_remote()

    def _on_sftp_error(self, error: str):
        """Show *error* in the remote status line and append it to the log."""
        message = t("sftp_error").format(e=error)
        self._remote_status.configure(text=message)
        self._log_msg(message)

    def _disconnect_sftp(self):
        """Clear the remote pane and drop the current session reference.

        Without a session pool the session is disconnected (errors ignored);
        with a pool the session is left alive for later reuse.
        """
        # Clear the remote display immediately
        self._remote_list.populate([])
        self._remote_status.configure(text=t("disconnected"))
        self._set_remote_buttons_state("disabled")

        if not self._sftp:
            return
        if not self.session_pool:
            try:
                self._sftp.disconnect()
            except Exception:
                pass
        self._sftp = None

    # ── Browse / Drive ──

    def _browse_local(self):
        """Ask for a local directory with a folder dialog and navigate to it."""
        path = filedialog.askdirectory(initialdir=self._local_path)
        if path:
            self._local_history.append(self._local_path)
            self._local_path = os.path.normpath(path)
            self._refresh_local()

    def _on_drive_change(self, drive: str):
        """Navigate the local pane to the root of *drive* (drive menu callback)."""
        self._local_history.append(self._local_path)
        self._local_path = drive
        self._refresh_local()

    def _on_sudo_toggle(self):
        """Copy the sudo switch state to the session and reload the remote pane."""
        if self._sftp:
            self._sftp.sudo_mode = self._sudo_var.get()
            self._refresh_remote()

    # ── Local navigation ──

    def _refresh_local(self):
        """Reload the local pane from ``self._local_path``.

        A directory that cannot be listed (permission denied, removed,
        drive not ready, ...) is logged and shown as empty instead of raising
        out of the UI callback.
        """
        self._local_path_entry.delete(0, "end")
        self._local_path_entry.insert(0, self._local_path)

        # Sync drive selector
        if self._drive_menu and platform.system() == "Windows":
            current_drive = os.path.splitdrive(self._local_path)[0] + "\\"
            self._drive_var.set(current_drive)

        try:
            entries = os.listdir(self._local_path)
        except PermissionError:
            self._log_msg(t("permission_denied").format(path=self._local_path))
            entries = []
        except OSError as e:
            # FileNotFoundError, NotADirectoryError, device not ready, ...
            self._log_msg(str(e))
            entries = []

        items = []
        # Parent dir (dirname of a filesystem root is the root itself)
        parent = os.path.dirname(self._local_path)
        if parent != self._local_path:
            items.append({"name": "..", "size": "", "date": "", "is_dir": True})

        for name in entries:
            full = os.path.join(self._local_path, name)
            try:
                st = os.stat(full)
            except OSError:
                # Broken link or unreadable entry: still list it.
                items.append({
                    "name": name, "size": "?", "date": "?", "is_dir": False,
                })
                continue
            # os.stat follows symlinks, so this matches os.path.isdir(full)
            # without a second system call.
            is_dir = stat.S_ISDIR(st.st_mode)
            items.append({
                "name": name,
                "size": "" if is_dir else _format_size(st.st_size),
                "date": _format_date(st.st_mtime),
                "is_dir": is_dir,
            })

        self._local_list.populate(items)
        count = sum(1 for i in items if i["name"] != "..")
        self._local_status.configure(text=t("items_count").format(count=count))

    def _navigate_local(self, name: str):
        """Enter local sub-directory *name* (``".."`` goes up); files are ignored."""
        if name == "..":
            self._local_go_up()
            return
        target = os.path.join(self._local_path, name)
        if os.path.isdir(target):
            self._local_history.append(self._local_path)
            self._local_path = os.path.normpath(target)
            self._refresh_local()

    def _local_go_up(self):
        """Go to the parent of the current local directory, if there is one."""
        parent = os.path.dirname(self._local_path)
        if parent != self._local_path:
            self._local_history.append(self._local_path)
            self._local_path = parent
            self._refresh_local()

    def _local_go_back(self):
        """Return to the most recently visited local directory, if any."""
        if self._local_history:
            self._local_path = self._local_history.pop()
            self._refresh_local()

    def _local_go_to_path(self):
        """Navigate to the path typed in the local entry if it is a directory."""
        path = self._local_path_entry.get().strip()
        if path and os.path.isdir(path):
            self._local_history.append(self._local_path)
            self._local_path = os.path.normpath(path)
            self._refresh_local()

    # ── Remote navigation ──

    def _refresh_remote(self):
        """Reload the remote pane from ``self._remote_path`` in a worker thread.

        The session, alias and path are captured when the refresh starts, so
        the worker is unaffected by a later server switch (which sets
        ``self._sftp`` to ``None``). A listing is applied only if the pane
        still shows the same server and path when it arrives. On a failed
        listing one reconnect-and-retry is attempted.
        """
        sftp = self._sftp
        if not sftp:
            return
        alias = self._current_alias
        remote_path = self._remote_path

        self._remote_path_entry.delete(0, "end")
        self._remote_path_entry.insert(0, remote_path)

        def server_changed() -> bool:
            return self._current_alias != alias

        def list_remote() -> list[dict]:
            """Fetch the remote listing and convert it to file-list rows."""
            if sftp.sudo_mode:
                try:
                    attrs = sftp.listdir_attr_sudo(remote_path)
                except Exception:
                    # Fall back to a plain listing when the sudo one fails.
                    attrs = sftp.listdir_attr(remote_path)
            else:
                attrs = sftp.listdir_attr(remote_path)

            rows = []
            if remote_path != "/":
                rows.append({
                    "name": "..", "size": "", "date": "", "perm": "", "is_dir": True,
                })
            for a in attrs:
                mode = a.st_mode
                is_dir = stat.S_ISDIR(mode) if mode else False
                rows.append({
                    "name": a.filename,
                    "size": "" if is_dir else _format_size(a.st_size or 0),
                    "date": _format_date(a.st_mtime or 0),
                    "perm": _format_perm(mode & 0o777) if mode else "",
                    "is_dir": is_dir,
                })
            return rows

        def show(rows: list[dict]):
            # Runs on the Tk thread: drop results that are stale by now.
            if self._current_alias == alias and self._remote_path == remote_path:
                self._populate_remote(rows)

        def _do():
            if server_changed():
                return

            # Ensure connection is alive, reconnect if needed
            if not sftp.connected:
                try:
                    sftp.reconnect()
                except Exception as e:
                    self.after(0, self._on_sftp_error, str(e))
                    return
                if not sftp.connected:
                    self.after(0, self._on_sftp_error, "Reconnect failed")
                    return

            if server_changed():
                return

            try:
                rows = list_remote()
            except PermissionError as e:
                if not server_changed():
                    hint = f"\n{t('try_sudo_hint')}" if not sftp.sudo_mode else ""
                    self.after(0, self._on_sftp_error, str(e) + hint)
                return
            except Exception:
                if server_changed():
                    return
                # Operation failed — one reconnect attempt
                try:
                    sftp.reconnect()
                    if server_changed():
                        return
                    rows = list_remote()
                except Exception as e2:
                    if not server_changed():
                        self.after(0, self._on_sftp_error, str(e2))
                    return

            self.after(0, show, rows)

        threading.Thread(target=_do, daemon=True).start()

    def _populate_remote(self, items: list[dict]):
        """Show *items* in the remote list and update the item counter."""
        self._remote_list.populate(items)
        count = sum(1 for i in items if i["name"] != "..")
        self._remote_status.configure(text=t("items_count").format(count=count))

    def _navigate_remote(self, name: str):
        """Enter remote entry *name* (``".."`` goes up) and reload the pane."""
        if not self._sftp:
            return
        if name == "..":
            self._remote_go_up()
            return
        target = self._remote_join(name)
        self._remote_history.append(self._remote_path)
        self._remote_path = target
        self._refresh_remote()

    def _remote_go_up(self):
        """Go to the parent of the current remote directory (no-op at ``/``)."""
        if self._remote_path == "/":
            return
        if not self._sftp:
            return
        # Drop the last path component; an empty result means the root.
        parent = "/".join(self._remote_path.rstrip("/").split("/")[:-1]) or "/"
        self._remote_history.append(self._remote_path)
        self._remote_path = parent
        self._refresh_remote()

    def _remote_go_back(self):
        """Return to the most recently visited remote directory, if any."""
        if not self._sftp:
            return
        if self._remote_history:
            self._remote_path = self._remote_history.pop()
            self._refresh_remote()

    def _remote_go_to_path(self):
        """Navigate to the path typed in the remote entry (not validated here)."""
        if not self._sftp:
            return
        path = self._remote_path_entry.get().strip()
        if path:
            self._remote_history.append(self._remote_path)
            self._remote_path = path
            self._refresh_remote()

    # ── Remote path helper ──

    def _remote_join(self, name: str, base: str | None = None) -> str:
        """Join *name* onto a remote directory with ``/``.

        Args:
            name: Entry name to append.
            base: Directory to join onto; defaults to the current remote path.
                Workers pass the path captured when they started.
        """
        if base is None:
            base = self._remote_path
        if base == "/":
            return "/" + name
        return base + "/" + name

    # ── Upload / Download (shared logic) ──

    def _do_upload(self, items: list[dict]):
        """Upload local *items* (files and folders) to the current remote directory.

        Returns immediately; the transfer runs in a background thread.
        Ignored when no session exists or another transfer is running.
        """
        self._start_transfer(items, upload=True)

    def _do_download(self, items: list[dict]):
        """Download remote *items* (files and folders) to the current local directory.

        Returns immediately; the transfer runs in a background thread.
        Ignored when no session exists or another transfer is running.
        """
        self._start_transfer(items, upload=False)

    def _start_transfer(self, items: list[dict], *, upload: bool):
        """Shared driver for ``_do_upload`` / ``_do_download``.

        The session and both directories are captured up front so that
        navigating or switching servers during the transfer cannot redirect
        the remaining files. Each item's failure is logged and the loop
        continues with the next item.
        """
        sftp = self._sftp
        if not sftp or self._transferring:
            return
        items = [s for s in items if s["name"] != ".."]
        if not items:
            return

        local_dir = self._local_path
        remote_dir = self._remote_path
        if upload:
            dir_key, file_key = "uploading_dir", "uploading"
        else:
            dir_key, file_key = "downloading_dir", "downloading"

        self._transferring = True
        self._upload_btn.configure(state="disabled")
        self._download_btn.configure(state="disabled")

        def set_label(text: str):
            self.after(0, lambda: self._transfer_label.configure(text=text))

        def transfer_dir(name: str, local: str, remote: str):
            set_label(t(dir_key).format(name=name))

            def on_file(cur, total, fname):
                set_label(
                    t("transfer_file_progress").format(cur=cur, total=total, name=fname)
                )

            def on_bytes(transferred, total):
                if total > 0:
                    self.after(0, self._progress.set, transferred / total)

            if upload:
                sftp.upload_dir(local, remote, progress_cb=on_bytes, file_cb=on_file)
            else:
                os.makedirs(local, exist_ok=True)
                sftp.download_dir(remote, local, progress_cb=on_bytes, file_cb=on_file)

        def transfer_file(name: str, local: str, remote: str):
            label = t(file_key).format(name=name)
            set_label(label)

            def on_bytes(transferred, total):
                if total > 0:
                    sizes = f"{_format_size(transferred)}/{_format_size(total)}"
                    self.after(
                        0, self._update_progress,
                        transferred / total, f"{label} ({sizes})",
                    )

            src, dst = (local, remote) if upload else (remote, local)
            plain = sftp.upload if upload else sftp.download
            if sftp.sudo_mode:
                elevated = sftp.upload_sudo if upload else sftp.download_sudo
                try:
                    elevated(src, dst, progress_cb=on_bytes)
                except Exception:
                    # Fall back to the plain transfer when the sudo one fails.
                    plain(src, dst, progress_cb=on_bytes)
            else:
                plain(src, dst, progress_cb=on_bytes)

        def _do():
            if not sftp.connected:
                try:
                    sftp.reconnect()
                except Exception as e:
                    self.after(0, self._on_sftp_error, str(e))
                    self.after(0, self._on_transfer_done)
                    return

            for item in items:
                name = item["name"]
                local = os.path.join(local_dir, name)
                remote = self._remote_join(name, remote_dir)
                try:
                    if item.get("is_dir"):
                        transfer_dir(name, local, remote)
                    else:
                        transfer_file(name, local, remote)
                    self.after(0, self._log_msg, t("transfer_done").format(name=name))
                except PermissionError as e:
                    hint = f" - {t('try_sudo_hint')}" if not sftp.sudo_mode else ""
                    self.after(
                        0, self._log_msg, t("transfer_failed").format(e=str(e)) + hint
                    )
                except Exception as e:
                    self.after(0, self._log_msg, t("transfer_failed").format(e=str(e)))

            self.after(0, self._on_transfer_done)

        threading.Thread(target=_do, daemon=True).start()

    # ── Button handlers ──

    def _upload_selected(self):
        """Upload the entries selected in the local pane."""
        if not self._sftp or not self._sftp.connected or self._transferring:
            return
        selected = self._local_list.get_selected()
        items = [s for s in selected if s["name"] != ".."]
        if not items:
            # NOTE(review): this logs the "no_server_selected" text although the
            # condition is an empty selection — confirm the intended i18n key.
            self._log_msg(t("no_server_selected"))
            return
        self._do_upload(items)

    def _download_selected(self):
        """Download the entries selected in the remote pane."""
        if not self._sftp or not self._sftp.connected or self._transferring:
            return
        selected = self._remote_list.get_selected()
        items = [s for s in selected if s["name"] != ".."]
        if not items:
            return
        self._do_download(items)

    # ── Drag-and-drop callbacks ──

    def _on_drop_to_remote(self, items: list[dict], source):
        """Called when items are dropped onto remote panel (upload)."""
        self._do_upload(items)

    def _on_drop_to_local(self, items: list[dict], source):
        """Called when items are dropped onto local panel (download)."""
        self._do_download(items)

    def _on_transfer_done(self):
        """Reset the transfer UI after a worker finishes and reload both panes."""
        self._transferring = False
        self._progress.set(0)
        self._transfer_label.configure(text="")
        # Keep the buttons disabled if the session went away during the transfer.
        state = "normal" if self._sftp else "disabled"
        self._upload_btn.configure(state=state)
        self._download_btn.configure(state=state)
        self._refresh_local()
        self._refresh_remote()

    def _update_progress(self, fraction: float, text: str):
        """Set the progress bar to *fraction* and the transfer label to *text*."""
        self._progress.set(fraction)
        self._transfer_label.configure(text=text)

    def _mkdir_remote(self):
        """Prompt for a folder name and create it in the current remote directory."""
        sftp = self._sftp
        if not sftp or not sftp.connected:
            return
        dialog = ctk.CTkInputDialog(
            text=t("new_folder_name"), title=t("new_folder"),
        )
        name = dialog.get_input()
        if not name or not name.strip():
            return
        path = self._remote_join(name.strip())

        def _do():
            try:
                sftp.mkdir(path)
            except Exception as e:
                self.after(0, self._log_msg, t("sftp_error").format(e=str(e)))
            else:
                self.after(0, self._refresh_remote)

        threading.Thread(target=_do, daemon=True).start()

    def _delete_remote(self):
        """Delete the selected remote entries after confirmation.

        A single selected directory gets the recursive-delete prompt; any
        other selection gets the generic prompt with the item count.
        Directories are removed recursively. Failures are logged per item.
        """
        sftp = self._sftp
        if not sftp or not sftp.connected:
            return
        selected = self._remote_list.get_selected()
        items = [s for s in selected if s["name"] != ".."]
        if not items:
            return

        if len(items) == 1 and items[0].get("is_dir"):
            prompt = t("recursive_delete_confirm").format(name=items[0]["name"])
        else:
            prompt = t("delete_files_confirm").format(count=len(items))
        if not messagebox.askyesno(t("delete_files"), prompt):
            return

        # Resolve the paths now so navigation during deletion cannot retarget them.
        targets = [(self._remote_join(s["name"]), bool(s.get("is_dir"))) for s in items]

        def _do():
            for path, is_dir in targets:
                try:
                    if is_dir:
                        sftp.rmdir_recursive(path)
                    else:
                        sftp.remove(path)
                except Exception as e:
                    self.after(0, self._log_msg, t("sftp_error").format(e=str(e)))
            self.after(0, self._refresh_remote)

        threading.Thread(target=_do, daemon=True).start()

    def _rename_remote(self):
        """Prompt for a new name and rename the first selected remote entry."""
        sftp = self._sftp
        if not sftp or not sftp.connected:
            return
        selected = self._remote_list.get_selected()
        if not selected or selected[0]["name"] == "..":
            return
        old_name = selected[0]["name"]
        dialog = ctk.CTkInputDialog(
            text=t("rename_prompt"), title=t("rename_file"),
        )
        new_name = dialog.get_input()
        if not new_name or not new_name.strip() or new_name.strip() == old_name:
            return
        old_path = self._remote_join(old_name)
        new_path = self._remote_join(new_name.strip())

        def _do():
            try:
                sftp.rename(old_path, new_path)
            except Exception as e:
                self.after(0, self._log_msg, t("sftp_error").format(e=str(e)))
            else:
                self.after(0, self._refresh_remote)

        threading.Thread(target=_do, daemon=True).start()

    # ── Helpers ──

    def _set_remote_buttons_state(self, state: str):
        """Set the Tk *state* of every button that needs a remote session."""
        for btn in (
            self._upload_btn, self._download_btn,
            self._mkdir_btn, self._delete_btn, self._rename_btn,
            self._remote_refresh_btn, self._remote_back_btn, self._remote_up_btn,
        ):
            btn.configure(state=state)

    def _log_msg(self, text: str):
        """Append *text* as a new line to the read-only log and scroll to it."""
        # The textbox is kept disabled so the user cannot edit it; enable it
        # only for the duration of the insert.
        self._log.configure(state="normal")
        self._log.insert("end", text + "\n")
        self._log.configure(state="disabled")
        self._log.see("end")
|