Files
server-manager/gui/tabs/files_tab.py
chrome-storm-c442 3e9aeababe v1.8.0: file manager improvements
- SFTP cleanup on app close and language switch
- Windows drive selector in local panel
- Browse and Refresh buttons for local panel
- Recursive upload/download/delete of folders
- Drag-and-drop between local and remote panels
- Sudo mode toggle for privileged file operations
- New i18n keys for EN/RU/ZH

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 16:30:56 -05:00

798 lines
30 KiB
Python

"""
Files tab — dual-pane SFTP file manager.
"""
import os
import platform
import stat
import string
import threading
from datetime import datetime
from tkinter import messagebox, filedialog
import customtkinter as ctk
from core.i18n import t
from core.ssh_client import SFTPSession
from gui.widgets.file_list import FileListWidget
def _format_size(size_bytes: int) -> str:
if size_bytes < 0:
return ""
for unit in ("B", "KB", "MB", "GB"):
if size_bytes < 1024:
return f"{size_bytes:.1f} {unit}" if unit != "B" else f"{size_bytes} B"
size_bytes /= 1024
return f"{size_bytes:.1f} TB"
def _format_date(mtime: float) -> str:
try:
dt = datetime.fromtimestamp(mtime)
return dt.strftime("%b %d %H:%M")
except Exception:
return ""
def _format_perm(mode: int) -> str:
parts = []
for who in (6, 3, 0):
m = (mode >> who) & 7
parts.append(
("r" if m & 4 else "-")
+ ("w" if m & 2 else "-")
+ ("x" if m & 1 else "-")
)
return "".join(parts)
def _get_windows_drives() -> list[str]:
"""Return list of available drive letters on Windows (e.g. ['C:\\', 'D:\\'])."""
drives = []
for letter in string.ascii_uppercase:
drive = f"{letter}:\\"
if os.path.exists(drive):
drives.append(drive)
return drives
class FilesTab(ctk.CTkFrame):
def __init__(self, master, store):
    """Set up tab state, build the widgets and show the local home folder.

    master: parent widget passed on to the frame base class.
    store: kept as ``self.store``; other methods call ``get_server`` and
        ``get_ssh_key_path`` on it when opening an SFTP session.
    """
    super().__init__(master, fg_color="transparent")
    self.store = store

    # Connection state: nothing selected, nothing connected yet.
    self._current_alias: str | None = None
    self._sftp: SFTPSession | None = None
    self._transferring = False
    self._sudo_var = ctk.BooleanVar(value=False)

    # Browsing state for both panes, each with its own "back" stack.
    self._local_path = os.path.expanduser("~")
    self._local_history: list[str] = []
    self._remote_path = "/"
    self._remote_history: list[str] = []

    self._build_ui()
    self._refresh_local()
