""" Files tab — dual-pane SFTP file manager. """ import os import platform import stat import string import threading from datetime import datetime from tkinter import messagebox, filedialog import customtkinter as ctk from core.i18n import t from core.ssh_client import SFTPSession from gui.widgets.file_list import FileListWidget def _format_size(size_bytes: int) -> str: if size_bytes < 0: return "" for unit in ("B", "KB", "MB", "GB"): if size_bytes < 1024: return f"{size_bytes:.1f} {unit}" if unit != "B" else f"{size_bytes} B" size_bytes /= 1024 return f"{size_bytes:.1f} TB" def _format_date(mtime: float) -> str: try: dt = datetime.fromtimestamp(mtime) return dt.strftime("%b %d %H:%M") except Exception: return "" def _format_perm(mode: int) -> str: parts = [] for who in (6, 3, 0): m = (mode >> who) & 7 parts.append( ("r" if m & 4 else "-") + ("w" if m & 2 else "-") + ("x" if m & 1 else "-") ) return "".join(parts) def _get_windows_drives() -> list[str]: """Return list of available drive letters on Windows (e.g. ['C:\\', 'D:\\']).""" drives = [] for letter in string.ascii_uppercase: drive = f"{letter}:\\" if os.path.exists(drive): drives.append(drive) return drives class FilesTab(ctk.CTkFrame): def __init__(self, master, store): super().__init__(master, fg_color="transparent") self.store = store self._current_alias: str | None = None self._sftp: SFTPSession | None = None self._local_path = os.path.expanduser("~") self._remote_path = "/" self._local_history: list[str] = [] self._remote_history: list[str] = [] self._transferring = False self._sudo_var = ctk.BooleanVar(value=False) self._build_ui() self._refresh_local() def _build_ui(self): # === Panes area === panes = ctk.CTkFrame(self, fg_color="transparent") panes.pack(fill="both", expand=True, padx=10, pady=(10, 5)) # Left pane — Local left_pane = ctk.CTkFrame(panes, fg_color="transparent") left_pane.pack(side="left", fill="both", expand=True, padx=(0, 5)) left_header = ctk.CTkFrame(left_pane, fg_color="transparent") left_header.pack(fill="x", pady=(0, 4)) ctk.CTkLabel(left_header, text=t("local_files"), font=ctk.CTkFont(size=13, weight="bold")).pack(side="left") self._local_back_btn = ctk.CTkButton( left_header, text="\u2190", width=30, height=28, command=self._local_go_back, ) self._local_back_btn.pack(side="left", padx=(8, 2)) self._local_up_btn = ctk.CTkButton( left_header, text="\u2191", width=30, height=28, command=self._local_go_up, ) self._local_up_btn.pack(side="left", padx=2) # Local refresh button self._local_refresh_btn = ctk.CTkButton( left_header, text="\u21BB", width=30, height=28, command=self._refresh_local, ) self._local_refresh_btn.pack(side="left", padx=2) # Browse button self._browse_btn = ctk.CTkButton( left_header, text=t("browse"), width=60, height=28, command=self._browse_local, ) self._browse_btn.pack(side="left", padx=2) # Windows drive selector self._drive_menu = None if platform.system() == "Windows": drives = _get_windows_drives() if drives: current_drive = os.path.splitdrive(self._local_path)[0] + "\\" self._drive_var = ctk.StringVar(value=current_drive) self._drive_menu = ctk.CTkOptionMenu( left_header, values=drives, variable=self._drive_var, width=65, height=28, command=self._on_drive_change, ) self._drive_menu.pack(side="left", padx=2) self._local_path_entry = ctk.CTkEntry(left_header, height=28) self._local_path_entry.pack(side="left", fill="x", expand=True, padx=(4, 0)) self._local_path_entry.bind("", lambda e: self._local_go_to_path()) self._local_list = FileListWidget( left_pane, columns=[(t("name_col"), 220), (t("size_col"), 80), (t("date_col"), 110)], on_navigate=self._navigate_local, on_drop=self._on_drop_to_local, ) self._local_list.pack(fill="both", expand=True) self._local_status = ctk.CTkLabel( left_pane, text="", font=ctk.CTkFont(size=11), text_color="#9ca3af", anchor="w", ) self._local_status.pack(fill="x", pady=(2, 0)) # Right pane — Remote right_pane = ctk.CTkFrame(panes, fg_color="transparent") right_pane.pack(side="right", fill="both", expand=True, padx=(5, 0)) right_header = ctk.CTkFrame(right_pane, fg_color="transparent") right_header.pack(fill="x", pady=(0, 4)) ctk.CTkLabel(right_header, text=t("remote_files"), font=ctk.CTkFont(size=13, weight="bold")).pack(side="left") self._remote_back_btn = ctk.CTkButton( right_header, text="\u2190", width=30, height=28, command=self._remote_go_back, ) self._remote_back_btn.pack(side="left", padx=(8, 2)) self._remote_up_btn = ctk.CTkButton( right_header, text="\u2191", width=30, height=28, command=self._remote_go_up, ) self._remote_up_btn.pack(side="left", padx=2) self._remote_refresh_btn = ctk.CTkButton( right_header, text="\u21BB", width=30, height=28, command=self._refresh_remote, ) self._remote_refresh_btn.pack(side="left", padx=2) self._remote_path_entry = ctk.CTkEntry(right_header, height=28) self._remote_path_entry.pack(side="left", fill="x", expand=True, padx=(4, 0)) self._remote_path_entry.bind("", lambda e: self._remote_go_to_path()) self._remote_list = FileListWidget( right_pane, columns=[ (t("name_col"), 200), (t("size_col"), 70), (t("date_col"), 100), (t("perm_col"), 80), ], on_navigate=self._navigate_remote, on_drop=self._on_drop_to_remote, ) self._remote_list.pack(fill="both", expand=True) self._remote_status = ctk.CTkLabel( right_pane, text=t("connect_to_browse"), font=ctk.CTkFont(size=11), text_color="#9ca3af", anchor="w", ) self._remote_status.pack(fill="x", pady=(2, 0)) # Wire drag-and-drop partners self._local_list.set_drag_partner(self._remote_list) self._remote_list.set_drag_partner(self._local_list) # === Toolbar === toolbar = ctk.CTkFrame(self, fg_color="transparent") toolbar.pack(fill="x", padx=10, pady=4) self._upload_btn = ctk.CTkButton( toolbar, text=f"{t('upload')} \u2192", width=110, height=30, command=self._upload_selected, ) self._upload_btn.pack(side="left", padx=(0, 4)) self._download_btn = ctk.CTkButton( toolbar, text=f"\u2190 {t('download')}", width=110, height=30, command=self._download_selected, ) self._download_btn.pack(side="left", padx=4) sep = ctk.CTkFrame(toolbar, width=2, height=24, fg_color="gray40") sep.pack(side="left", padx=8) self._mkdir_btn = ctk.CTkButton( toolbar, text=t("new_folder"), width=100, height=30, command=self._mkdir_remote, ) self._mkdir_btn.pack(side="left", padx=4) self._delete_btn = ctk.CTkButton( toolbar, text=t("delete_files"), width=80, height=30, fg_color="#dc2626", hover_color="#b91c1c", command=self._delete_remote, ) self._delete_btn.pack(side="left", padx=4) self._rename_btn = ctk.CTkButton( toolbar, text=t("rename_file"), width=100, height=30, command=self._rename_remote, ) self._rename_btn.pack(side="left", padx=4) # Sudo toggle sep2 = ctk.CTkFrame(toolbar, width=2, height=24, fg_color="gray40") sep2.pack(side="left", padx=8) self._sudo_switch = ctk.CTkSwitch( toolbar, text=t("sudo_mode"), variable=self._sudo_var, width=50, height=24, command=self._on_sudo_toggle, ) self._sudo_switch.pack(side="left", padx=4) self._set_remote_buttons_state("disabled") # === Transfer / Log area === transfer_frame = ctk.CTkFrame(self, fg_color="transparent") transfer_frame.pack(fill="x", padx=10, pady=(2, 2)) self._progress = ctk.CTkProgressBar(transfer_frame, height=14) self._progress.pack(fill="x") self._progress.set(0) self._transfer_label = ctk.CTkLabel( transfer_frame, text="", font=ctk.CTkFont(size=11), text_color="#9ca3af", anchor="w", ) self._transfer_label.pack(fill="x") self._log = ctk.CTkTextbox( self, height=80, font=ctk.CTkFont(family="Consolas", size=11), state="disabled", ) self._log.pack(fill="x", padx=10, pady=(0, 10)) # ── Server selection ── def set_server(self, alias: str | None): if self._current_alias == alias: return self._disconnect_sftp() self._current_alias = alias if alias: self._connect_sftp() else: self._remote_list.populate([]) self._remote_status.configure(text=t("connect_to_browse")) self._set_remote_buttons_state("disabled") # ── SFTP connection ── def _connect_sftp(self): server = self.store.get_server(self._current_alias) if not server: return self._remote_status.configure(text=t("connecting_sftp")) self._set_remote_buttons_state("disabled") def _do(): try: sftp = SFTPSession(server, self.store.get_ssh_key_path()) sftp.connect() home = sftp.normalize(".") self.after(0, lambda: self._on_sftp_connected(sftp, home)) except Exception as e: self.after(0, lambda: self._on_sftp_error(str(e))) threading.Thread(target=_do, daemon=True).start() def _on_sftp_connected(self, sftp: SFTPSession, home: str): self._sftp = sftp self._sftp.sudo_mode = self._sudo_var.get() self._remote_path = home self._remote_history.clear() self._remote_status.configure( text=t("connected_sftp").format(alias=self._current_alias) ) self._set_remote_buttons_state("normal") self._refresh_remote() def _on_sftp_error(self, error: str): self._remote_status.configure(text=t("sftp_error").format(e=error)) self._log_msg(t("sftp_error").format(e=error)) def _disconnect_sftp(self): if self._sftp: try: self._sftp.disconnect() except Exception: pass self._sftp = None # ── Browse / Drive ── def _browse_local(self): path = filedialog.askdirectory(initialdir=self._local_path) if path: self._local_history.append(self._local_path) self._local_path = os.path.normpath(path) self._refresh_local() def _on_drive_change(self, drive: str): self._local_history.append(self._local_path) self._local_path = drive self._refresh_local() def _on_sudo_toggle(self): if self._sftp: self._sftp.sudo_mode = self._sudo_var.get() self._refresh_remote() # ── Local navigation ── def _refresh_local(self): self._local_path_entry.delete(0, "end") self._local_path_entry.insert(0, self._local_path) # Sync drive selector if self._drive_menu and platform.system() == "Windows": current_drive = os.path.splitdrive(self._local_path)[0] + "\\" self._drive_var.set(current_drive) try: entries = os.listdir(self._local_path) except PermissionError: self._log_msg(t("permission_denied").format(path=self._local_path)) entries = [] items = [] # Parent dir parent = os.path.dirname(self._local_path) if parent != self._local_path: items.append({"name": "..", "size": "", "date": "", "is_dir": True}) for name in entries: full = os.path.join(self._local_path, name) try: st = os.stat(full) is_dir = os.path.isdir(full) items.append({ "name": name, "size": "" if is_dir else _format_size(st.st_size), "date": _format_date(st.st_mtime), "is_dir": is_dir, }) except (PermissionError, OSError): items.append({ "name": name, "size": "?", "date": "?", "is_dir": False, }) self._local_list.populate(items) count = len([i for i in items if i["name"] != ".."]) self._local_status.configure(text=t("items_count").format(count=count)) def _navigate_local(self, name: str): if name == "..": self._local_go_up() return target = os.path.join(self._local_path, name) if os.path.isdir(target): self._local_history.append(self._local_path) self._local_path = os.path.normpath(target) self._refresh_local() def _local_go_up(self): parent = os.path.dirname(self._local_path) if parent != self._local_path: self._local_history.append(self._local_path) self._local_path = parent self._refresh_local() def _local_go_back(self): if self._local_history: self._local_path = self._local_history.pop() self._refresh_local() def _local_go_to_path(self): path = self._local_path_entry.get().strip() if path and os.path.isdir(path): self._local_history.append(self._local_path) self._local_path = os.path.normpath(path) self._refresh_local() # ── Remote navigation ── def _refresh_remote(self): if not self._sftp: return self._remote_path_entry.delete(0, "end") self._remote_path_entry.insert(0, self._remote_path) def _list_remote(): """Fetch remote listing, returns items list.""" if self._sftp.sudo_mode: try: attrs = self._sftp.listdir_attr_sudo(self._remote_path) except Exception: attrs = self._sftp.listdir_attr(self._remote_path) else: attrs = self._sftp.listdir_attr(self._remote_path) items = [] if self._remote_path != "/": items.append({ "name": "..", "size": "", "date": "", "perm": "", "is_dir": True, }) for a in attrs: is_dir = stat.S_ISDIR(a.st_mode) if a.st_mode else False items.append({ "name": a.filename, "size": "" if is_dir else _format_size(a.st_size or 0), "date": _format_date(a.st_mtime or 0), "perm": _format_perm(a.st_mode & 0o777) if a.st_mode else "", "is_dir": is_dir, }) return items def _do(): # Ensure connection is alive, reconnect if needed if not self._sftp.connected: try: self._sftp.reconnect() except Exception as e: self.after(0, lambda: self._on_sftp_error(str(e))) return if not self._sftp.connected: self.after(0, lambda: self._on_sftp_error("Reconnect failed")) return try: items = _list_remote() self.after(0, lambda: self._populate_remote(items)) except PermissionError as e: hint = f"\n{t('try_sudo_hint')}" if not self._sftp.sudo_mode else "" self.after(0, lambda: self._on_sftp_error(str(e) + hint)) except Exception: # Operation failed — one reconnect attempt try: self._sftp.reconnect() items = _list_remote() self.after(0, lambda: self._populate_remote(items)) except Exception as e2: self.after(0, lambda: self._on_sftp_error(str(e2))) threading.Thread(target=_do, daemon=True).start() def _populate_remote(self, items: list[dict]): self._remote_list.populate(items) count = len([i for i in items if i["name"] != ".."]) self._remote_status.configure(text=t("items_count").format(count=count)) def _navigate_remote(self, name: str): if not self._sftp: return if name == "..": self._remote_go_up() return if self._remote_path == "/": target = "/" + name else: target = self._remote_path + "/" + name self._remote_history.append(self._remote_path) self._remote_path = target self._refresh_remote() def _remote_go_up(self): if self._remote_path == "/": return if not self._sftp: return parent = "/".join(self._remote_path.rstrip("/").split("/")[:-1]) or "/" self._remote_history.append(self._remote_path) self._remote_path = parent self._refresh_remote() def _remote_go_back(self): if not self._sftp: return if self._remote_history: self._remote_path = self._remote_history.pop() self._refresh_remote() def _remote_go_to_path(self): if not self._sftp: return path = self._remote_path_entry.get().strip() if path: self._remote_history.append(self._remote_path) self._remote_path = path self._refresh_remote() # ── Remote path helper ── def _remote_join(self, name: str) -> str: if self._remote_path == "/": return "/" + name return self._remote_path + "/" + name # ── Upload / Download (shared logic) ── def _do_upload(self, items: list[dict]): """Upload files and folders from local to remote. Runs in background thread.""" if not self._sftp or self._transferring: return items = [s for s in items if s["name"] != ".."] if not items: return self._transferring = True self._upload_btn.configure(state="disabled") self._download_btn.configure(state="disabled") def _do(): if not self._sftp.connected: try: self._sftp.reconnect() except Exception as e: self.after(0, lambda: self._on_sftp_error(str(e))) self.after(0, self._on_transfer_done) return for item in items: name = item["name"] local = os.path.join(self._local_path, name) remote = self._remote_join(name) try: if item.get("is_dir"): self.after(0, lambda n=name: self._transfer_label.configure( text=t("uploading_dir").format(name=n) )) def _file_cb(cur, total, fname, n=name): self.after(0, lambda c=cur, tt=total, fn=fname: self._transfer_label.configure( text=t("transfer_file_progress").format(cur=c, total=tt, name=fn) )) def _progress(transferred, total): if total > 0: self.after(0, lambda f=transferred/total: self._progress.set(f)) self._sftp.upload_dir(local, remote, progress_cb=_progress, file_cb=_file_cb) else: file_size = os.path.getsize(local) self.after(0, lambda n=name: self._transfer_label.configure( text=t("uploading").format(name=n) )) def _progress(transferred, total, n=name): if total > 0: frac = transferred / total size_str = f"{_format_size(transferred)}/{_format_size(total)}" self.after(0, lambda f=frac, s=size_str, nn=n: self._update_progress(f, t("uploading").format(name=nn) + f" ({s})")) if self._sftp.sudo_mode: try: self._sftp.upload_sudo(local, remote, progress_cb=_progress) except Exception: self._sftp.upload(local, remote, progress_cb=_progress) else: self._sftp.upload(local, remote, progress_cb=_progress) self.after(0, lambda n=name: self._log_msg( t("transfer_done").format(name=n) )) except PermissionError as e: hint = f" - {t('try_sudo_hint')}" if not self._sftp.sudo_mode else "" self.after(0, lambda e=e, h=hint: self._log_msg( t("transfer_failed").format(e=str(e)) + h )) except Exception as e: self.after(0, lambda e=e: self._log_msg( t("transfer_failed").format(e=str(e)) )) self.after(0, self._on_transfer_done) threading.Thread(target=_do, daemon=True).start() def _do_download(self, items: list[dict]): """Download files and folders from remote to local. Runs in background thread.""" if not self._sftp or self._transferring: return items = [s for s in items if s["name"] != ".."] if not items: return self._transferring = True self._upload_btn.configure(state="disabled") self._download_btn.configure(state="disabled") def _do(): if not self._sftp.connected: try: self._sftp.reconnect() except Exception as e: self.after(0, lambda: self._on_sftp_error(str(e))) self.after(0, self._on_transfer_done) return for item in items: name = item["name"] remote = self._remote_join(name) local = os.path.join(self._local_path, name) try: if item.get("is_dir"): self.after(0, lambda n=name: self._transfer_label.configure( text=t("downloading_dir").format(name=n) )) os.makedirs(local, exist_ok=True) def _file_cb(cur, total, fname, n=name): self.after(0, lambda c=cur, tt=total, fn=fname: self._transfer_label.configure( text=t("transfer_file_progress").format(cur=c, total=tt, name=fn) )) def _progress(transferred, total): if total > 0: self.after(0, lambda f=transferred/total: self._progress.set(f)) self._sftp.download_dir(remote, local, progress_cb=_progress, file_cb=_file_cb) else: self.after(0, lambda n=name: self._transfer_label.configure( text=t("downloading").format(name=n) )) def _progress(transferred, total, n=name): if total > 0: frac = transferred / total size_str = f"{_format_size(transferred)}/{_format_size(total)}" self.after(0, lambda f=frac, s=size_str, nn=n: self._update_progress(f, t("downloading").format(name=nn) + f" ({s})")) if self._sftp.sudo_mode: try: self._sftp.download_sudo(remote, local, progress_cb=_progress) except Exception: self._sftp.download(remote, local, progress_cb=_progress) else: self._sftp.download(remote, local, progress_cb=_progress) self.after(0, lambda n=name: self._log_msg( t("transfer_done").format(name=n) )) except PermissionError as e: hint = f" - {t('try_sudo_hint')}" if not self._sftp.sudo_mode else "" self.after(0, lambda e=e, h=hint: self._log_msg( t("transfer_failed").format(e=str(e)) + h )) except Exception as e: self.after(0, lambda e=e: self._log_msg( t("transfer_failed").format(e=str(e)) )) self.after(0, self._on_transfer_done) threading.Thread(target=_do, daemon=True).start() # ── Button handlers ── def _upload_selected(self): if not self._sftp or not self._sftp.connected or self._transferring: return selected = self._local_list.get_selected() items = [s for s in selected if s["name"] != ".."] if not items: self._log_msg(t("no_server_selected")) return self._do_upload(items) def _download_selected(self): if not self._sftp or not self._sftp.connected or self._transferring: return selected = self._remote_list.get_selected() items = [s for s in selected if s["name"] != ".."] if not items: return self._do_download(items) # ── Drag-and-drop callbacks ── def _on_drop_to_remote(self, items: list[dict], source): """Called when items are dropped onto remote panel (upload).""" self._do_upload(items) def _on_drop_to_local(self, items: list[dict], source): """Called when items are dropped onto local panel (download).""" self._do_download(items) def _on_transfer_done(self): self._transferring = False self._progress.set(0) self._transfer_label.configure(text="") self._upload_btn.configure(state="normal") self._download_btn.configure(state="normal") self._refresh_local() self._refresh_remote() def _update_progress(self, fraction: float, text: str): self._progress.set(fraction) self._transfer_label.configure(text=text) def _mkdir_remote(self): if not self._sftp or not self._sftp.connected: return dialog = ctk.CTkInputDialog( text=t("new_folder_name"), title=t("new_folder"), ) name = dialog.get_input() if not name or not name.strip(): return name = name.strip() path = self._remote_join(name) def _do(): try: self._sftp.mkdir(path) self.after(0, self._refresh_remote) except Exception as e: self.after(0, lambda: self._log_msg(t("sftp_error").format(e=str(e)))) threading.Thread(target=_do, daemon=True).start() def _delete_remote(self): if not self._sftp or not self._sftp.connected: return selected = self._remote_list.get_selected() items = [s for s in selected if s["name"] != ".."] if not items: return # Check for directories — use recursive delete confirm has_dirs = any(s.get("is_dir") for s in items) if has_dirs and len(items) == 1 and items[0].get("is_dir"): if not messagebox.askyesno( t("delete_files"), t("recursive_delete_confirm").format(name=items[0]["name"]), ): return else: if not messagebox.askyesno( t("delete_files"), t("delete_files_confirm").format(count=len(items)), ): return def _do(): for item in items: name = item["name"] path = self._remote_join(name) try: if item.get("is_dir"): self._sftp.rmdir_recursive(path) else: self._sftp.remove(path) except Exception as e: self.after(0, lambda: self._log_msg( t("sftp_error").format(e=str(e)) )) self.after(0, self._refresh_remote) threading.Thread(target=_do, daemon=True).start() def _rename_remote(self): if not self._sftp or not self._sftp.connected: return selected = self._remote_list.get_selected() if not selected or selected[0]["name"] == "..": return old_name = selected[0]["name"] dialog = ctk.CTkInputDialog( text=t("rename_prompt"), title=t("rename_file"), ) new_name = dialog.get_input() if not new_name or not new_name.strip() or new_name.strip() == old_name: return new_name = new_name.strip() old_path = self._remote_join(old_name) new_path = self._remote_join(new_name) def _do(): try: self._sftp.rename(old_path, new_path) self.after(0, self._refresh_remote) except Exception as e: self.after(0, lambda: self._log_msg(t("sftp_error").format(e=str(e)))) threading.Thread(target=_do, daemon=True).start() # ── Helpers ── def _set_remote_buttons_state(self, state: str): for btn in ( self._upload_btn, self._download_btn, self._mkdir_btn, self._delete_btn, self._rename_btn, self._remote_refresh_btn, self._remote_back_btn, self._remote_up_btn, ): btn.configure(state=state) def _log_msg(self, text: str): self._log.configure(state="normal") self._log.insert("end", text + "\n") self._log.configure(state="disabled") self._log.see("end")