""" Main application window β€” sidebar + tabview layout. """ import sys import tkinter import customtkinter as ctk from tkinter import messagebox from core.server_store import ServerStore from core.status_checker import StatusChecker from core.updater import UpdateChecker from core import i18n from core.i18n import t, LANGUAGES from core.icons import icon, TAB_ICONS, ctk_icon from core.session_pool import SessionPool from gui.sidebar import Sidebar from gui.server_dialog import ServerDialog from gui.about_dialog import AboutDialog from gui.tabs.terminal_tab import TerminalTab from gui.tabs.files_tab import FilesTab from gui.tabs.info_tab import InfoTab from gui.tabs.keys_tab import KeysTab from gui.tabs.setup_tab import SetupTab from gui.tabs.totp_tab import TOTPTab from gui.tabs.query_tab import QueryTab from gui.tabs.redis_tab import RedisTab from gui.tabs.grafana_tab import GrafanaTab from gui.tabs.prometheus_tab import PrometheusTab from gui.tabs.powershell_tab import PowershellTab from gui.tabs.launch_tab import LaunchTab from gui.tabs.s3_tab import S3Tab # Tab sets per server type β€” determines which tabs are shown TAB_REGISTRY = { "ssh": ["terminal", "files", "info", "keys", "totp", "setup"], "telnet": ["terminal", "info", "setup"], "winrm": ["powershell", "info", "setup"], "mariadb": ["query", "info", "setup"], "mssql": ["query", "info", "setup"], "postgresql": ["query", "info", "setup"], "redis": ["console", "info", "setup"], "grafana": ["dashboards", "info", "setup"], "prometheus": ["metrics", "info", "setup"], "rdp": ["launch", "info", "setup"], "vnc": ["launch", "info", "setup"], "s3": ["objects", "info", "setup"], } # Map tab key β†’ widget class (used as lazy factory) TAB_CLASSES = { "terminal": TerminalTab, "files": FilesTab, "info": InfoTab, "keys": KeysTab, "totp": TOTPTab, "setup": SetupTab, "query": QueryTab, "console": RedisTab, "dashboards": GrafanaTab, "metrics": PrometheusTab, "powershell": PowershellTab, "launch": LaunchTab, "objects": S3Tab, } def _tab_label(key: str) -> str: """Return tab label with icon prefix: 'πŸ“ Files'.""" icon_name = TAB_ICONS.get(key) sym = icon(icon_name) if icon_name else "" text = t(key) return f"{sym} {text}" if sym else text class App(ctk.CTk): def __init__(self): super().__init__() # Window config self.title("ServerManager") self.minsize(900, 500) ctk.set_appearance_mode("dark") ctk.set_default_color_theme("blue") # Core self.store = ServerStore() self.checker = StatusChecker(self.store) self.session_pool = SessionPool(max_sessions=5) # Create session pool self.updater = UpdateChecker(self.store, gui_callback=self._on_update_event) # Restore saved window geometry or use default saved_geo = self.store._window_geometry if saved_geo and self._is_valid_geometry(saved_geo): self.geometry(saved_geo) else: self.geometry("1100x700") # Layout self._build_layout() # Status checker self.checker.set_gui_callback(lambda: self.after(0, self._on_status_update)) self.checker.start() self.checker.check_all_now() # Auto-updater self.updater.start() # Fix Ctrl+V/C/A/X for non-Latin keyboard layouts (e.g. Russian) # Tkinter maps <> to by keysym, which fails when # the layout produces non-Latin characters. This fix uses keycodes instead. self._setup_keyboard_layout_fix() # Add Undo/Redo support for Entry widgets (tk.Entry has no built-in undo) self._setup_entry_undo() # Cleanup on close self.protocol("WM_DELETE_WINDOW", self._on_close) # Win32: restore window when stuck minimized after Win+D self._restore_check_id = None if sys.platform == "win32": self.after(3000, self._start_restore_watchdog) def _start_restore_watchdog(self): """Start periodic check for stuck minimized state (Windows only).""" try: import ctypes self._user32 = ctypes.windll.user32 self._hwnd = int(self.wm_frame(), 16) self._check_restore() except Exception: pass def _check_restore(self): """If window is iconic but user clicked taskbar, force restore.""" try: if self._user32.IsIconic(self._hwnd): fg = self._user32.GetForegroundWindow() if fg == self._hwnd: self._user32.ShowWindow(self._hwnd, 9) # SW_RESTORE except Exception: pass self._restore_check_id = self.after(500, self._check_restore) def _build_layout(self): # PanedWindow β€” resizable sidebar | main area self._paned = tkinter.PanedWindow( self, orient="horizontal", sashwidth=4, bg="#2b2b2b", sashrelief="flat", opaqueresize=True, ) self._paned.pack(fill="both", expand=True) # Sidebar self.sidebar = Sidebar(self._paned, self.store, on_select=self._on_server_select, on_double_click=self._on_server_connect, session_pool=self.session_pool) self._paned.add(self.sidebar, minsize=180, width=self.store._sidebar_width) self.sidebar.add_callback = self._add_server self.sidebar.edit_callback = self._edit_server self.sidebar.delete_callback = self._delete_server self.sidebar.add_group_callback = self._add_group self.sidebar.open_tab_callback = self._context_open_tab self.sidebar.check_status_callback = self._context_check_status self.sidebar.open_browser_callback = self._context_open_browser self.sidebar.disconnect_callback = self._on_server_disconnect # Main area self._main_frame = ctk.CTkFrame(self._paned, fg_color="transparent") self._paned.add(self._main_frame, minsize=500) # Header bar (language + about) header_bar = ctk.CTkFrame(self._main_frame, fg_color="transparent", height=40) header_bar.pack(fill="x", padx=10, pady=(8, 0)) header_bar.pack_propagate(False) # Language selector _lang_img = ctk_icon("globe", 18) self._lang_icon = ctk.CTkLabel( header_bar, text="" if _lang_img else "\U0001f310", image=_lang_img, font=ctk.CTkFont(size=14), width=20, ) self._lang_icon.pack(side="right", padx=(5, 0)) lang_values = list(LANGUAGES.values()) current_display = LANGUAGES.get(i18n.get_language(), "English") self._lang_var = ctk.StringVar(value=current_display) self.lang_menu = ctk.CTkOptionMenu( header_bar, values=lang_values, variable=self._lang_var, width=110, height=30, command=self._change_language ) self.lang_menu.pack(side="right", padx=(5, 0)) # Check Updates button _sync_img = ctk_icon("refresh", 18) self._update_check_btn = ctk.CTkButton( header_bar, text="" if _sync_img else "\u21bb", image=_sync_img, width=30, height=30, corner_radius=15, fg_color="#6b7280", hover_color="#4b5563", command=self._check_updates_manual, ) self._update_check_btn.pack(side="right", padx=(5, 0)) # About button _info_img = ctk_icon("info", 18) self.about_btn = ctk.CTkButton( header_bar, text="" if _info_img else "β“˜", image=_info_img, width=30, height=30, corner_radius=15, fg_color="#6b7280", hover_color="#4b5563", command=self._show_about ) self.about_btn.pack(side="right", padx=(5, 5)) # Update banner (hidden by default) self._update_banner = None self._pending_update_info = None self._pending_download_path = None # Initialize tab tracking self.tabview = None self._tab_keys = [] self._tab_instances = {} # Build default SSH tab set self._rebuild_tabs(TAB_REGISTRY["ssh"]) def _rebuild_tabs(self, tab_keys: list[str], restore_tab_key: str | None = None): """Destroy current tabview and rebuild with the given tab keys.""" # Remember current active tab if restore_tab_key is None: restore_tab_key = self._get_current_tab_key() if self._tab_keys else None # Destroy old tab instances for key, widget in self._tab_instances.items(): try: widget.pack_forget() widget.destroy() except Exception: pass self._tab_instances = {} # Destroy old tabview if self.tabview is not None: try: self.tabview.destroy() except Exception: pass # Store new tab key list self._tab_keys = list(tab_keys) # Create new tabview self.tabview = ctk.CTkTabview(self._main_frame, command=self._on_tab_changed) self.tabview._outer_spacing = 0 self.tabview._configure_grid() self.tabview.pack(fill="both", expand=True, padx=10, pady=(0, 10)) for key in self._tab_keys: self.tabview.add(_tab_label(key)) # Create tab instances using TAB_CLASSES factory for key in self._tab_keys: cls = TAB_CLASSES.get(key) if cls is None: continue parent = self.tabview.tab(_tab_label(key)) widget = self._create_tab_instance(cls, key, parent) widget.pack(fill="both", expand=True) self._tab_instances[key] = widget # Wire disconnect callback for terminal toolbar button terminal = self._tab_instances.get("terminal") if terminal and hasattr(terminal, "_on_disconnect_callback"): terminal._on_disconnect_callback = self._on_server_disconnect # Restore previously active tab if still available if restore_tab_key and restore_tab_key in self._tab_keys: try: self.tabview.set(_tab_label(restore_tab_key)) except Exception: pass def _create_tab_instance(self, cls, key: str, parent): """Create a tab widget instance with the correct constructor args.""" if cls in (TerminalTab, FilesTab): return cls(parent, self.store, self.session_pool) elif cls is InfoTab: return cls(parent, self.store, edit_callback=self._edit_server) elif cls is SetupTab: return cls(parent, self.store) elif cls in (KeysTab, TOTPTab): return cls(parent, self.store) else: # QueryTab, RedisTab, GrafanaTab, PrometheusTab, PowershellTab, LaunchTab return cls(parent, self.store) def _on_server_select(self, alias: str): # Determine server type and required tabs if alias: server = self.store.get_server(alias) server_type = server.get("type", "ssh") if server else "ssh" else: server_type = "ssh" new_tab_keys = TAB_REGISTRY.get(server_type, TAB_REGISTRY["ssh"]) # Rebuild tabs only if the tab set changed if new_tab_keys != self._tab_keys: self._rebuild_tabs(new_tab_keys) # Notify each tab instance about the selected server for key, widget in self._tab_instances.items(): if hasattr(widget, "set_server"): widget.set_server(alias) # Update session indicators after a short delay (connection is async) self.after(1500, self.sidebar.update_session_indicators) def _on_server_connect(self, alias: str): """Double-click: connect interactive tabs (terminal, files, powershell).""" for key, widget in self._tab_instances.items(): if hasattr(widget, "connect"): widget.connect() def _on_server_disconnect(self, alias: str): """Disconnect all sessions for a server.""" for key, widget in self._tab_instances.items(): if hasattr(widget, "disconnect"): widget.disconnect() self.session_pool.disconnect_session(alias) self.after(500, self.sidebar.update_session_indicators) def _add_server(self): dialog = ServerDialog(self, self.store) self.wait_window(dialog) def _add_group(self): from gui.group_dialog import GroupDialog dialog = GroupDialog(self, self.store) self.wait_window(dialog) def _edit_server(self, alias: str): server = self.store.get_server(alias) if server: dialog = ServerDialog(self, self.store, server=server) self.wait_window(dialog) if dialog.result and dialog.result.get("alias") != alias: new_alias = dialog.result["alias"] self.sidebar._select(new_alias) self.session_pool.rename_server(alias, new_alias) else: # Data may have changed (IP, port, password) β€” force reconnect self._force_reconnect(alias) def _force_reconnect(self, alias: str): """Force tabs to reconnect after server data changed.""" # Invalidate cached SSH/SFTP sessions in pool self.session_pool.disconnect_session(alias) # Reset _current_alias so set_server() bypasses early return for widget in self._tab_instances.values(): if getattr(widget, '_current_alias', None) == alias: widget._current_alias = None # Re-trigger server selection (calls set_server on all tabs) self._on_server_select(alias) def _delete_server(self, alias: str): if messagebox.askyesno(t("delete_server"), t("delete_confirm").format(alias=alias)): # Clean up sessions when deleting server self.session_pool.cleanup_deleted_server(alias) self.store.remove_server(alias) self._on_server_select(None) def _context_open_tab(self, alias: str, tab_key: str): """Context menu: select server and switch to a specific tab.""" self._on_server_select(alias) self.sidebar._select(alias) if tab_key in self._tab_keys: try: self.tabview.set(_tab_label(tab_key)) except Exception: pass # Connect the target tab if it supports explicit connection widget = self._tab_instances.get(tab_key) if widget and hasattr(widget, "connect"): widget.connect() def _context_check_status(self, alias: str): """Context menu: check single server status in background.""" import threading server = self.store.get_server(alias) if not server: return def _check(): online = self.checker.check_one(server) self.store.set_status(alias, "online" if online else "offline") self.after(0, self.sidebar.update_statuses) threading.Thread(target=_check, daemon=True).start() def _context_open_browser(self, alias: str): """Context menu: open Grafana/Prometheus in browser.""" import webbrowser server = self.store.get_server(alias) if not server: return use_ssl = server.get("use_ssl", False) scheme = "https" if use_ssl else "http" port = server.get("port", 3000) url = f"{scheme}://{server['ip']}:{port}" webbrowser.open(url) def _on_status_update(self): self.sidebar.update_statuses() self.sidebar.update_session_indicators() info = self._tab_instances.get("info") if info and hasattr(info, "refresh"): info.refresh() def _show_about(self): AboutDialog(self) # ── Update handling ───────────────────────────────── def _on_update_event(self, event_type: str, info: dict, path: str = None): """Called from updater thread β€” schedule GUI work on main thread.""" self.after(0, lambda: self._handle_update_event(event_type, info, path)) def _handle_update_event(self, event_type: str, info: dict, path: str = None): """Handle update events on the main thread.""" self._pending_update_info = info self._pending_download_path = path if event_type == "auto_apply": # Full-auto mode: apply immediately self._apply_update(path) return # Show banner self._show_update_banner(info, downloaded=(path is not None)) def _show_update_banner(self, info: dict, downloaded: bool = False): """Show/update the update banner.""" from gui.update_dialog import UpdateBanner if self._update_banner is not None: try: self._update_banner.destroy() except Exception: pass self._update_banner = UpdateBanner( self._main_frame, on_update=self._show_update_dialog, on_skip=lambda: self._skip_update(info["version"]), on_dismiss=self._dismiss_banner, ) self._update_banner.set_info(info["version"], downloaded=downloaded) # Pack banner between header and tabview self._update_banner.pack(fill="x", padx=10, pady=(4, 0), before=self.tabview) def _show_update_dialog(self): """Open the update dialog.""" from gui.update_dialog import UpdateDialog if not self._pending_update_info: return import sys if not getattr(sys, "frozen", False): from tkinter import messagebox messagebox.showinfo( t("update_available_title"), t("update_not_frozen"), ) return UpdateDialog( self, self._pending_update_info, downloaded_path=self._pending_download_path, on_install=self._apply_update, on_skip=self._skip_update, ) def _apply_update(self, path: str): """Apply downloaded update β€” cleanup, launch updater script, force exit.""" import os, sys if self.updater.apply_update(path): # Full cleanup before exit try: self.session_pool.disconnect_all() self.checker.stop() self.updater.stop() self.destroy() except Exception: pass # Force terminate β€” daemon threads keep process alive otherwise os._exit(0) def _skip_update(self, version: str): """Skip this version.""" self.store.set_skip_version(version) self._dismiss_banner() def _dismiss_banner(self): if self._update_banner: try: self._update_banner.pack_forget() self._update_banner.destroy() except Exception: pass self._update_banner = None def _check_updates_manual(self): """Manual check for updates (button click).""" import threading from tkinter import messagebox self._update_check_btn.configure(state="disabled") def _check(): info = self.updater.check_now() self.after(0, lambda: self._manual_check_done(info)) threading.Thread(target=_check, daemon=True).start() def _manual_check_done(self, info): self._update_check_btn.configure(state="normal") if info: self._pending_update_info = info self._pending_download_path = None self._show_update_banner(info) else: from tkinter import messagebox messagebox.showinfo( t("update_check"), t("update_no_updates"), ) def _get_current_tab_key(self) -> str: """Get the i18n key of the currently active tab.""" try: current_name = self.tabview.get() # Match against current language translations with icons for key in self._tab_keys: if _tab_label(key) == current_name: return key except Exception: pass return self._tab_keys[0] def _change_language(self, display_name: str): # Remember current tab KEY before language switch active_tab_key = self._get_current_tab_key() # Find lang code from display name lang_code = "en" for code, name in LANGUAGES.items(): if name == display_name: lang_code = code break i18n.set_language(lang_code) self.store._save_settings() self._apply_language(active_tab_key) def _apply_language(self, restore_tab_key: str | None = None): # Remember selected server alias = self.sidebar.get_selected() # Use provided key or default to first tab current_key = restore_tab_key or (self._tab_keys[0] if self._tab_keys else "terminal") # Save FilesTab state if it exists files_tab = self._tab_instances.get("files") saved_remote_path = None saved_local_path = None had_sftp = False if files_tab: saved_remote_path = files_tab._remote_path saved_local_path = files_tab._local_path had_sftp = files_tab._sftp is not None and files_tab._sftp.connected # Disconnect all sessions in the pool self.session_pool.disconnect_all() # Rebuild tabs with translated names (same tab keys, just new language) self._rebuild_tabs(self._tab_keys, restore_tab_key=current_key) # Restore FilesTab state if it exists in new tab set files_tab = self._tab_instances.get("files") if files_tab: files_tab._local_path = saved_local_path files_tab._refresh_local() if alias and had_sftp: files_tab._remote_path = saved_remote_path files_tab.set_server(alias) elif alias: files_tab.set_server(alias) # Restore server selection for all other tabs if alias: for key, widget in self._tab_instances.items(): if key == "files": continue # Already handled above if hasattr(widget, "set_server"): widget.set_server(alias) # Update sidebar self.sidebar.update_language() def _setup_entry_undo(self): """Add Undo/Redo support for tk.Entry widgets (they have no built-in undo). Tracks text changes per-widget and restores on <>/<>. """ undo_stacks: dict[str, list[str]] = {} redo_stacks: dict[str, list[str]] = {} def _save_state(event): w = event.widget wid = str(w) try: current = w.get() except Exception: return stack = undo_stacks.setdefault(wid, []) if not stack or stack[-1] != current: stack.append(current) if len(stack) > 100: stack.pop(0) # Clear redo on new input redo_stacks.pop(wid, None) def _on_undo(event): w = event.widget wid = str(w) try: current = w.get() except Exception: return "break" stack = undo_stacks.get(wid, []) # Pop current state while stack and stack[-1] == current: stack.pop() if stack: prev = stack[-1] redo_stacks.setdefault(wid, []).append(current) w.delete(0, "end") w.insert(0, prev) return "break" def _on_redo(event): w = event.widget wid = str(w) try: current = w.get() except Exception: return "break" stack = redo_stacks.get(wid, []) if stack: next_val = stack.pop() undo_stacks.setdefault(wid, []).append(current) w.delete(0, "end") w.insert(0, next_val) return "break" # Add class-level bindings for Entry self.tk.eval(''' bind Entry <> {} bind Entry <> {} ''') tkinter.Misc.bind_class(self, "Entry", "", _save_state, "+") tkinter.Misc.bind_class(self, "Entry", "<>", _on_undo) tkinter.Misc.bind_class(self, "Entry", "<>", _on_redo) def _setup_keyboard_layout_fix(self): """Fix Ctrl+V/C/X/A/Z for non-Latin keyboard layouts (Russian, Chinese, etc.). Tkinter binds <> to by keysym. When the keyboard layout is Russian, pressing Ctrl+V produces which doesn't match. This fix intercepts at the 'all' level by keycode (layout-independent) and generates the correct virtual events. """ # Keycode β†’ virtual event mapping (physical keys, layout-independent) keycode_to_event = { 86: "<>", # V 67: "<>", # C 88: "<>", # X 65: "<>", # A 90: "<>", # Z 89: "<>", # Y } def _on_ctrl_key_global(event): # Only act when Ctrl is held (not Ctrl+Shift, etc.) if not (event.state & 0x4): return # Skip if keysym is already Latin (standard handling works) if event.keysym in ('v', 'V', 'c', 'C', 'x', 'X', 'a', 'A', 'z', 'Z', 'y', 'Y'): return virtual = keycode_to_event.get(event.keycode) if virtual: event.widget.event_generate(virtual) return "break" # Bypass CTk's bind_all restriction by calling tkinter.Misc directly tkinter.Misc.bind_all(self, "", _on_ctrl_key_global, "+") def _on_tab_changed(self): """Handle tab switch β€” manage terminal focus.""" try: current = self.tabview.get() terminal = self._tab_instances.get("terminal") if terminal and current == _tab_label("terminal"): terminal._terminal.focus_terminal() else: self.focus_set() except Exception: pass @staticmethod def _is_valid_geometry(geo: str) -> bool: """Reject geometry with offscreen coordinates (e.g. minimized -32000).""" try: # format: WxH+X+Y or WxH-X-Y import re m = re.match(r"(\d+)x(\d+)([+-]\d+)([+-]\d+)", geo) if not m: return False x, y = int(m.group(3)), int(m.group(4)) return -100 < x < 10000 and -100 < y < 10000 except Exception: return False def _on_close(self): # Cancel restore watchdog try: if self._restore_check_id: self.after_cancel(self._restore_check_id) except Exception: pass # Save window geometry (size + position) and sidebar width try: geo = self.geometry() self.store._window_geometry = geo if self._is_valid_geometry(geo) else None # Save sidebar width from PanedWindow sash position try: sash_pos = self._paned.sash_coord(0) if sash_pos: self.store._sidebar_width = sash_pos[0] except Exception: pass self.store._save_settings() except Exception: pass # Clean up tab instances for key, widget in self._tab_instances.items(): if hasattr(widget, "on_close"): try: widget.on_close() except Exception: pass # Disconnect all sessions before closing self.session_pool.disconnect_all() self.checker.stop() self.updater.stop() self.destroy()