def _build_ui(self):
    """Create and lay out every widget of the tab.

    Top to bottom: the two file panes (local on the left, remote on the
    right), the action toolbar, the progress row and the log box. Remote
    actions start out disabled.
    """

    def nav_button(parent, glyph, handler):
        # Small header button that shows a single arrow/refresh glyph.
        return ctk.CTkButton(
            parent, text=glyph, width=30, height=28, command=handler,
        )

    def pane_title(parent, caption):
        # Bold caption at the left edge of a pane header.
        ctk.CTkLabel(
            parent, text=caption, font=ctk.CTkFont(size=13, weight="bold"),
        ).pack(side="left")

    def muted_label(parent, initial):
        # Small grey, left-aligned label used for status and progress text.
        return ctk.CTkLabel(
            parent, text=initial, font=ctk.CTkFont(size=11),
            text_color="#9ca3af", anchor="w",
        )

    def path_entry(parent, on_enter):
        # Path box that fills the rest of a header; Enter triggers on_enter.
        entry = ctk.CTkEntry(parent, height=28)
        entry.pack(side="left", fill="x", expand=True, padx=(4, 0))
        entry.bind("<Return>", lambda _event: on_enter())
        return entry

    def divider(parent):
        # Thin vertical separator between toolbar groups.
        bar = ctk.CTkFrame(parent, width=2, height=24, fg_color="gray40")
        bar.pack(side="left", padx=8)

    # === Panes area ===
    pane_area = ctk.CTkFrame(self, fg_color="transparent")
    pane_area.pack(fill="both", expand=True, padx=10, pady=(10, 5))

    # Left pane — Local
    local_pane = ctk.CTkFrame(pane_area, fg_color="transparent")
    local_pane.pack(side="left", fill="both", expand=True, padx=(0, 5))
    local_bar = ctk.CTkFrame(local_pane, fg_color="transparent")
    local_bar.pack(fill="x", pady=(0, 4))
    pane_title(local_bar, t("local_files"))

    self._local_back_btn = nav_button(local_bar, "\u2190", self._local_go_back)
    self._local_back_btn.pack(side="left", padx=(8, 2))
    self._local_up_btn = nav_button(local_bar, "\u2191", self._local_go_up)
    self._local_up_btn.pack(side="left", padx=2)
    # Local refresh button
    self._local_refresh_btn = nav_button(local_bar, "\u21BB", self._refresh_local)
    self._local_refresh_btn.pack(side="left", padx=2)
    # Browse button
    self._browse_btn = ctk.CTkButton(
        local_bar, text=t("browse"), width=60, height=28,
        command=self._browse_local,
    )
    self._browse_btn.pack(side="left", padx=2)

    # Windows drive selector (only created on Windows with at least one drive)
    self._drive_menu = None
    available_drives = _get_windows_drives() if platform.system() == "Windows" else []
    if available_drives:
        initial_drive = os.path.splitdrive(self._local_path)[0] + "\\"
        self._drive_var = ctk.StringVar(value=initial_drive)
        self._drive_menu = ctk.CTkOptionMenu(
            local_bar, values=available_drives, variable=self._drive_var,
            width=65, height=28, command=self._on_drive_change,
        )
        self._drive_menu.pack(side="left", padx=2)

    self._local_path_entry = path_entry(local_bar, self._local_go_to_path)

    self._local_list = FileListWidget(
        local_pane,
        columns=[(t("name_col"), 220), (t("size_col"), 80), (t("date_col"), 110)],
        on_navigate=self._navigate_local,
        on_drop=self._on_drop_to_local,
    )
    self._local_list.pack(fill="both", expand=True)
    self._local_status = muted_label(local_pane, "")
    self._local_status.pack(fill="x", pady=(2, 0))

    # Right pane — Remote
    remote_pane = ctk.CTkFrame(pane_area, fg_color="transparent")
    remote_pane.pack(side="right", fill="both", expand=True, padx=(5, 0))
    remote_bar = ctk.CTkFrame(remote_pane, fg_color="transparent")
    remote_bar.pack(fill="x", pady=(0, 4))
    pane_title(remote_bar, t("remote_files"))

    self._remote_back_btn = nav_button(remote_bar, "\u2190", self._remote_go_back)
    self._remote_back_btn.pack(side="left", padx=(8, 2))
    self._remote_up_btn = nav_button(remote_bar, "\u2191", self._remote_go_up)
    self._remote_up_btn.pack(side="left", padx=2)
    self._remote_refresh_btn = nav_button(remote_bar, "\u21BB", self._refresh_remote)
    self._remote_refresh_btn.pack(side="left", padx=2)

    self._remote_path_entry = path_entry(remote_bar, self._remote_go_to_path)

    self._remote_list = FileListWidget(
        remote_pane,
        columns=[
            (t("name_col"), 200), (t("size_col"), 70),
            (t("date_col"), 100), (t("perm_col"), 80),
        ],
        on_navigate=self._navigate_remote,
        on_drop=self._on_drop_to_remote,
    )
    self._remote_list.pack(fill="both", expand=True)
    self._remote_status = muted_label(remote_pane, t("connect_to_browse"))
    self._remote_status.pack(fill="x", pady=(2, 0))

    # Wire drag-and-drop partners so each list accepts drops from the other
    self._local_list.set_drag_partner(self._remote_list)
    self._remote_list.set_drag_partner(self._local_list)

    # === Toolbar ===
    toolbar = ctk.CTkFrame(self, fg_color="transparent")
    toolbar.pack(fill="x", padx=10, pady=4)

    def tool_button(caption, width, handler, **extra):
        # Standard-height toolbar button; extra carries optional colours.
        return ctk.CTkButton(
            toolbar, text=caption, width=width, height=30,
            command=handler, **extra,
        )

    self._upload_btn = tool_button(f"{t('upload')} \u2192", 110, self._upload_selected)
    self._upload_btn.pack(side="left", padx=(0, 4))
    self._download_btn = tool_button(f"\u2190 {t('download')}", 110, self._download_selected)
    self._download_btn.pack(side="left", padx=4)
    divider(toolbar)
    self._mkdir_btn = tool_button(t("new_folder"), 100, self._mkdir_remote)
    self._mkdir_btn.pack(side="left", padx=4)
    self._delete_btn = tool_button(
        t("delete_files"), 80, self._delete_remote,
        fg_color="#dc2626", hover_color="#b91c1c",
    )
    self._delete_btn.pack(side="left", padx=4)
    self._rename_btn = tool_button(t("rename_file"), 100, self._rename_remote)
    self._rename_btn.pack(side="left", padx=4)

    # Sudo toggle
    divider(toolbar)
    self._sudo_switch = ctk.CTkSwitch(
        toolbar, text=t("sudo_mode"), variable=self._sudo_var,
        width=50, height=24, command=self._on_sudo_toggle,
    )
    self._sudo_switch.pack(side="left", padx=4)

    # No session exists yet, so remote actions start out disabled.
    self._set_remote_buttons_state("disabled")

    # === Transfer / Log area ===
    transfer_row = ctk.CTkFrame(self, fg_color="transparent")
    transfer_row.pack(fill="x", padx=10, pady=(2, 2))
    self._progress = ctk.CTkProgressBar(transfer_row, height=14)
    self._progress.pack(fill="x")
    self._progress.set(0)
    self._transfer_label = muted_label(transfer_row, "")
    self._transfer_label.pack(fill="x")

    self._log = ctk.CTkTextbox(
        self, height=80, font=ctk.CTkFont(family="Consolas", size=11),
        state="disabled",
    )
    self._log.pack(fill="x", padx=10, pady=(0, 10))
