""" Files tab — dual-pane SFTP file manager. """ import os import stat import threading from datetime import datetime from tkinter import messagebox import customtkinter as ctk from core.i18n import t from core.ssh_client import SFTPSession from gui.widgets.file_list import FileListWidget def _format_size(size_bytes: int) -> str: if size_bytes < 0: return "" for unit in ("B", "KB", "MB", "GB"): if size_bytes < 1024: return f"{size_bytes:.1f} {unit}" if unit != "B" else f"{size_bytes} B" size_bytes /= 1024 return f"{size_bytes:.1f} TB" def _format_date(mtime: float) -> str: try: dt = datetime.fromtimestamp(mtime) return dt.strftime("%b %d %H:%M") except Exception: return "" def _format_perm(mode: int) -> str: parts = [] for who in (6, 3, 0): m = (mode >> who) & 7 parts.append( ("r" if m & 4 else "-") + ("w" if m & 2 else "-") + ("x" if m & 1 else "-") ) return "".join(parts) class FilesTab(ctk.CTkFrame): def __init__(self, master, store): super().__init__(master, fg_color="transparent") self.store = store self._current_alias: str | None = None self._sftp: SFTPSession | None = None self._local_path = os.path.expanduser("~") self._remote_path = "/" self._local_history: list[str] = [] self._remote_history: list[str] = [] self._transferring = False self._build_ui() self._refresh_local() def _build_ui(self): # === Panes area === panes = ctk.CTkFrame(self, fg_color="transparent") panes.pack(fill="both", expand=True, padx=10, pady=(10, 5)) # Left pane — Local left_pane = ctk.CTkFrame(panes, fg_color="transparent") left_pane.pack(side="left", fill="both", expand=True, padx=(0, 5)) left_header = ctk.CTkFrame(left_pane, fg_color="transparent") left_header.pack(fill="x", pady=(0, 4)) ctk.CTkLabel(left_header, text=t("local_files"), font=ctk.CTkFont(size=13, weight="bold")).pack(side="left") self._local_back_btn = ctk.CTkButton( left_header, text="\u2190", width=30, height=28, command=self._local_go_back, ) self._local_back_btn.pack(side="left", padx=(8, 2)) self._local_up_btn = ctk.CTkButton( left_header, text="\u2191", width=30, height=28, command=self._local_go_up, ) self._local_up_btn.pack(side="left", padx=2) self._local_path_entry = ctk.CTkEntry(left_header, height=28) self._local_path_entry.pack(side="left", fill="x", expand=True, padx=(4, 0)) self._local_path_entry.bind("", lambda e: self._local_go_to_path()) self._local_list = FileListWidget( left_pane, columns=[(t("name_col"), 220), (t("size_col"), 80), (t("date_col"), 110)], on_navigate=self._navigate_local, ) self._local_list.pack(fill="both", expand=True) self._local_status = ctk.CTkLabel( left_pane, text="", font=ctk.CTkFont(size=11), text_color="#9ca3af", anchor="w", ) self._local_status.pack(fill="x", pady=(2, 0)) # Right pane — Remote right_pane = ctk.CTkFrame(panes, fg_color="transparent") right_pane.pack(side="right", fill="both", expand=True, padx=(5, 0)) right_header = ctk.CTkFrame(right_pane, fg_color="transparent") right_header.pack(fill="x", pady=(0, 4)) ctk.CTkLabel(right_header, text=t("remote_files"), font=ctk.CTkFont(size=13, weight="bold")).pack(side="left") self._remote_back_btn = ctk.CTkButton( right_header, text="\u2190", width=30, height=28, command=self._remote_go_back, ) self._remote_back_btn.pack(side="left", padx=(8, 2)) self._remote_up_btn = ctk.CTkButton( right_header, text="\u2191", width=30, height=28, command=self._remote_go_up, ) self._remote_up_btn.pack(side="left", padx=2) self._remote_refresh_btn = ctk.CTkButton( right_header, text="\u21BB", width=30, height=28, command=self._refresh_remote, ) self._remote_refresh_btn.pack(side="left", padx=2) self._remote_path_entry = ctk.CTkEntry(right_header, height=28) self._remote_path_entry.pack(side="left", fill="x", expand=True, padx=(4, 0)) self._remote_path_entry.bind("", lambda e: self._remote_go_to_path()) self._remote_list = FileListWidget( right_pane, columns=[ (t("name_col"), 200), (t("size_col"), 70), (t("date_col"), 100), (t("perm_col"), 80), ], on_navigate=self._navigate_remote, ) self._remote_list.pack(fill="both", expand=True) self._remote_status = ctk.CTkLabel( right_pane, text=t("connect_to_browse"), font=ctk.CTkFont(size=11), text_color="#9ca3af", anchor="w", ) self._remote_status.pack(fill="x", pady=(2, 0)) # === Toolbar === toolbar = ctk.CTkFrame(self, fg_color="transparent") toolbar.pack(fill="x", padx=10, pady=4) self._upload_btn = ctk.CTkButton( toolbar, text=f"{t('upload')} \u2192", width=110, height=30, command=self._upload_selected, ) self._upload_btn.pack(side="left", padx=(0, 4)) self._download_btn = ctk.CTkButton( toolbar, text=f"\u2190 {t('download')}", width=110, height=30, command=self._download_selected, ) self._download_btn.pack(side="left", padx=4) sep = ctk.CTkFrame(toolbar, width=2, height=24, fg_color="gray40") sep.pack(side="left", padx=8) self._mkdir_btn = ctk.CTkButton( toolbar, text=t("new_folder"), width=100, height=30, command=self._mkdir_remote, ) self._mkdir_btn.pack(side="left", padx=4) self._delete_btn = ctk.CTkButton( toolbar, text=t("delete_files"), width=80, height=30, fg_color="#dc2626", hover_color="#b91c1c", command=self._delete_remote, ) self._delete_btn.pack(side="left", padx=4) self._rename_btn = ctk.CTkButton( toolbar, text=t("rename_file"), width=100, height=30, command=self._rename_remote, ) self._rename_btn.pack(side="left", padx=4) self._set_remote_buttons_state("disabled") # === Transfer / Log area === transfer_frame = ctk.CTkFrame(self, fg_color="transparent") transfer_frame.pack(fill="x", padx=10, pady=(2, 2)) self._progress = ctk.CTkProgressBar(transfer_frame, height=14) self._progress.pack(fill="x") self._progress.set(0) self._transfer_label = ctk.CTkLabel( transfer_frame, text="", font=ctk.CTkFont(size=11), text_color="#9ca3af", anchor="w", ) self._transfer_label.pack(fill="x") self._log = ctk.CTkTextbox( self, height=80, font=ctk.CTkFont(family="Consolas", size=11), state="disabled", ) self._log.pack(fill="x", padx=10, pady=(0, 10)) # ── Server selection ── def set_server(self, alias: str | None): if self._current_alias == alias: return self._disconnect_sftp() self._current_alias = alias if alias: self._connect_sftp() else: self._remote_list.populate([]) self._remote_status.configure(text=t("connect_to_browse")) self._set_remote_buttons_state("disabled") # ── SFTP connection ── def _connect_sftp(self): server = self.store.get_server(self._current_alias) if not server: return self._remote_status.configure(text=t("connecting_sftp")) self._set_remote_buttons_state("disabled") def _do(): try: sftp = SFTPSession(server, self.store.get_ssh_key_path()) sftp.connect() home = sftp.normalize(".") self.after(0, lambda: self._on_sftp_connected(sftp, home)) except Exception as e: self.after(0, lambda: self._on_sftp_error(str(e))) threading.Thread(target=_do, daemon=True).start() def _on_sftp_connected(self, sftp: SFTPSession, home: str): self._sftp = sftp self._remote_path = home self._remote_history.clear() self._remote_status.configure( text=t("connected_sftp").format(alias=self._current_alias) ) self._set_remote_buttons_state("normal") self._refresh_remote() def _on_sftp_error(self, error: str): self._remote_status.configure(text=t("sftp_error").format(e=error)) self._log_msg(t("sftp_error").format(e=error)) def _disconnect_sftp(self): if self._sftp: try: self._sftp.disconnect() except Exception: pass self._sftp = None # ── Local navigation ── def _refresh_local(self): self._local_path_entry.delete(0, "end") self._local_path_entry.insert(0, self._local_path) try: entries = os.listdir(self._local_path) except PermissionError: self._log_msg(t("permission_denied").format(path=self._local_path)) entries = [] items = [] # Parent dir parent = os.path.dirname(self._local_path) if parent != self._local_path: items.append({"name": "..", "size": "", "date": "", "is_dir": True}) for name in entries: full = os.path.join(self._local_path, name) try: st = os.stat(full) is_dir = os.path.isdir(full) items.append({ "name": name, "size": "" if is_dir else _format_size(st.st_size), "date": _format_date(st.st_mtime), "is_dir": is_dir, }) except (PermissionError, OSError): items.append({ "name": name, "size": "?", "date": "?", "is_dir": False, }) self._local_list.populate(items) count = len([i for i in items if i["name"] != ".."]) self._local_status.configure(text=t("items_count").format(count=count)) def _navigate_local(self, name: str): if name == "..": self._local_go_up() return target = os.path.join(self._local_path, name) if os.path.isdir(target): self._local_history.append(self._local_path) self._local_path = os.path.normpath(target) self._refresh_local() def _local_go_up(self): parent = os.path.dirname(self._local_path) if parent != self._local_path: self._local_history.append(self._local_path) self._local_path = parent self._refresh_local() def _local_go_back(self): if self._local_history: self._local_path = self._local_history.pop() self._refresh_local() def _local_go_to_path(self): path = self._local_path_entry.get().strip() if path and os.path.isdir(path): self._local_history.append(self._local_path) self._local_path = os.path.normpath(path) self._refresh_local() # ── Remote navigation ── def _refresh_remote(self): if not self._sftp or not self._sftp.connected: return self._remote_path_entry.delete(0, "end") self._remote_path_entry.insert(0, self._remote_path) def _do(): try: attrs = self._sftp.listdir_attr(self._remote_path) items = [] # Parent dir if self._remote_path != "/": items.append({ "name": "..", "size": "", "date": "", "perm": "", "is_dir": True, }) for a in attrs: is_dir = stat.S_ISDIR(a.st_mode) if a.st_mode else False items.append({ "name": a.filename, "size": "" if is_dir else _format_size(a.st_size or 0), "date": _format_date(a.st_mtime or 0), "perm": _format_perm(a.st_mode & 0o777) if a.st_mode else "", "is_dir": is_dir, }) self.after(0, lambda: self._populate_remote(items)) except Exception as e: self.after(0, lambda: self._on_sftp_error(str(e))) threading.Thread(target=_do, daemon=True).start() def _populate_remote(self, items: list[dict]): self._remote_list.populate(items) count = len([i for i in items if i["name"] != ".."]) self._remote_status.configure(text=t("items_count").format(count=count)) def _navigate_remote(self, name: str): if name == "..": self._remote_go_up() return if self._remote_path == "/": target = "/" + name else: target = self._remote_path + "/" + name self._remote_history.append(self._remote_path) self._remote_path = target self._refresh_remote() def _remote_go_up(self): if self._remote_path == "/": return parent = "/".join(self._remote_path.rstrip("/").split("/")[:-1]) or "/" self._remote_history.append(self._remote_path) self._remote_path = parent self._refresh_remote() def _remote_go_back(self): if self._remote_history: self._remote_path = self._remote_history.pop() self._refresh_remote() def _remote_go_to_path(self): if not self._sftp or not self._sftp.connected: return path = self._remote_path_entry.get().strip() if path: self._remote_history.append(self._remote_path) self._remote_path = path self._refresh_remote() # ── File operations ── def _upload_selected(self): if not self._sftp or not self._sftp.connected or self._transferring: return selected = self._local_list.get_selected() files = [s for s in selected if not s.get("is_dir") and s["name"] != ".."] if not files: self._log_msg(t("no_server_selected")) return self._transferring = True self._upload_btn.configure(state="disabled") self._download_btn.configure(state="disabled") def _do(): for item in files: name = item["name"] local = os.path.join(self._local_path, name) if self._remote_path == "/": remote = "/" + name else: remote = self._remote_path + "/" + name try: file_size = os.path.getsize(local) self.after(0, lambda n=name: self._transfer_label.configure( text=t("uploading").format(name=n) )) def _progress(transferred, total, n=name): if total > 0: frac = transferred / total size_str = f"{_format_size(transferred)}/{_format_size(total)}" self.after(0, lambda f=frac, s=size_str, nn=n: self._update_progress(f, t("uploading").format(name=nn) + f" ({s})")) self._sftp.upload(local, remote, progress_cb=_progress) self.after(0, lambda n=name: self._log_msg( t("transfer_done").format(name=n) )) except Exception as e: self.after(0, lambda e=e: self._log_msg( t("transfer_failed").format(e=str(e)) )) self.after(0, self._on_transfer_done) threading.Thread(target=_do, daemon=True).start() def _download_selected(self): if not self._sftp or not self._sftp.connected or self._transferring: return selected = self._remote_list.get_selected() files = [s for s in selected if not s.get("is_dir") and s["name"] != ".."] if not files: return self._transferring = True self._upload_btn.configure(state="disabled") self._download_btn.configure(state="disabled") def _do(): for item in files: name = item["name"] if self._remote_path == "/": remote = "/" + name else: remote = self._remote_path + "/" + name local = os.path.join(self._local_path, name) try: self.after(0, lambda n=name: self._transfer_label.configure( text=t("downloading").format(name=n) )) def _progress(transferred, total, n=name): if total > 0: frac = transferred / total size_str = f"{_format_size(transferred)}/{_format_size(total)}" self.after(0, lambda f=frac, s=size_str, nn=n: self._update_progress(f, t("downloading").format(name=nn) + f" ({s})")) self._sftp.download(remote, local, progress_cb=_progress) self.after(0, lambda n=name: self._log_msg( t("transfer_done").format(name=n) )) except Exception as e: self.after(0, lambda e=e: self._log_msg( t("transfer_failed").format(e=str(e)) )) self.after(0, self._on_transfer_done) threading.Thread(target=_do, daemon=True).start() def _on_transfer_done(self): self._transferring = False self._progress.set(0) self._transfer_label.configure(text="") self._upload_btn.configure(state="normal") self._download_btn.configure(state="normal") self._refresh_local() self._refresh_remote() def _update_progress(self, fraction: float, text: str): self._progress.set(fraction) self._transfer_label.configure(text=text) def _mkdir_remote(self): if not self._sftp or not self._sftp.connected: return dialog = ctk.CTkInputDialog( text=t("new_folder_name"), title=t("new_folder"), ) name = dialog.get_input() if not name or not name.strip(): return name = name.strip() if self._remote_path == "/": path = "/" + name else: path = self._remote_path + "/" + name def _do(): try: self._sftp.mkdir(path) self.after(0, self._refresh_remote) except Exception as e: self.after(0, lambda: self._log_msg(t("sftp_error").format(e=str(e)))) threading.Thread(target=_do, daemon=True).start() def _delete_remote(self): if not self._sftp or not self._sftp.connected: return selected = self._remote_list.get_selected() items = [s for s in selected if s["name"] != ".."] if not items: return if not messagebox.askyesno( t("delete_files"), t("delete_files_confirm").format(count=len(items)), ): return def _do(): for item in items: name = item["name"] if self._remote_path == "/": path = "/" + name else: path = self._remote_path + "/" + name try: if item.get("is_dir"): self._sftp.rmdir(path) else: self._sftp.remove(path) except Exception as e: self.after(0, lambda: self._log_msg( t("sftp_error").format(e=str(e)) )) self.after(0, self._refresh_remote) threading.Thread(target=_do, daemon=True).start() def _rename_remote(self): if not self._sftp or not self._sftp.connected: return selected = self._remote_list.get_selected() if not selected or selected[0]["name"] == "..": return old_name = selected[0]["name"] dialog = ctk.CTkInputDialog( text=t("rename_prompt"), title=t("rename_file"), ) new_name = dialog.get_input() if not new_name or not new_name.strip() or new_name.strip() == old_name: return new_name = new_name.strip() if self._remote_path == "/": old_path = "/" + old_name new_path = "/" + new_name else: old_path = self._remote_path + "/" + old_name new_path = self._remote_path + "/" + new_name def _do(): try: self._sftp.rename(old_path, new_path) self.after(0, self._refresh_remote) except Exception as e: self.after(0, lambda: self._log_msg(t("sftp_error").format(e=str(e)))) threading.Thread(target=_do, daemon=True).start() # ── Helpers ── def _set_remote_buttons_state(self, state: str): for btn in ( self._upload_btn, self._download_btn, self._mkdir_btn, self._delete_btn, self._rename_btn, self._remote_refresh_btn, self._remote_back_btn, self._remote_up_btn, ): btn.configure(state=state) def _log_msg(self, text: str): self._log.configure(state="normal") self._log.insert("end", text + "\n") self._log.configure(state="disabled") self._log.see("end")