""" Files tab — SFTP upload/download. """ import os import threading import customtkinter as ctk from tkinter import filedialog from core.ssh_client import SSHClientWrapper class FilesTab(ctk.CTkFrame): def __init__(self, master, store): super().__init__(master, fg_color="transparent") self.store = store self._current_alias: str | None = None # Upload section upload_label = ctk.CTkLabel(self, text="Upload", font=ctk.CTkFont(size=14, weight="bold"), anchor="w") upload_label.pack(fill="x", padx=15, pady=(15, 5)) upload_frame = ctk.CTkFrame(self, fg_color="transparent") upload_frame.pack(fill="x", padx=15, pady=(0, 5)) ctk.CTkLabel(upload_frame, text="Local:", width=60, anchor="w").pack(side="left") self.upload_local = ctk.CTkEntry(upload_frame, placeholder_text="/path/to/local/file") self.upload_local.pack(side="left", fill="x", expand=True, padx=5) ctk.CTkButton(upload_frame, text="Browse", width=70, command=self._browse_upload).pack(side="right") upload_remote_frame = ctk.CTkFrame(self, fg_color="transparent") upload_remote_frame.pack(fill="x", padx=15, pady=(0, 5)) ctk.CTkLabel(upload_remote_frame, text="Remote:", width=60, anchor="w").pack(side="left") self.upload_remote = ctk.CTkEntry(upload_remote_frame, placeholder_text="/remote/path/file") self.upload_remote.pack(side="left", fill="x", expand=True, padx=5) self.upload_btn = ctk.CTkButton(upload_remote_frame, text="Upload", width=70, command=self._upload) self.upload_btn.pack(side="right") # Separator ctk.CTkFrame(self, height=2, fg_color="gray40").pack(fill="x", padx=15, pady=10) # Download section download_label = ctk.CTkLabel(self, text="Download", font=ctk.CTkFont(size=14, weight="bold"), anchor="w") download_label.pack(fill="x", padx=15, pady=(5, 5)) download_remote_frame = ctk.CTkFrame(self, fg_color="transparent") download_remote_frame.pack(fill="x", padx=15, pady=(0, 5)) ctk.CTkLabel(download_remote_frame, text="Remote:", width=60, anchor="w").pack(side="left") self.download_remote = ctk.CTkEntry(download_remote_frame, placeholder_text="/remote/path/file") self.download_remote.pack(side="left", fill="x", expand=True, padx=5) download_local_frame = ctk.CTkFrame(self, fg_color="transparent") download_local_frame.pack(fill="x", padx=15, pady=(0, 5)) ctk.CTkLabel(download_local_frame, text="Local:", width=60, anchor="w").pack(side="left") self.download_local = ctk.CTkEntry(download_local_frame, placeholder_text="/path/to/save") self.download_local.pack(side="left", fill="x", expand=True, padx=5) ctk.CTkButton(download_local_frame, text="Browse", width=70, command=self._browse_download).pack(side="left", padx=(5, 0)) self.download_btn = ctk.CTkButton(download_local_frame, text="Download", width=80, command=self._download) self.download_btn.pack(side="right") # Progress self.progress = ctk.CTkProgressBar(self) self.progress.pack(fill="x", padx=15, pady=(10, 5)) self.progress.set(0) # Log self.log = ctk.CTkTextbox(self, height=150, font=ctk.CTkFont(family="Consolas", size=11), state="disabled") self.log.pack(fill="both", expand=True, padx=15, pady=(5, 15)) def set_server(self, alias: str | None): self._current_alias = alias def _log_msg(self, text: str): self.log.configure(state="normal") self.log.insert("end", text + "\n") self.log.configure(state="disabled") self.log.see("end") def _browse_upload(self): path = filedialog.askopenfilename() if path: self.upload_local.delete(0, "end") self.upload_local.insert(0, path) if not self.upload_remote.get(): self.upload_remote.insert(0, "/tmp/" + os.path.basename(path)) def _browse_download(self): path = filedialog.asksaveasfilename() if path: self.download_local.delete(0, "end") self.download_local.insert(0, path) def _upload(self): if not self._current_alias: self._log_msg("[!] No server selected") return local = self.upload_local.get().strip() remote = self.upload_remote.get().strip() if not local or not remote: self._log_msg("[!] Both paths required") return if not os.path.exists(local): self._log_msg(f"[!] File not found: {local}") return server = self.store.get_server(self._current_alias) if not server: return self.upload_btn.configure(state="disabled") self.progress.set(0) file_size = os.path.getsize(local) def _progress(transferred, total): if total > 0: self.after(0, lambda: self.progress.set(transferred / total)) def _do(): try: wrapper = SSHClientWrapper(server, self.store.get_ssh_key_path()) wrapper.upload(local, remote, progress_cb=_progress) self.after(0, lambda: self._log_msg(f"OK: {local} -> {self._current_alias}:{remote}")) except Exception as e: self.after(0, lambda: self._log_msg(f"[ERROR] {e}")) finally: self.after(0, lambda: self.upload_btn.configure(state="normal")) threading.Thread(target=_do, daemon=True).start() def _download(self): if not self._current_alias: self._log_msg("[!] No server selected") return remote = self.download_remote.get().strip() local = self.download_local.get().strip() if not remote or not local: self._log_msg("[!] Both paths required") return server = self.store.get_server(self._current_alias) if not server: return self.download_btn.configure(state="disabled") self.progress.set(0) def _progress(transferred, total): if total > 0: self.after(0, lambda: self.progress.set(transferred / total)) def _do(): try: wrapper = SSHClientWrapper(server, self.store.get_ssh_key_path()) wrapper.download(remote, local, progress_cb=_progress) self.after(0, lambda: self._log_msg(f"OK: {self._current_alias}:{remote} -> {local}")) except Exception as e: self.after(0, lambda: self._log_msg(f"[ERROR] {e}")) finally: self.after(0, lambda: self.download_btn.configure(state="normal")) threading.Thread(target=_do, daemon=True).start()