# ── Server selection ──
def set_server(self, alias: str | None):
    """Switch the remote pane to another server, or to none at all.

    A call with the alias that is already active is ignored. Otherwise the
    current session is closed first; a non-empty alias then starts a new
    connection, while an empty one clears the remote pane and disables the
    remote actions.
    """
    if alias == self._current_alias:
        return
    self._disconnect_sftp()
    self._current_alias = alias
    if not alias:
        self._remote_list.populate([])
        self._remote_status.configure(text=t("connect_to_browse"))
        self._set_remote_buttons_state("disabled")
        return
    self._connect_sftp()
# ── SFTP connection ──
def _connect_sftp(self):
    """Open an SFTP session for the current alias on a worker thread.

    Looks the server up in the store (returning silently if it is unknown),
    shows a "connecting" status and disables the remote actions. The worker
    connects, resolves the remote start directory with ``normalize(".")``
    and reports back through ``self.after``: ``_on_sftp_connected`` on
    success, ``_on_sftp_error`` with the error text on failure.
    """
    server = self.store.get_server(self._current_alias)
    if not server:
        return
    self._remote_status.configure(text=t("connecting_sftp"))
    self._set_remote_buttons_state("disabled")

    def _do():
        try:
            sftp = SFTPSession(server, self.store.get_ssh_key_path())
            sftp.connect()
            home = sftp.normalize(".")
            self.after(0, lambda: self._on_sftp_connected(sftp, home))
        except Exception as e:
            # The name bound by "except ... as e" is deleted when the handler
            # ends, so the text must be captured now; a lambda that reads
            # ``e`` later would fail with NameError instead of reporting.
            self.after(0, lambda msg=str(e): self._on_sftp_error(msg))

    threading.Thread(target=_do, daemon=True).start()
