Files
server-manager/gui/app.py
2026-03-05 03:58:32 -05:00

728 lines
26 KiB
Python

"""
Main application window — sidebar + tabview layout.
"""
import tkinter
import sys
import customtkinter as ctk
from tkinter import messagebox
from core.server_store import ServerStore
from core.status_checker import StatusChecker
from core.updater import UpdateChecker
from core import i18n
from core.i18n import t, LANGUAGES
from core.icons import icon, TAB_ICONS, ctk_icon
from core.session_pool import SessionPool
from gui.sidebar import Sidebar
from gui.server_dialog import ServerDialog
from gui.about_dialog import AboutDialog
from gui.tabs.terminal_tab import TerminalTab
from gui.tabs.files_tab import FilesTab
from gui.tabs.info_tab import InfoTab
from gui.tabs.keys_tab import KeysTab
from gui.tabs.setup_tab import SetupTab
from gui.tabs.totp_tab import TOTPTab
from gui.tabs.query_tab import QueryTab
from gui.tabs.redis_tab import RedisTab
from gui.tabs.grafana_tab import GrafanaTab
from gui.tabs.prometheus_tab import PrometheusTab
from gui.tabs.powershell_tab import PowershellTab
from gui.tabs.launch_tab import LaunchTab
from gui.tabs.s3_tab import S3Tab
# Tab sets per server type — determines which tabs are shown
TAB_REGISTRY = {
"ssh": ["terminal", "files", "info", "keys", "totp", "setup"],
"telnet": ["terminal", "info", "setup"],
"winrm": ["powershell", "info", "setup"],
"mariadb": ["query", "info", "setup"],
"mssql": ["query", "info", "setup"],
"postgresql": ["query", "info", "setup"],
"redis": ["console", "info", "setup"],
"grafana": ["dashboards", "info", "setup"],
"prometheus": ["metrics", "info", "setup"],
"rdp": ["launch", "info", "setup"],
"vnc": ["launch", "info", "setup"],
"s3": ["objects", "info", "setup"],
}
# Map tab key → widget class (used as lazy factory)
TAB_CLASSES = {
"terminal": TerminalTab,
"files": FilesTab,
"info": InfoTab,
"keys": KeysTab,
"totp": TOTPTab,
"setup": SetupTab,
"query": QueryTab,
"console": RedisTab,
"dashboards": GrafanaTab,
"metrics": PrometheusTab,
"powershell": PowershellTab,
"launch": LaunchTab,
"objects": S3Tab,
}
def _tab_label(key: str) -> str:
"""Return tab label with icon prefix: '📁 Files'."""
icon_name = TAB_ICONS.get(key)
sym = icon(icon_name) if icon_name else ""
text = t(key)
return f"{sym} {text}" if sym else text
class App(ctk.CTk):
def __init__(self):
super().__init__()
# Window config
self.title("ServerManager")
self.minsize(900, 500)
ctk.set_appearance_mode("dark")
ctk.set_default_color_theme("blue")
# Core
self.store = ServerStore()
self.checker = StatusChecker(self.store)
self.session_pool = SessionPool(max_sessions=5) # Create session pool
self.updater = UpdateChecker(self.store, gui_callback=self._on_update_event)
# Restore saved window geometry or use default
saved_geo = self.store._window_geometry
if saved_geo:
self.geometry(saved_geo)
else:
self.geometry("1100x700")
# Layout
self._build_layout()
# Status checker
self.checker.set_gui_callback(lambda: self.after(0, self._on_status_update))
self.checker.start()
self.checker.check_all_now()
# Auto-updater
self.updater.start()
# Fix Ctrl+V/C/A/X for non-Latin keyboard layouts (e.g. Russian)
# Tkinter maps <<Paste>> to <Control-v> by keysym, which fails when
# the layout produces non-Latin characters. This fix uses keycodes instead.
self._setup_keyboard_layout_fix()
# Add Undo/Redo support for Entry widgets (tk.Entry has no built-in undo)
self._setup_entry_undo()
# Cleanup on close
self.protocol("WM_DELETE_WINDOW", self._on_close)
# Fix: restore window after Win+D (Show Desktop)
self.bind("<Map>", self._on_map, add="+")
if sys.platform == "win32":
self._setup_win32_restore()
def _on_map(self, event=None):
"""Ensure window is fully visible when restored from taskbar."""
try:
self.deiconify()
self.lift()
except Exception:
pass
def _setup_win32_restore(self):
"""Win32 fallback: periodic check for stuck minimized state."""
import ctypes
self._user32 = ctypes.windll.user32
self._hwnd = int(self.wm_frame(), 16)
self._check_minimized()
def _check_minimized(self):
"""If window is iconic but should be visible, force restore."""
try:
if self._user32.IsIconic(self._hwnd):
fg = self._user32.GetForegroundWindow()
if fg == self._hwnd:
self._user32.ShowWindow(self._hwnd, 9) # SW_RESTORE
except Exception:
pass
self.after(500, self._check_minimized)
def _build_layout(self):
# PanedWindow — resizable sidebar | main area
self._paned = tkinter.PanedWindow(
self, orient="horizontal", sashwidth=4,
bg="#2b2b2b", sashrelief="flat", opaqueresize=True,
)
self._paned.pack(fill="both", expand=True)
# Sidebar
self.sidebar = Sidebar(self._paned, self.store, on_select=self._on_server_select, session_pool=self.session_pool)
self._paned.add(self.sidebar, minsize=180, width=self.store._sidebar_width)
self.sidebar.add_callback = self._add_server
self.sidebar.edit_callback = self._edit_server
self.sidebar.delete_callback = self._delete_server
self.sidebar.add_group_callback = self._add_group
self.sidebar.open_tab_callback = self._context_open_tab
self.sidebar.check_status_callback = self._context_check_status
self.sidebar.open_browser_callback = self._context_open_browser
# Main area
self._main_frame = ctk.CTkFrame(self._paned, fg_color="transparent")
self._paned.add(self._main_frame, minsize=500)
# Header bar (language + about)
header_bar = ctk.CTkFrame(self._main_frame, fg_color="transparent", height=40)
header_bar.pack(fill="x", padx=10, pady=(8, 0))
header_bar.pack_propagate(False)
# Language selector
_lang_img = ctk_icon("globe", 18)
self._lang_icon = ctk.CTkLabel(
header_bar, text="" if _lang_img else "\U0001f310",
image=_lang_img, font=ctk.CTkFont(size=14), width=20,
)
self._lang_icon.pack(side="right", padx=(5, 0))
lang_values = list(LANGUAGES.values())
current_display = LANGUAGES.get(i18n.get_language(), "English")
self._lang_var = ctk.StringVar(value=current_display)
self.lang_menu = ctk.CTkOptionMenu(
header_bar, values=lang_values, variable=self._lang_var,
width=110, height=30, command=self._change_language
)
self.lang_menu.pack(side="right", padx=(5, 0))
# Check Updates button
_sync_img = ctk_icon("refresh", 18)
self._update_check_btn = ctk.CTkButton(
header_bar, text="" if _sync_img else "\u21bb",
image=_sync_img, width=30, height=30,
corner_radius=15, fg_color="#6b7280", hover_color="#4b5563",
command=self._check_updates_manual,
)
self._update_check_btn.pack(side="right", padx=(5, 0))
# About button
_info_img = ctk_icon("info", 18)
self.about_btn = ctk.CTkButton(
header_bar, text="" if _info_img else "",
image=_info_img, width=30, height=30,
corner_radius=15, fg_color="#6b7280", hover_color="#4b5563",
command=self._show_about
)
self.about_btn.pack(side="right", padx=(5, 5))
# Update banner (hidden by default)
self._update_banner = None
self._pending_update_info = None
self._pending_download_path = None
# Initialize tab tracking
self.tabview = None
self._tab_keys = []
self._tab_instances = {}
# Build default SSH tab set
self._rebuild_tabs(TAB_REGISTRY["ssh"])
def _rebuild_tabs(self, tab_keys: list[str], restore_tab_key: str | None = None):
"""Destroy current tabview and rebuild with the given tab keys."""
# Remember current active tab
if restore_tab_key is None:
restore_tab_key = self._get_current_tab_key() if self._tab_keys else None
# Destroy old tab instances
for key, widget in self._tab_instances.items():
try:
widget.pack_forget()
widget.destroy()
except Exception:
pass
self._tab_instances = {}
# Destroy old tabview
if self.tabview is not None:
try:
self.tabview.destroy()
except Exception:
pass
# Store new tab key list
self._tab_keys = list(tab_keys)
# Create new tabview
self.tabview = ctk.CTkTabview(self._main_frame, command=self._on_tab_changed)
self.tabview.pack(fill="both", expand=True, padx=10, pady=10)
for key in self._tab_keys:
self.tabview.add(_tab_label(key))
# Create tab instances using TAB_CLASSES factory
for key in self._tab_keys:
cls = TAB_CLASSES.get(key)
if cls is None:
continue
parent = self.tabview.tab(_tab_label(key))
widget = self._create_tab_instance(cls, key, parent)
widget.pack(fill="both", expand=True)
self._tab_instances[key] = widget
# Restore previously active tab if still available
if restore_tab_key and restore_tab_key in self._tab_keys:
try:
self.tabview.set(_tab_label(restore_tab_key))
except Exception:
pass
def _create_tab_instance(self, cls, key: str, parent):
"""Create a tab widget instance with the correct constructor args."""
if cls in (TerminalTab, FilesTab):
return cls(parent, self.store, self.session_pool)
elif cls is InfoTab:
return cls(parent, self.store, edit_callback=self._edit_server)
elif cls is SetupTab:
return cls(parent, self.store)
elif cls in (KeysTab, TOTPTab):
return cls(parent, self.store)
else:
# QueryTab, RedisTab, GrafanaTab, PrometheusTab, PowershellTab, LaunchTab
return cls(parent, self.store)
def _on_server_select(self, alias: str):
# Determine server type and required tabs
if alias:
server = self.store.get_server(alias)
server_type = server.get("type", "ssh") if server else "ssh"
else:
server_type = "ssh"
new_tab_keys = TAB_REGISTRY.get(server_type, TAB_REGISTRY["ssh"])
# Rebuild tabs only if the tab set changed
if new_tab_keys != self._tab_keys:
self._rebuild_tabs(new_tab_keys)
# Notify each tab instance about the selected server
for key, widget in self._tab_instances.items():
if hasattr(widget, "set_server"):
widget.set_server(alias)
# Update session indicators after a short delay (connection is async)
self.after(1500, self.sidebar.update_session_indicators)
def _add_server(self):
dialog = ServerDialog(self, self.store)
self.wait_window(dialog)
def _add_group(self):
from gui.group_dialog import GroupDialog
dialog = GroupDialog(self, self.store)
self.wait_window(dialog)
def _edit_server(self, alias: str):
server = self.store.get_server(alias)
if server:
dialog = ServerDialog(self, self.store, server=server)
self.wait_window(dialog)
if dialog.result and dialog.result.get("alias") != alias:
new_alias = dialog.result["alias"]
self.sidebar._select(new_alias)
self.session_pool.rename_server(alias, new_alias)
else:
# Data may have changed (IP, port, password) — force reconnect
self._force_reconnect(alias)
def _force_reconnect(self, alias: str):
"""Force tabs to reconnect after server data changed."""
# Invalidate cached SSH/SFTP sessions in pool
self.session_pool.disconnect_session(alias)
# Reset _current_alias so set_server() bypasses early return
for widget in self._tab_instances.values():
if getattr(widget, '_current_alias', None) == alias:
widget._current_alias = None
# Re-trigger server selection (calls set_server on all tabs)
self._on_server_select(alias)
def _delete_server(self, alias: str):
if messagebox.askyesno(t("delete_server"), t("delete_confirm").format(alias=alias)):
# Clean up sessions when deleting server
self.session_pool.cleanup_deleted_server(alias)
self.store.remove_server(alias)
self._on_server_select(None)
def _context_open_tab(self, alias: str, tab_key: str):
"""Context menu: select server and switch to a specific tab."""
self._on_server_select(alias)
self.sidebar._select(alias)
if tab_key in self._tab_keys:
try:
self.tabview.set(_tab_label(tab_key))
except Exception:
pass
def _context_check_status(self, alias: str):
"""Context menu: check single server status in background."""
import threading
server = self.store.get_server(alias)
if not server:
return
def _check():
online = self.checker.check_one(server)
self.store.set_status(alias, "online" if online else "offline")
self.after(0, self.sidebar.update_statuses)
threading.Thread(target=_check, daemon=True).start()
def _context_open_browser(self, alias: str):
"""Context menu: open Grafana/Prometheus in browser."""
import webbrowser
server = self.store.get_server(alias)
if not server:
return
use_ssl = server.get("use_ssl", False)
scheme = "https" if use_ssl else "http"
port = server.get("port", 3000)
url = f"{scheme}://{server['ip']}:{port}"
webbrowser.open(url)
def _on_status_update(self):
self.sidebar.update_statuses()
self.sidebar.update_session_indicators()
info = self._tab_instances.get("info")
if info and hasattr(info, "refresh"):
info.refresh()
def _show_about(self):
AboutDialog(self)
# ── Update handling ─────────────────────────────────
def _on_update_event(self, event_type: str, info: dict, path: str = None):
"""Called from updater thread — schedule GUI work on main thread."""
self.after(0, lambda: self._handle_update_event(event_type, info, path))
def _handle_update_event(self, event_type: str, info: dict, path: str = None):
"""Handle update events on the main thread."""
self._pending_update_info = info
self._pending_download_path = path
if event_type == "auto_apply":
# Full-auto mode: apply immediately
self._apply_update(path)
return
# Show banner
self._show_update_banner(info, downloaded=(path is not None))
def _show_update_banner(self, info: dict, downloaded: bool = False):
"""Show/update the update banner."""
from gui.update_dialog import UpdateBanner
if self._update_banner is not None:
try:
self._update_banner.destroy()
except Exception:
pass
self._update_banner = UpdateBanner(
self._main_frame,
on_update=self._show_update_dialog,
on_skip=lambda: self._skip_update(info["version"]),
on_dismiss=self._dismiss_banner,
)
self._update_banner.set_info(info["version"], downloaded=downloaded)
# Pack banner between header and tabview
self._update_banner.pack(fill="x", padx=10, pady=(4, 0), before=self.tabview)
def _show_update_dialog(self):
"""Open the update dialog."""
from gui.update_dialog import UpdateDialog
if not self._pending_update_info:
return
import sys
if not getattr(sys, "frozen", False):
from tkinter import messagebox
messagebox.showinfo(
t("update_available_title"),
t("update_not_frozen"),
)
return
UpdateDialog(
self,
self._pending_update_info,
downloaded_path=self._pending_download_path,
on_install=self._apply_update,
on_skip=self._skip_update,
)
def _apply_update(self, path: str):
"""Apply downloaded update — cleanup, launch updater script, force exit."""
import os, sys
if self.updater.apply_update(path):
# Full cleanup before exit
try:
self.session_pool.disconnect_all()
self.checker.stop()
self.updater.stop()
self.destroy()
except Exception:
pass
# Force terminate — daemon threads keep process alive otherwise
os._exit(0)
def _skip_update(self, version: str):
"""Skip this version."""
self.store.set_skip_version(version)
self._dismiss_banner()
def _dismiss_banner(self):
if self._update_banner:
try:
self._update_banner.pack_forget()
self._update_banner.destroy()
except Exception:
pass
self._update_banner = None
def _check_updates_manual(self):
"""Manual check for updates (button click)."""
import threading
from tkinter import messagebox
self._update_check_btn.configure(state="disabled")
def _check():
info = self.updater.check_now()
self.after(0, lambda: self._manual_check_done(info))
threading.Thread(target=_check, daemon=True).start()
def _manual_check_done(self, info):
self._update_check_btn.configure(state="normal")
if info:
self._pending_update_info = info
self._pending_download_path = None
self._show_update_banner(info)
else:
from tkinter import messagebox
messagebox.showinfo(
t("update_check"),
t("update_no_updates"),
)
def _get_current_tab_key(self) -> str:
"""Get the i18n key of the currently active tab."""
try:
current_name = self.tabview.get()
# Match against current language translations with icons
for key in self._tab_keys:
if _tab_label(key) == current_name:
return key
except Exception:
pass
return self._tab_keys[0]
def _change_language(self, display_name: str):
# Remember current tab KEY before language switch
active_tab_key = self._get_current_tab_key()
# Find lang code from display name
lang_code = "en"
for code, name in LANGUAGES.items():
if name == display_name:
lang_code = code
break
i18n.set_language(lang_code)
self.store._save_settings()
self._apply_language(active_tab_key)
def _apply_language(self, restore_tab_key: str | None = None):
# Remember selected server
alias = self.sidebar.get_selected()
# Use provided key or default to first tab
current_key = restore_tab_key or (self._tab_keys[0] if self._tab_keys else "terminal")
# Save FilesTab state if it exists
files_tab = self._tab_instances.get("files")
saved_remote_path = None
saved_local_path = None
had_sftp = False
if files_tab:
saved_remote_path = files_tab._remote_path
saved_local_path = files_tab._local_path
had_sftp = files_tab._sftp is not None and files_tab._sftp.connected
# Disconnect all sessions in the pool
self.session_pool.disconnect_all()
# Rebuild tabs with translated names (same tab keys, just new language)
self._rebuild_tabs(self._tab_keys, restore_tab_key=current_key)
# Restore FilesTab state if it exists in new tab set
files_tab = self._tab_instances.get("files")
if files_tab:
files_tab._local_path = saved_local_path
files_tab._refresh_local()
if alias and had_sftp:
files_tab._remote_path = saved_remote_path
files_tab.set_server(alias)
elif alias:
files_tab.set_server(alias)
# Restore server selection for all other tabs
if alias:
for key, widget in self._tab_instances.items():
if key == "files":
continue # Already handled above
if hasattr(widget, "set_server"):
widget.set_server(alias)
# Update sidebar
self.sidebar.update_language()
def _setup_entry_undo(self):
"""Add Undo/Redo support for tk.Entry widgets (they have no built-in undo).
Tracks text changes per-widget and restores on <<Undo>>/<<Redo>>.
"""
undo_stacks: dict[str, list[str]] = {}
redo_stacks: dict[str, list[str]] = {}
def _save_state(event):
w = event.widget
wid = str(w)
try:
current = w.get()
except Exception:
return
stack = undo_stacks.setdefault(wid, [])
if not stack or stack[-1] != current:
stack.append(current)
if len(stack) > 100:
stack.pop(0)
# Clear redo on new input
redo_stacks.pop(wid, None)
def _on_undo(event):
w = event.widget
wid = str(w)
try:
current = w.get()
except Exception:
return "break"
stack = undo_stacks.get(wid, [])
# Pop current state
while stack and stack[-1] == current:
stack.pop()
if stack:
prev = stack[-1]
redo_stacks.setdefault(wid, []).append(current)
w.delete(0, "end")
w.insert(0, prev)
return "break"
def _on_redo(event):
w = event.widget
wid = str(w)
try:
current = w.get()
except Exception:
return "break"
stack = redo_stacks.get(wid, [])
if stack:
next_val = stack.pop()
undo_stacks.setdefault(wid, []).append(current)
w.delete(0, "end")
w.insert(0, next_val)
return "break"
# Add class-level bindings for Entry
self.tk.eval('''
bind Entry <<Undo>> {}
bind Entry <<Redo>> {}
''')
tkinter.Misc.bind_class(self, "Entry", "<Key>", _save_state, "+")
tkinter.Misc.bind_class(self, "Entry", "<<Undo>>", _on_undo)
tkinter.Misc.bind_class(self, "Entry", "<<Redo>>", _on_redo)
def _setup_keyboard_layout_fix(self):
"""Fix Ctrl+V/C/X/A/Z for non-Latin keyboard layouts (Russian, Chinese, etc.).
Tkinter binds <<Paste>> to <Control-v> by keysym. When the keyboard
layout is Russian, pressing Ctrl+V produces <Control-м> which doesn't
match. This fix intercepts <Control-Key> at the 'all' level by keycode
(layout-independent) and generates the correct virtual events.
"""
# Keycode → virtual event mapping (physical keys, layout-independent)
keycode_to_event = {
86: "<<Paste>>", # V
67: "<<Copy>>", # C
88: "<<Cut>>", # X
65: "<<SelectAll>>", # A
90: "<<Undo>>", # Z
89: "<<Redo>>", # Y
}
def _on_ctrl_key_global(event):
# Only act when Ctrl is held (not Ctrl+Shift, etc.)
if not (event.state & 0x4):
return
# Skip if keysym is already Latin (standard handling works)
if event.keysym in ('v', 'V', 'c', 'C', 'x', 'X', 'a', 'A',
'z', 'Z', 'y', 'Y'):
return
virtual = keycode_to_event.get(event.keycode)
if virtual:
event.widget.event_generate(virtual)
return "break"
# Bypass CTk's bind_all restriction by calling tkinter.Misc directly
tkinter.Misc.bind_all(self, "<Control-Key>", _on_ctrl_key_global, "+")
def _on_tab_changed(self):
"""Handle tab switch — manage terminal focus."""
try:
current = self.tabview.get()
terminal = self._tab_instances.get("terminal")
if terminal and current == _tab_label("terminal"):
terminal._terminal.focus_terminal()
else:
self.focus_set()
except Exception:
pass
def _on_close(self):
# Save window geometry (size + position) and sidebar width
try:
self.store._window_geometry = self.geometry()
# Save sidebar width from PanedWindow sash position
try:
sash_pos = self._paned.sash_coord(0)
if sash_pos:
self.store._sidebar_width = sash_pos[0]
except Exception:
pass
self.store._save_settings()
except Exception:
pass
# Clean up tab instances
for key, widget in self._tab_instances.items():
if hasattr(widget, "on_close"):
try:
widget.on_close()
except Exception:
pass
# Disconnect all sessions before closing
self.session_pool.disconnect_all()
self.checker.stop()
self.updater.stop()
self.destroy()