def _on_sftp_connected(self, sftp: SFTPSession, home: str):
    """Adopt a freshly connected session and show its start directory.

    sftp: the connected session; it inherits the current sudo switch value.
    home: remote directory to start browsing in.
    """
    self._sftp = sftp
    sftp.sudo_mode = self._sudo_var.get()

    # Start the remote pane from scratch at the reported directory.
    self._remote_path = home
    self._remote_history.clear()

    banner = t("connected_sftp").format(alias=self._current_alias)
    self._remote_status.configure(text=banner)
    self._set_remote_buttons_state("normal")
    self._refresh_remote()
def _on_sftp_error(self, error: str):
    """Show an SFTP error in the remote status line and append it to the log."""
    message = t("sftp_error").format(e=error)
    self._remote_status.configure(text=message)
    self._log_msg(message)
def _disconnect_sftp(self):
if self._sftp:
try:
self._sftp.disconnect()
except Exception:
pass
self._sftp = None
# ── Browse / Drive ──
def _browse_local(self):
    """Let the user pick a local folder in a dialog and navigate to it.

    Cancelling the dialog leaves the pane untouched; otherwise the previous
    folder goes onto the back-history.
    """
    chosen = filedialog.askdirectory(initialdir=self._local_path)
    if not chosen:
        return
    self._local_history.append(self._local_path)
    self._local_path = os.path.normpath(chosen)
    self._refresh_local()
def _on_drive_change(self, drive: str):
self._local_history.append(self._local_path)
self._local_path = drive
self._refresh_local()
def _on_sudo_toggle(self):
if self._sftp:
self._sftp.sudo_mode = self._sudo_var.get()
self._refresh_remote()
# ── Local navigation ──
def _refresh_local(self):
    """Re-read the current local folder and repopulate the local pane.

    Updates the path box (and, on Windows, the drive selector), lists the
    folder and shows one row per entry plus a leading ``..`` row unless the
    folder is its own parent. If the folder cannot be listed at all the
    problem is logged and the pane is shown empty. Entries that cannot be
    stat'ed are still listed, with ``?`` placeholders.
    """
    current = self._local_path
    self._local_path_entry.delete(0, "end")
    self._local_path_entry.insert(0, current)

    # Sync drive selector
    if self._drive_menu and platform.system() == "Windows":
        self._drive_var.set(os.path.splitdrive(current)[0] + "\\")

    try:
        entries = os.listdir(current)
    except PermissionError:
        self._log_msg(t("permission_denied").format(path=current))
        entries = []
    except OSError as exc:
        # Folder deleted meanwhile, path is not a directory, drive not
        # ready, ...: report it instead of crashing the refresh.
        self._log_msg(str(exc))
        entries = []

    items = []
    # Parent dir
    if os.path.dirname(current) != current:
        items.append({"name": "..", "size": "", "date": "", "is_dir": True})

    for name in entries:
        try:
            info = os.stat(os.path.join(current, name))
        except OSError:
            # PermissionError is an OSError too; keep the row but mark it.
            items.append({
                "name": name, "size": "?", "date": "?", "is_dir": False,
            })
            continue
        # Reuse the stat result rather than stat'ing again via isdir().
        is_dir = stat.S_ISDIR(info.st_mode)
        items.append({
            "name": name,
            "size": "" if is_dir else _format_size(info.st_size),
            "date": _format_date(info.st_mtime),
            "is_dir": is_dir,
        })

    self._local_list.populate(items)
    count = sum(1 for entry in items if entry["name"] != "..")
    self._local_status.configure(text=t("items_count").format(count=count))
def _navigate_local(self, name: str):
if name == "..":
self._local_go_up()
return
target = os.path.join(self._local_path, name)
if os.path.isdir(target):
self._local_history.append(self._local_path)
self._local_path = os.path.normpath(target)
self._refresh_local()
def _local_go_up(self):
parent = os.path.dirname(self._local_path)
if parent != self._local_path:
self._local_history.append(self._local_path)
self._local_path = parent
self._refresh_local()
def _local_go_back(self):
if self._local_history:
self._local_path = self._local_history.pop()
self._refresh_local()
def _local_go_to_path(self):
path = self._local_path_entry.get().strip()
if path and os.path.isdir(path):
self._local_history.append(self._local_path)
self._local_path = os.path.normpath(path)
self._refresh_local()
# ── Remote navigation ──
def _refresh_remote(self):
    """List the current remote directory on a worker thread and show it.

    Does nothing without a connected session. The session and the path are
    captured before the worker starts, so the worker never touches a
    session that was swapped out or cleared in the meantime. A listing that
    arrives after the pane moved elsewhere is dropped instead of overwriting
    the newer view. In sudo mode the privileged listing is tried first and
    the plain one is used if it fails. Failures are reported through
    ``_on_sftp_error``; a permission error outside sudo mode also carries
    the "try sudo" hint.
    """
    sftp = self._sftp
    if not sftp or not sftp.connected:
        return
    path = self._remote_path
    self._remote_path_entry.delete(0, "end")
    self._remote_path_entry.insert(0, path)

    def _deliver(items):
        # Ignore results for a directory or session that is no longer shown.
        if self._sftp is sftp and self._remote_path == path:
            self._populate_remote(items)

    def _do():
        try:
            if sftp.sudo_mode:
                try:
                    attrs = sftp.listdir_attr_sudo(path)
                except Exception:
                    # Fall back to the unprivileged listing.
                    attrs = sftp.listdir_attr(path)
            else:
                attrs = sftp.listdir_attr(path)

            items = []
            # Parent dir
            if path != "/":
                items.append({
                    "name": "..", "size": "", "date": "", "perm": "", "is_dir": True,
                })
            for a in attrs:
                is_dir = stat.S_ISDIR(a.st_mode) if a.st_mode else False
                items.append({
                    "name": a.filename,
                    "size": "" if is_dir else _format_size(a.st_size or 0),
                    "date": _format_date(a.st_mtime or 0),
                    "perm": _format_perm(a.st_mode & 0o777) if a.st_mode else "",
                    "is_dir": is_dir,
                })
            self.after(0, lambda: _deliver(items))
        except PermissionError as e:
            hint = f"\n{t('try_sudo_hint')}" if not sftp.sudo_mode else ""
            # Bind the text now: ``e`` is unbound once the handler exits,
            # which is before the scheduled callback runs.
            self.after(0, lambda msg=str(e) + hint: self._on_sftp_error(msg))
        except Exception as e:
            self.after(0, lambda msg=str(e): self._on_sftp_error(msg))

    threading.Thread(target=_do, daemon=True).start()
def _populate_remote(self, items: list[dict]):
    """Show *items* in the remote pane and update its item counter.

    The ``..`` row, if present, is not counted.
    """
    self._remote_list.populate(items)
    real_entries = sum(1 for entry in items if entry["name"] != "..")
    self._remote_status.configure(text=t("items_count").format(count=real_entries))
def _navigate_remote(self, name: str):
if name == "..":
self._remote_go_up()
return
if self._remote_path == "/":
target = "/" + name
else:
target = self._remote_path + "/" + name
self._remote_history.append(self._remote_path)
self._remote_path = target
self._refresh_remote()
def _remote_go_up(self):
if self._remote_path == "/":
return
parent = "/".join(self._remote_path.rstrip("/").split("/")[:-1]) or "/"
self._remote_history.append(self._remote_path)
self._remote_path = parent
self._refresh_remote()
def _remote_go_back(self):
if self._remote_history:
self._remote_path = self._remote_history.pop()
self._refresh_remote()
def _remote_go_to_path(self):
if not self._sftp or not self._sftp.connected:
return
path = self._remote_path_entry.get().strip()
if path:
self._remote_history.append(self._remote_path)
self._remote_path = path
self._refresh_remote()
# ── Remote path helper ──
def _remote_join(self, name: str) -> str:
if self._remote_path == "/":
return "/" + name
return self._remote_path + "/" + name
# ── Upload / Download (shared logic) ──
def _do_upload(self, items: list[dict]):
    """Upload local-pane entries into the current remote directory.

    Called on the UI thread: it validates state, disables the transfer
    buttons and hands the copying to a daemon worker thread. Progress, log
    lines and the final ``_on_transfer_done`` are marshalled back with
    ``self.after``.

    items: entry dicts with a ``"name"`` and optionally ``"is_dir"``; a
        ``".."`` entry is skipped.

    Nothing happens when no connected session exists, a transfer is already
    running, or no uploadable entry remains. Failures are logged per item
    and do not stop the remaining items.
    """
    sftp = self._sftp
    if not sftp or not sftp.connected or self._transferring:
        return
    items = [s for s in items if s["name"] != ".."]
    if not items:
        return

    # Freeze both directories now. Navigation stays enabled during a
    # transfer, and re-reading them per item inside the worker would send
    # the remaining items from/to whatever folder is shown by then.
    local_dir = self._local_path
    remote_dir = self._remote_path

    self._transferring = True
    self._upload_btn.configure(state="disabled")
    self._download_btn.configure(state="disabled")

    def _file_cb(cur, total, fname):
        # Per-file notification while a folder is being uploaded.
        self.after(0, lambda: self._transfer_label.configure(
            text=t("transfer_file_progress").format(cur=cur, total=total, name=fname)
        ))

    def _dir_progress(transferred, total):
        if total > 0:
            self.after(0, lambda f=transferred / total: self._progress.set(f))

    def _file_progress_for(name):
        # Build the byte-progress callback for one single-file upload.
        def _progress(transferred, total):
            if total > 0:
                frac = transferred / total
                size_str = f"{_format_size(transferred)}/{_format_size(total)}"
                self.after(0, lambda: self._update_progress(
                    frac, t("uploading").format(name=name) + f" ({size_str})"
                ))
        return _progress

    def _upload_one(item):
        name = item["name"]
        local = os.path.join(local_dir, name)
        remote = "/" + name if remote_dir == "/" else remote_dir + "/" + name
        if item.get("is_dir"):
            self.after(0, lambda: self._transfer_label.configure(
                text=t("uploading_dir").format(name=name)
            ))
            sftp.upload_dir(local, remote, progress_cb=_dir_progress, file_cb=_file_cb)
        else:
            self.after(0, lambda: self._transfer_label.configure(
                text=t("uploading").format(name=name)
            ))
            progress = _file_progress_for(name)
            if sftp.sudo_mode:
                try:
                    sftp.upload_sudo(local, remote, progress_cb=progress)
                except Exception:
                    # Privileged path failed; retry as the normal user.
                    sftp.upload(local, remote, progress_cb=progress)
            else:
                sftp.upload(local, remote, progress_cb=progress)
        self.after(0, lambda: self._log_msg(t("transfer_done").format(name=name)))

    def _do():
        try:
            for item in items:
                try:
                    _upload_one(item)
                except PermissionError as e:
                    hint = f" - {t('try_sudo_hint')}" if not sftp.sudo_mode else ""
                    self.after(0, lambda e=e, h=hint: self._log_msg(
                        t("transfer_failed").format(e=str(e)) + h
                    ))
                except Exception as e:
                    self.after(0, lambda e=e: self._log_msg(
                        t("transfer_failed").format(e=str(e))
                    ))
        finally:
            # Always release the "transferring" latch, even if the loop
            # dies unexpectedly; otherwise no further transfer could start.
            self.after(0, self._on_transfer_done)

    threading.Thread(target=_do, daemon=True).start()
def _do_download(self, items: list[dict]):
    """Download remote-pane entries into the current local folder.

    Called on the UI thread: it validates state, disables the transfer
    buttons and hands the copying to a daemon worker thread. Progress, log
    lines and the final ``_on_transfer_done`` are marshalled back with
    ``self.after``.

    items: entry dicts with a ``"name"`` and optionally ``"is_dir"``; a
        ``".."`` entry is skipped.

    Nothing happens when no connected session exists, a transfer is already
    running, or no downloadable entry remains. Failures are logged per item
    and do not stop the remaining items.
    """
    sftp = self._sftp
    if not sftp or not sftp.connected or self._transferring:
        return
    items = [s for s in items if s["name"] != ".."]
    if not items:
        return

    # Freeze both directories now. Navigation stays enabled during a
    # transfer, and re-reading them per item inside the worker would send
    # the remaining items from/to whatever folder is shown by then.
    local_dir = self._local_path
    remote_dir = self._remote_path

    self._transferring = True
    self._upload_btn.configure(state="disabled")
    self._download_btn.configure(state="disabled")

    def _file_cb(cur, total, fname):
        # Per-file notification while a folder is being downloaded.
        self.after(0, lambda: self._transfer_label.configure(
            text=t("transfer_file_progress").format(cur=cur, total=total, name=fname)
        ))

    def _dir_progress(transferred, total):
        if total > 0:
            self.after(0, lambda f=transferred / total: self._progress.set(f))

    def _file_progress_for(name):
        # Build the byte-progress callback for one single-file download.
        def _progress(transferred, total):
            if total > 0:
                frac = transferred / total
                size_str = f"{_format_size(transferred)}/{_format_size(total)}"
                self.after(0, lambda: self._update_progress(
                    frac, t("downloading").format(name=name) + f" ({size_str})"
                ))
        return _progress

    def _download_one(item):
        name = item["name"]
        remote = "/" + name if remote_dir == "/" else remote_dir + "/" + name
        local = os.path.join(local_dir, name)
        if item.get("is_dir"):
            self.after(0, lambda: self._transfer_label.configure(
                text=t("downloading_dir").format(name=name)
            ))
            os.makedirs(local, exist_ok=True)
            sftp.download_dir(remote, local, progress_cb=_dir_progress, file_cb=_file_cb)
        else:
            self.after(0, lambda: self._transfer_label.configure(
                text=t("downloading").format(name=name)
            ))
            progress = _file_progress_for(name)
            if sftp.sudo_mode:
                try:
                    sftp.download_sudo(remote, local, progress_cb=progress)
                except Exception:
                    # Privileged path failed; retry as the normal user.
                    sftp.download(remote, local, progress_cb=progress)
            else:
                sftp.download(remote, local, progress_cb=progress)
        self.after(0, lambda: self._log_msg(t("transfer_done").format(name=name)))

    def _do():
        try:
            for item in items:
                try:
                    _download_one(item)
                except PermissionError as e:
                    hint = f" - {t('try_sudo_hint')}" if not sftp.sudo_mode else ""
                    self.after(0, lambda e=e, h=hint: self._log_msg(
                        t("transfer_failed").format(e=str(e)) + h
                    ))
                except Exception as e:
                    self.after(0, lambda e=e: self._log_msg(
                        t("transfer_failed").format(e=str(e))
                    ))
        finally:
            # Always release the "transferring" latch, even if the loop
            # dies unexpectedly; otherwise no further transfer could start.
            self.after(0, self._on_transfer_done)

    threading.Thread(target=_do, daemon=True).start()
# ── Button handlers ──
def _upload_selected(self):
if not self._sftp or not self._sftp.connected or self._transferring:
return
selected = self._local_list.get_selected()
items = [s for s in selected if s["name"] != ".."]
if not items:
self._log_msg(t("no_server_selected"))
return
self._do_upload(items)
def _download_selected(self):
if not self._sftp or not self._sftp.connected or self._transferring:
return
selected = self._remote_list.get_selected()
items = [s for s in selected if s["name"] != ".."]
if not items:
return
self._do_download(items)
# ── Drag-and-drop callbacks ──
def _on_drop_to_remote(self, items: list[dict], source):
"""Called when items are dropped onto remote panel (upload)."""
self._do_upload(items)
def _on_drop_to_local(self, items: list[dict], source):
"""Called when items are dropped onto local panel (download)."""
self._do_download(items)
def _on_transfer_done(self):
self._transferring = False
self._progress.set(0)
self._transfer_label.configure(text="")
self._upload_btn.configure(state="normal")
self._download_btn.configure(state="normal")
self._refresh_local()
self._refresh_remote()
def _update_progress(self, fraction: float, text: str):
self._progress.set(fraction)
self._transfer_label.configure(text=text)
def _mkdir_remote(self):
    """Ask for a folder name and create it in the current remote directory.

    Requires a connected session. Cancelling the dialog or entering only
    whitespace aborts. The directory is created on a worker thread; on
    success the remote pane is reloaded, on failure the error is logged.
    """
    sftp = self._sftp
    if not sftp or not sftp.connected:
        return
    dialog = ctk.CTkInputDialog(
        text=t("new_folder_name"), title=t("new_folder"),
    )
    name = dialog.get_input()
    if not name or not name.strip():
        return
    path = self._remote_join(name.strip())

    def _do():
        try:
            # Use the session captured above; ``self._sftp`` may have been
            # cleared by a disconnect before this thread runs.
            sftp.mkdir(path)
            self.after(0, self._refresh_remote)
        except Exception as e:
            # Capture the text now: ``e`` is unbound when the handler ends,
            # so a lambda reading it later would raise NameError.
            self.after(0, lambda msg=str(e): self._log_msg(
                t("sftp_error").format(e=msg)
            ))

    threading.Thread(target=_do, daemon=True).start()
def _delete_remote(self):
    """Delete the entries selected in the remote pane after confirmation.

    Requires a connected session and a selection other than ``..``. A single
    selected directory gets the recursive-delete prompt; every other
    selection gets the generic count-based prompt. Deletion runs on a worker
    thread: directories are removed recursively, files individually. Errors
    are logged per entry without stopping the rest, and the remote pane is
    reloaded at the end.
    """
    sftp = self._sftp
    if not sftp or not sftp.connected:
        return
    items = [s for s in self._remote_list.get_selected() if s["name"] != ".."]
    if not items:
        return

    # Check for directories — a lone directory gets the recursive warning
    if len(items) == 1 and items[0].get("is_dir"):
        prompt = t("recursive_delete_confirm").format(name=items[0]["name"])
    else:
        prompt = t("delete_files_confirm").format(count=len(items))
    if not messagebox.askyesno(t("delete_files"), prompt):
        return

    # Resolve the absolute targets before the worker starts. Joining inside
    # the worker would use whatever directory the pane shows by then, and
    # could delete same-named entries somewhere else.
    targets = [
        (self._remote_join(item["name"]), bool(item.get("is_dir")))
        for item in items
    ]

    def _do():
        for path, is_dir in targets:
            try:
                if is_dir:
                    sftp.rmdir_recursive(path)
                else:
                    sftp.remove(path)
            except Exception as e:
                # Capture the text now: ``e`` is unbound when the handler
                # ends, so a lambda reading it later would raise NameError.
                self.after(0, lambda msg=str(e): self._log_msg(
                    t("sftp_error").format(e=msg)
                ))
        self.after(0, self._refresh_remote)

    threading.Thread(target=_do, daemon=True).start()
def _rename_remote(self):
    """Rename the first selected remote entry to a name typed by the user.

    Requires a connected session and a first selected row other than
    ``..``. Cancelling, blank input or an unchanged name aborts. The rename
    runs on a worker thread; on success the remote pane is reloaded, on
    failure the error is logged.
    """
    sftp = self._sftp
    if not sftp or not sftp.connected:
        return
    selected = self._remote_list.get_selected()
    if not selected or selected[0]["name"] == "..":
        return
    old_name = selected[0]["name"]
    dialog = ctk.CTkInputDialog(
        text=t("rename_prompt"), title=t("rename_file"),
    )
    new_name = dialog.get_input()
    if not new_name or not new_name.strip() or new_name.strip() == old_name:
        return
    old_path = self._remote_join(old_name)
    new_path = self._remote_join(new_name.strip())

    def _do():
        try:
            # Use the session captured above; ``self._sftp`` may have been
            # cleared by a disconnect before this thread runs.
            sftp.rename(old_path, new_path)
            self.after(0, self._refresh_remote)
        except Exception as e:
            # Capture the text now: ``e`` is unbound when the handler ends,
            # so a lambda reading it later would raise NameError.
            self.after(0, lambda msg=str(e): self._log_msg(
                t("sftp_error").format(e=msg)
            ))

    threading.Thread(target=_do, daemon=True).start()
# ── Helpers ──
def _set_remote_buttons_state(self, state: str):
for btn in (
self._upload_btn, self._download_btn,
self._mkdir_btn, self._delete_btn, self._rename_btn,
self._remote_refresh_btn, self._remote_back_btn, self._remote_up_btn,
):
btn.configure(state=state)
def _log_msg(self, text: str):
self._log.configure(state="normal")
self._log.insert("end", text + "\n")
self._log.configure(state="disabled")
self._log.see("end")