feat: multi-type server support — SQL, Redis, Grafana, Prometheus, Telnet, WinRM, RDP/VNC

Full implementation of multi-type server management across GUI and CLI:

New clients: SQLClient (MariaDB/MSSQL/PostgreSQL), RedisClient, GrafanaClient,
PrometheusClient, TelnetSession, WinRMClient, RemoteDesktopLauncher.

New GUI tabs: QueryTab (SQL editor + Treeview), RedisTab (console + history),
GrafanaTab (dashboards + alerts), PrometheusTab (PromQL + targets),
PowershellTab (PS/CMD), LaunchTab (RDP/VNC external client).

Infrastructure: TAB_REGISTRY for conditional tabs per server type,
adaptive server_dialog fields, colored type badges in sidebar,
status checker for all types (SSH/TCP/SQL/Redis/HTTP), 100+ i18n keys.

CLI: ssh.py extended with --sql, --redis, --grafana-*, --prom-*, --ps, --cmd.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
chrome-storm-c442
2026-02-24 09:35:24 -05:00
parent 2d1d942ddc
commit eede67e6a9
26 changed files with 3990 additions and 168 deletions

View File

@@ -20,6 +20,43 @@ from gui.tabs.info_tab import InfoTab
from gui.tabs.keys_tab import KeysTab
from gui.tabs.setup_tab import SetupTab
from gui.tabs.totp_tab import TOTPTab
from gui.tabs.query_tab import QueryTab
from gui.tabs.redis_tab import RedisTab
from gui.tabs.grafana_tab import GrafanaTab
from gui.tabs.prometheus_tab import PrometheusTab
from gui.tabs.powershell_tab import PowershellTab
from gui.tabs.launch_tab import LaunchTab
# Tab sets per server type — determines which tabs are shown
TAB_REGISTRY = {
"ssh": ["terminal", "files", "info", "keys", "totp", "setup"],
"telnet": ["terminal", "info", "setup"],
"winrm": ["powershell", "info", "setup"],
"mariadb": ["query", "info", "setup"],
"mssql": ["query", "info", "setup"],
"postgresql": ["query", "info", "setup"],
"redis": ["console", "info", "setup"],
"grafana": ["dashboards", "info", "setup"],
"prometheus": ["metrics", "info", "setup"],
"rdp": ["launch", "info", "setup"],
"vnc": ["launch", "info", "setup"],
}
# Map tab key → widget class (used as lazy factory)
TAB_CLASSES = {
"terminal": TerminalTab,
"files": FilesTab,
"info": InfoTab,
"keys": KeysTab,
"totp": TOTPTab,
"setup": SetupTab,
"query": QueryTab,
"console": RedisTab,
"dashboards": GrafanaTab,
"metrics": PrometheusTab,
"powershell": PowershellTab,
"launch": LaunchTab,
}
class App(ctk.CTk):
@@ -67,11 +104,11 @@ class App(ctk.CTk):
self.sidebar.delete_callback = self._delete_server
# Main area
main = ctk.CTkFrame(self, fg_color="transparent")
main.pack(side="right", fill="both", expand=True)
self._main_frame = ctk.CTkFrame(self, fg_color="transparent")
self._main_frame.pack(side="right", fill="both", expand=True)
# Header bar (language + about)
header_bar = ctk.CTkFrame(main, fg_color="transparent", height=40)
header_bar = ctk.CTkFrame(self._main_frame, fg_color="transparent", height=40)
header_bar.pack(fill="x", padx=10, pady=(8, 0))
header_bar.pack_propagate(False)
@@ -93,39 +130,96 @@ class App(ctk.CTk):
)
self.about_btn.pack(side="right", padx=(5, 5))
# Tabview
self.tabview = ctk.CTkTabview(main, command=self._on_tab_changed)
# Initialize tab tracking
self.tabview = None
self._tab_keys = []
self._tab_instances = {}
# Build default SSH tab set
self._rebuild_tabs(TAB_REGISTRY["ssh"])
def _rebuild_tabs(self, tab_keys: list[str], restore_tab_key: str | None = None):
"""Destroy current tabview and rebuild with the given tab keys."""
# Remember current active tab
if restore_tab_key is None:
restore_tab_key = self._get_current_tab_key() if self._tab_keys else None
# Destroy old tab instances
for key, widget in self._tab_instances.items():
try:
widget.pack_forget()
widget.destroy()
except Exception:
pass
self._tab_instances = {}
# Destroy old tabview
if self.tabview is not None:
try:
self.tabview.destroy()
except Exception:
pass
# Store new tab key list
self._tab_keys = list(tab_keys)
# Create new tabview
self.tabview = ctk.CTkTabview(self._main_frame, command=self._on_tab_changed)
self.tabview.pack(fill="both", expand=True, padx=10, pady=10)
# Tab names stored for language updates
self._tab_keys = ["terminal", "files", "info", "keys", "totp", "setup"]
for key in self._tab_keys:
self.tabview.add(t(key))
self.terminal_tab = TerminalTab(self.tabview.tab(t("terminal")), self.store, self.session_pool)
self.terminal_tab.pack(fill="both", expand=True)
# Create tab instances using TAB_CLASSES factory
for key in self._tab_keys:
cls = TAB_CLASSES.get(key)
if cls is None:
continue
parent = self.tabview.tab(t(key))
widget = self._create_tab_instance(cls, key, parent)
widget.pack(fill="both", expand=True)
self._tab_instances[key] = widget
self.files_tab = FilesTab(self.tabview.tab(t("files")), self.store, self.session_pool)
self.files_tab.pack(fill="both", expand=True)
# Restore previously active tab if still available
if restore_tab_key and restore_tab_key in self._tab_keys:
try:
self.tabview.set(t(restore_tab_key))
except Exception:
pass
self.info_tab = InfoTab(self.tabview.tab(t("info")), self.store, edit_callback=self._edit_server)
self.info_tab.pack(fill="both", expand=True)
self.keys_tab = KeysTab(self.tabview.tab(t("keys")), self.store)
self.keys_tab.pack(fill="both", expand=True)
self.totp_tab = TOTPTab(self.tabview.tab(t("totp")), self.store)
self.totp_tab.pack(fill="both", expand=True)
self.setup_tab = SetupTab(self.tabview.tab(t("setup")), self.store)
self.setup_tab.pack(fill="both", expand=True)
def _create_tab_instance(self, cls, key: str, parent):
"""Create a tab widget instance with the correct constructor args."""
if cls in (TerminalTab, FilesTab):
return cls(parent, self.store, self.session_pool)
elif cls is InfoTab:
return cls(parent, self.store, edit_callback=self._edit_server)
elif cls is SetupTab:
return cls(parent, self.store)
elif cls in (KeysTab, TOTPTab):
return cls(parent, self.store)
else:
# QueryTab, RedisTab, GrafanaTab, PrometheusTab, PowershellTab, LaunchTab
return cls(parent, self.store)
def _on_server_select(self, alias: str):
self.terminal_tab.set_server(alias)
self.files_tab.set_server(alias)
self.info_tab.set_server(alias)
self.keys_tab.set_server(alias)
self.totp_tab.set_server(alias)
# Determine server type and required tabs
if alias:
server = self.store.get_server(alias)
server_type = server.get("type", "ssh") if server else "ssh"
else:
server_type = "ssh"
new_tab_keys = TAB_REGISTRY.get(server_type, TAB_REGISTRY["ssh"])
# Rebuild tabs only if the tab set changed
if new_tab_keys != self._tab_keys:
self._rebuild_tabs(new_tab_keys)
# Notify each tab instance about the selected server
for key, widget in self._tab_instances.items():
if hasattr(widget, "set_server"):
widget.set_server(alias)
# Update session indicators after a short delay (connection is async)
self.after(1500, self.sidebar.update_session_indicators)
@@ -143,7 +237,9 @@ class App(ctk.CTk):
self.sidebar._select(new_alias)
self.session_pool.rename_server(alias, new_alias)
else:
self.info_tab.refresh()
info = self._tab_instances.get("info")
if info and hasattr(info, "refresh"):
info.refresh()
def _delete_server(self, alias: str):
if messagebox.askyesno(t("delete_server"), t("delete_confirm").format(alias=alias)):
@@ -155,7 +251,9 @@ class App(ctk.CTk):
def _on_status_update(self):
self.sidebar.update_statuses()
self.sidebar.update_session_indicators()
self.info_tab.refresh()
info = self._tab_instances.get("info")
if info and hasattr(info, "refresh"):
info.refresh()
def _show_about(self):
AboutDialog(self)
@@ -190,76 +288,42 @@ class App(ctk.CTk):
# Remember selected server
alias = self.sidebar.get_selected()
# Use provided key or default to first tab
current_key = restore_tab_key or self._tab_keys[0]
current_key = restore_tab_key or (self._tab_keys[0] if self._tab_keys else "terminal")
# Save state before destroying tabs
saved_remote_path = self.files_tab._remote_path
saved_local_path = self.files_tab._local_path
had_sftp = self.files_tab._sftp is not None and self.files_tab._sftp.connected
# Save FilesTab state if it exists
files_tab = self._tab_instances.get("files")
saved_remote_path = None
saved_local_path = None
had_sftp = False
if files_tab:
saved_remote_path = files_tab._remote_path
saved_local_path = files_tab._local_path
had_sftp = files_tab._sftp is not None and files_tab._sftp.connected
# Disconnect all sessions in the pool
self.session_pool.disconnect_all()
# Detach tab contents
self.terminal_tab.pack_forget()
self.files_tab.pack_forget()
self.info_tab.pack_forget()
self.keys_tab.pack_forget()
self.totp_tab.pack_forget()
self.setup_tab.pack_forget()
# Rebuild tabs with translated names (same tab keys, just new language)
self._rebuild_tabs(self._tab_keys, restore_tab_key=current_key)
# Get the main frame and destroy old tabview
main = self.tabview.master
self.tabview.destroy()
# Restore FilesTab state if it exists in new tab set
files_tab = self._tab_instances.get("files")
if files_tab:
files_tab._local_path = saved_local_path
files_tab._refresh_local()
if alias and had_sftp:
files_tab._remote_path = saved_remote_path
files_tab.set_server(alias)
elif alias:
files_tab.set_server(alias)
# Create new tabview with translated names
self.tabview = ctk.CTkTabview(main, command=self._on_tab_changed)
self.tabview.pack(fill="both", expand=True, padx=10, pady=10)
for key in self._tab_keys:
self.tabview.add(t(key))
# Re-parent tab contents
self.terminal_tab = TerminalTab(self.tabview.tab(t("terminal")), self.store, self.session_pool)
self.terminal_tab.pack(fill="both", expand=True)
self.files_tab = FilesTab(self.tabview.tab(t("files")), self.store, self.session_pool)
self.files_tab.pack(fill="both", expand=True)
self.info_tab = InfoTab(self.tabview.tab(t("info")), self.store, edit_callback=self._edit_server)
self.info_tab.pack(fill="both", expand=True)
self.keys_tab = KeysTab(self.tabview.tab(t("keys")), self.store)
self.keys_tab.pack(fill="both", expand=True)
self.totp_tab = TOTPTab(self.tabview.tab(t("totp")), self.store)
self.totp_tab.pack(fill="both", expand=True)
self.setup_tab = SetupTab(self.tabview.tab(t("setup")), self.store)
self.setup_tab.pack(fill="both", expand=True)
# Restore active tab by key
try:
self.tabview.set(t(current_key))
except Exception:
pass
# Restore file paths and reconnect properly
self.files_tab._local_path = saved_local_path
self.files_tab._refresh_local()
if alias and had_sftp:
# Had active SFTP — reconnect and restore remote path
self.files_tab._remote_path = saved_remote_path
self.files_tab.set_server(alias)
elif alias:
self.files_tab.set_server(alias)
# Restore server selection for other tabs (terminal auto-reconnects)
# Restore server selection for all other tabs
if alias:
self.terminal_tab.set_server(alias)
self.info_tab.set_server(alias)
self.keys_tab.set_server(alias)
self.totp_tab.set_server(alias)
for key, widget in self._tab_instances.items():
if key == "files":
continue # Already handled above
if hasattr(widget, "set_server"):
widget.set_server(alias)
# Update sidebar
self.sidebar.update_language()
@@ -367,14 +431,22 @@ class App(ctk.CTk):
"""Handle tab switch — manage terminal focus."""
try:
current = self.tabview.get()
if current == t("terminal"):
self.terminal_tab._terminal.focus_terminal()
terminal = self._tab_instances.get("terminal")
if terminal and current == t("terminal"):
terminal._terminal.focus_terminal()
else:
self.focus_set()
except Exception:
pass
def _on_close(self):
# Clean up tab instances
for key, widget in self._tab_instances.items():
if hasattr(widget, "on_close"):
try:
widget.on_close()
except Exception:
pass
# Disconnect all sessions before closing
self.session_pool.disconnect_all()
self.checker.stop()

View File

@@ -1,5 +1,6 @@
"""
Server add/edit dialog — modal window with all server fields.
Form adapts visible fields based on selected server type.
"""
import customtkinter as ctk
@@ -7,6 +8,24 @@ from core.server_store import SERVER_TYPES, DEFAULT_PORTS
from core.i18n import t
# Which conditional fields to show for each server type.
# Fields NOT listed here (alias, ip, type+port, skip_check, notes, buttons)
# are always visible.
FIELD_MAP = {
"ssh": ["user", "password", "totp", "bind_interface"],
"telnet": ["user", "password"],
"winrm": ["user", "password", "use_ssl"],
"mariadb": ["user", "password", "database"],
"mssql": ["user", "password", "database"],
"postgresql": ["user", "password", "database"],
"redis": ["password", "db_index"],
"grafana": ["api_token", "use_ssl"],
"prometheus": ["use_ssl"],
"rdp": ["user", "password"],
"vnc": ["password"],
}
def _get_network_interfaces() -> list[tuple[str, str]]:
"""Return list of (name, ipv4_address) for available network interfaces."""
try:
@@ -30,30 +49,31 @@ class ServerDialog(ctk.CTkToplevel):
self.result = None
self.title(t("edit_server") if server else t("add_server"))
self.geometry("450x680")
self.geometry("450x720")
self.resizable(False, False)
self.grab_set()
# Center on parent
self.transient(master)
self._field_frames: dict[str, ctk.CTkFrame] = {}
self._build_ui(server)
def _build_ui(self, server: dict | None):
pad = {"padx": 20, "pady": (5, 0)}
entry_pad = {"padx": 20, "pady": (2, 5)}
# Alias
# ── Always visible: Alias ──
ctk.CTkLabel(self, text=t("alias"), anchor="w").pack(fill="x", **pad)
self.alias_entry = ctk.CTkEntry(self, placeholder_text=t("placeholder_alias"))
self.alias_entry.pack(fill="x", **entry_pad)
# IP
# ── Always visible: IP ──
ctk.CTkLabel(self, text=t("ip"), anchor="w").pack(fill="x", **pad)
self.ip_entry = ctk.CTkEntry(self, placeholder_text=t("placeholder_ip"))
self.ip_entry.pack(fill="x", **entry_pad)
# Type + Port row
# ── Always visible: Type + Port row ──
row = ctk.CTkFrame(self, fg_color="transparent")
row.pack(fill="x", padx=20, pady=(5, 5))
@@ -73,9 +93,13 @@ class ServerDialog(ctk.CTkToplevel):
self.port_entry = ctk.CTkEntry(port_frame, placeholder_text=t("placeholder_port"))
self.port_entry.pack(fill="x")
# Network interface
ctk.CTkLabel(self, text=t("network_interface"), anchor="w").pack(fill="x", **pad)
self._iface_map: dict[str, str] = {} # display_name -> ip
# ── Conditional fields container — all packed here, shown/hidden dynamically ──
# We use self as parent but wrap each field group in a frame for easy show/hide.
# --- bind_interface ---
f = ctk.CTkFrame(self, fg_color="transparent")
ctk.CTkLabel(f, text=t("network_interface"), anchor="w").pack(fill="x", **pad)
self._iface_map: dict[str, str] = {}
ifaces = _get_network_interfaces()
auto_label = t("auto_default")
iface_values = [auto_label]
@@ -84,43 +108,78 @@ class ServerDialog(ctk.CTkToplevel):
iface_values.append(label)
self._iface_map[label] = ip
self._iface_var = ctk.StringVar(value=auto_label)
self._iface_menu = ctk.CTkOptionMenu(self, values=iface_values, variable=self._iface_var)
self._iface_menu = ctk.CTkOptionMenu(f, values=iface_values, variable=self._iface_var)
self._iface_menu.pack(fill="x", **entry_pad)
self._field_frames["bind_interface"] = f
# User
ctk.CTkLabel(self, text=t("username"), anchor="w").pack(fill="x", **pad)
self.user_entry = ctk.CTkEntry(self, placeholder_text=t("placeholder_user"))
# --- user ---
f = ctk.CTkFrame(self, fg_color="transparent")
ctk.CTkLabel(f, text=t("username"), anchor="w").pack(fill="x", **pad)
self.user_entry = ctk.CTkEntry(f, placeholder_text=t("placeholder_user"))
self.user_entry.pack(fill="x", **entry_pad)
self._field_frames["user"] = f
# Password
ctk.CTkLabel(self, text=t("password"), anchor="w").pack(fill="x", **pad)
pass_frame = ctk.CTkFrame(self, fg_color="transparent")
pass_frame.pack(fill="x", padx=20, pady=(2, 5))
self.password_entry = ctk.CTkEntry(pass_frame, show="*", placeholder_text=t("placeholder_password"))
# --- password ---
f = ctk.CTkFrame(self, fg_color="transparent")
ctk.CTkLabel(f, text=t("password"), anchor="w").pack(fill="x", **pad)
pass_inner = ctk.CTkFrame(f, fg_color="transparent")
pass_inner.pack(fill="x", padx=20, pady=(2, 5))
self.password_entry = ctk.CTkEntry(pass_inner, show="*", placeholder_text=t("placeholder_password"))
self.password_entry.pack(side="left", fill="x", expand=True, padx=(0, 5))
self.show_pass = ctk.CTkButton(pass_frame, text=t("show"), width=60, command=self._toggle_password)
self.show_pass = ctk.CTkButton(pass_inner, text=t("show"), width=60, command=self._toggle_password)
self.show_pass.pack(side="right")
self._pass_visible = False
self._field_frames["password"] = f
# TOTP Secret
ctk.CTkLabel(self, text=t("totp_secret_dialog"), anchor="w").pack(fill="x", **pad)
self.totp_entry = ctk.CTkEntry(self, placeholder_text=t("placeholder_totp_secret"),
# --- totp ---
f = ctk.CTkFrame(self, fg_color="transparent")
ctk.CTkLabel(f, text=t("totp_secret_dialog"), anchor="w").pack(fill="x", **pad)
self.totp_entry = ctk.CTkEntry(f, placeholder_text=t("placeholder_totp_secret"),
font=ctk.CTkFont(family="Consolas", size=12))
self.totp_entry.pack(fill="x", **entry_pad)
self._field_frames["totp"] = f
# Skip status checks
# --- database ---
f = ctk.CTkFrame(self, fg_color="transparent")
ctk.CTkLabel(f, text=t("database"), anchor="w").pack(fill="x", **pad)
self.database_entry = ctk.CTkEntry(f, placeholder_text="mydb")
self.database_entry.pack(fill="x", **entry_pad)
self._field_frames["database"] = f
# --- db_index ---
f = ctk.CTkFrame(self, fg_color="transparent")
ctk.CTkLabel(f, text=t("db_index"), anchor="w").pack(fill="x", **pad)
self.db_index_entry = ctk.CTkEntry(f, placeholder_text="0")
self.db_index_entry.pack(fill="x", **entry_pad)
self._field_frames["db_index"] = f
# --- api_token ---
f = ctk.CTkFrame(self, fg_color="transparent")
ctk.CTkLabel(f, text=t("api_token"), anchor="w").pack(fill="x", **pad)
self.api_token_entry = ctk.CTkEntry(f, show="*", placeholder_text=t("placeholder_api_token"))
self.api_token_entry.pack(fill="x", **entry_pad)
self._field_frames["api_token"] = f
# --- use_ssl ---
f = ctk.CTkFrame(self, fg_color="transparent")
self.use_ssl_var = ctk.BooleanVar(value=False)
self.use_ssl_cb = ctk.CTkCheckBox(f, text=t("use_ssl"), variable=self.use_ssl_var)
self.use_ssl_cb.pack(fill="x", padx=20, pady=(8, 2))
self._field_frames["use_ssl"] = f
# ── Always visible: Skip status checks ──
self.skip_check_var = ctk.BooleanVar(value=False)
self.skip_check_cb = ctk.CTkCheckBox(
self, text=t("skip_check"), variable=self.skip_check_var
)
self.skip_check_cb.pack(fill="x", padx=20, pady=(8, 2))
# Notes
# ── Always visible: Notes ──
ctk.CTkLabel(self, text=t("notes"), anchor="w").pack(fill="x", **pad)
self.notes_entry = ctk.CTkEntry(self, placeholder_text=t("placeholder_notes"))
self.notes_entry.pack(fill="x", **entry_pad)
# Buttons
# ── Always visible: Buttons ──
btn_frame = ctk.CTkFrame(self, fg_color="transparent")
btn_frame.pack(fill="x", padx=20, pady=(15, 20))
ctk.CTkButton(btn_frame, text=t("cancel"), fg_color="#6b7280", command=self.destroy).pack(side="left", expand=True, padx=(0, 5))
@@ -137,6 +196,10 @@ class ServerDialog(ctk.CTkToplevel):
self.totp_entry.insert(0, server.get("totp_secret", ""))
self.skip_check_var.set(server.get("skip_check", False))
self.notes_entry.insert(0, server.get("notes", ""))
self.database_entry.insert(0, server.get("database", ""))
self.db_index_entry.insert(0, str(server.get("db_index", "")))
self.api_token_entry.insert(0, server.get("api_token", ""))
self.use_ssl_var.set(server.get("use_ssl", False))
# Restore network interface selection
saved_ip = server.get("bind_interface")
@@ -155,10 +218,23 @@ class ServerDialog(ctk.CTkToplevel):
self._iface_menu.configure(values=current_values)
self._iface_var.set(unavail_label)
# Apply field visibility for initial type
self._apply_field_visibility(self.type_var.get())
def _apply_field_visibility(self, server_type: str):
"""Hide all conditional fields, then show only those for the given type."""
visible = set(FIELD_MAP.get(server_type, []))
for name, frame in self._field_frames.items():
if name in visible:
frame.pack(fill="x", before=self.skip_check_cb)
else:
frame.pack_forget()
def _on_type_change(self, value):
default_port = DEFAULT_PORTS.get(value, 22)
self.port_entry.delete(0, "end")
self.port_entry.insert(0, str(default_port))
self._apply_field_visibility(value)
def _toggle_password(self):
self._pass_visible = not self._pass_visible
@@ -211,6 +287,32 @@ class ServerDialog(ctk.CTkToplevel):
if bind_ip:
server_data["bind_interface"] = bind_ip
# New conditional fields
visible = set(FIELD_MAP.get(server_type, []))
if "database" in visible:
db = self.database_entry.get().strip()
if db:
server_data["database"] = db
if "db_index" in visible:
db_idx = self.db_index_entry.get().strip()
if db_idx:
try:
server_data["db_index"] = int(db_idx)
except ValueError:
self._show_error(t("db_index_must_be_number"))
return
if "api_token" in visible:
token = self.api_token_entry.get().strip()
if token:
server_data["api_token"] = token
if "use_ssl" in visible:
if self.use_ssl_var.get():
server_data["use_ssl"] = True
try:
if self.editing:
if alias != self._original_alias and self.store.get_server(alias):

View File

@@ -6,6 +6,34 @@ import customtkinter as ctk
from core.i18n import t
from gui.widgets.status_badge import StatusBadge
TYPE_COLORS = {
"ssh": "#22c55e",
"telnet": "#a855f7",
"rdp": "#3b82f6",
"vnc": "#6366f1",
"winrm": "#0ea5e9",
"mariadb": "#f59e0b",
"mssql": "#ef4444",
"postgresql": "#3b82f6",
"redis": "#dc2626",
"grafana": "#f97316",
"prometheus": "#e11d48",
}
TYPE_LABELS = {
"ssh": "SSH",
"telnet": "TEL",
"rdp": "RDP",
"vnc": "VNC",
"winrm": "PS",
"mariadb": "MDB",
"mssql": "SQL",
"postgresql": "PG",
"redis": "RDS",
"grafana": "GRF",
"prometheus": "PRM",
}
class Sidebar(ctk.CTkFrame):
def __init__(self, master, store, on_select=None, session_pool=None):
@@ -101,6 +129,17 @@ class Sidebar(ctk.CTkFrame):
badge.pack(side="left", padx=(10, 5), pady=10)
self._badges[alias] = badge
# Type badge (colored short label)
type_color = TYPE_COLORS.get(stype, "#6b7280")
type_label_text = TYPE_LABELS.get(stype, stype.upper()[:3])
type_badge = ctk.CTkLabel(
frame, text=type_label_text,
font=ctk.CTkFont(size=9, weight="bold"),
text_color=type_color,
width=30
)
type_badge.pack(side="left", padx=(0, 2), pady=10)
# Active session indicator (right side)
session_ind = ctk.CTkLabel(
frame, text="", width=12, height=12,
@@ -117,12 +156,11 @@ class Sidebar(ctk.CTkFrame):
name_label = ctk.CTkLabel(info, text=alias, font=ctk.CTkFont(size=13, weight="bold"), anchor="w")
name_label.pack(fill="x")
detail = f"{ip} [{stype}]"
detail_label = ctk.CTkLabel(info, text=detail, font=ctk.CTkFont(size=10), text_color="#9ca3af", anchor="w")
detail_label = ctk.CTkLabel(info, text=ip, font=ctk.CTkFont(size=10), text_color="#9ca3af", anchor="w")
detail_label.pack(fill="x")
# Click handlers
for widget in [frame, info, name_label, detail_label, badge, session_ind]:
for widget in [frame, info, name_label, detail_label, badge, type_badge, session_ind]:
widget.bind("<Button-1>", lambda e, a=alias: self._select(a))
self._server_frames[alias] = frame

202
gui/tabs/grafana_tab.py Normal file
View File

@@ -0,0 +1,202 @@
"""
Grafana tab — dashboards browser and alerts overview.
"""
import threading
import webbrowser
from tkinter import ttk
import customtkinter as ctk
from core.grafana_client import GrafanaClient
from core.i18n import t
class GrafanaTab(ctk.CTkFrame):
def __init__(self, master, store):
super().__init__(master, fg_color="transparent")
self.store = store
self._current_alias: str | None = None
self._client: GrafanaClient | None = None
self._dashboards: list[dict] = []
self._build_ui()
def _build_ui(self):
# ── Header + Refresh ──
header_frame = ctk.CTkFrame(self, fg_color="transparent")
header_frame.pack(fill="x", padx=15, pady=(15, 5))
title = ctk.CTkLabel(header_frame, text=t("grafana_title"),
font=ctk.CTkFont(size=18, weight="bold"))
title.pack(side="left")
self._refresh_btn = ctk.CTkButton(header_frame, text=t("grafana_refresh"), width=100,
command=self._refresh)
self._refresh_btn.pack(side="right")
# ── Dashboards section ──
dash_label = ctk.CTkLabel(self, text=t("grafana_dashboards"),
font=ctk.CTkFont(size=14, weight="bold"), anchor="w")
dash_label.pack(fill="x", padx=15, pady=(10, 3))
dash_frame = ctk.CTkFrame(self, fg_color="transparent")
dash_frame.pack(fill="both", expand=True, padx=15, pady=(0, 5))
columns = ("uid", "title", "folder")
self._dash_tree = ttk.Treeview(dash_frame, columns=columns, show="headings",
selectmode="browse", height=8)
self._dash_tree.heading("uid", text="UID")
self._dash_tree.heading("title", text=t("grafana_dash_title"))
self._dash_tree.heading("folder", text=t("grafana_dash_folder"))
self._dash_tree.column("uid", width=120, minwidth=80)
self._dash_tree.column("title", width=300, minwidth=150)
self._dash_tree.column("folder", width=150, minwidth=80)
self._dash_tree.pack(side="left", fill="both", expand=True)
dash_scroll = ttk.Scrollbar(dash_frame, orient="vertical", command=self._dash_tree.yview)
dash_scroll.pack(side="right", fill="y")
self._dash_tree.configure(yscrollcommand=dash_scroll.set)
self._dash_tree.bind("<Double-1>", self._on_dashboard_click)
# ── Alerts section ──
alerts_label = ctk.CTkLabel(self, text=t("grafana_alerts"),
font=ctk.CTkFont(size=14, weight="bold"), anchor="w")
alerts_label.pack(fill="x", padx=15, pady=(10, 3))
alerts_frame = ctk.CTkFrame(self, fg_color="transparent")
alerts_frame.pack(fill="both", expand=True, padx=15, pady=(0, 5))
alert_columns = ("state", "name", "severity")
self._alerts_tree = ttk.Treeview(alerts_frame, columns=alert_columns, show="headings",
selectmode="browse", height=6)
self._alerts_tree.heading("state", text=t("grafana_alert_state"))
self._alerts_tree.heading("name", text=t("grafana_alert_name"))
self._alerts_tree.heading("severity", text=t("grafana_alert_severity"))
self._alerts_tree.column("state", width=100, minwidth=60)
self._alerts_tree.column("name", width=300, minwidth=150)
self._alerts_tree.column("severity", width=100, minwidth=60)
self._alerts_tree.pack(side="left", fill="both", expand=True)
alerts_scroll = ttk.Scrollbar(alerts_frame, orient="vertical", command=self._alerts_tree.yview)
alerts_scroll.pack(side="right", fill="y")
self._alerts_tree.configure(yscrollcommand=alerts_scroll.set)
# ── Status bar ──
self._status_bar = ctk.CTkLabel(self, text=t("grafana_no_server"), anchor="w",
font=ctk.CTkFont(size=11), text_color="#9ca3af")
self._status_bar.pack(fill="x", padx=15, pady=(5, 10))
# ── Public API ──
def set_server(self, alias: str | None):
"""Called when user selects a server in sidebar."""
self._current_alias = alias
self._client = None
self._dashboards.clear()
self._clear_tables()
if alias:
self._set_status(t("grafana_connected").format(alias=alias), "#22c55e")
self._refresh()
else:
self._set_status(t("grafana_no_server"), "#9ca3af")
# ── Refresh ──
def _refresh(self):
if not self._current_alias:
self._set_status(t("no_server_selected"), "#ef4444")
return
self._refresh_btn.configure(state="disabled", text=t("grafana_loading"))
self._set_status(t("grafana_loading"), "#ccaa00")
def _do():
try:
client = self._get_client()
dashboards = client.list_dashboards()
alerts = client.list_alerts()
self.after(0, lambda: self._populate_dashboards(dashboards))
self.after(0, lambda: self._populate_alerts(alerts))
self.after(0, lambda: self._set_status(
t("grafana_loaded").format(
dashboards=len(dashboards), alerts=len(alerts)
), "#22c55e"))
except Exception as e:
self.after(0, lambda: self._set_status(f"(error) {e}", "#ef4444"))
finally:
self.after(0, lambda: self._refresh_btn.configure(
state="normal", text=t("grafana_refresh")))
threading.Thread(target=_do, daemon=True).start()
def _get_client(self) -> GrafanaClient:
if self._client is None:
self._client = GrafanaClient(self._current_alias, self.store)
return self._client
# ── Table population ──
def _populate_dashboards(self, dashboards: list[dict]):
self._dash_tree.delete(*self._dash_tree.get_children())
self._dashboards = dashboards
for d in dashboards:
uid = d.get("uid", "")
title = d.get("title", "")
folder = d.get("folderTitle", d.get("folder", "General"))
self._dash_tree.insert("", "end", values=(uid, title, folder))
def _populate_alerts(self, alerts: list[dict]):
self._alerts_tree.delete(*self._alerts_tree.get_children())
for a in alerts:
state = a.get("state", a.get("status", "unknown"))
name = a.get("name", a.get("title", ""))
severity = a.get("severity", a.get("labels", {}).get("severity", ""))
tag = ""
if state in ("alerting", "firing"):
tag = "alerting"
elif state in ("ok", "normal", "inactive"):
tag = "ok"
self._alerts_tree.insert("", "end", values=(state, name, severity), tags=(tag,))
# Color-code alert states
self._alerts_tree.tag_configure("alerting", foreground="#ef4444")
self._alerts_tree.tag_configure("ok", foreground="#22c55e")
def _clear_tables(self):
self._dash_tree.delete(*self._dash_tree.get_children())
self._alerts_tree.delete(*self._alerts_tree.get_children())
# ── Events ──
def _on_dashboard_click(self, _event):
"""Open dashboard URL in browser on double-click."""
selection = self._dash_tree.selection()
if not selection:
return
item = self._dash_tree.item(selection[0])
uid = item["values"][0] if item["values"] else None
if not uid:
return
# Find the dashboard data to get the URL
for d in self._dashboards:
if d.get("uid") == uid:
url = d.get("url", "")
if url:
try:
client = self._get_client()
full_url = client.get_dashboard_url(url)
webbrowser.open(full_url)
except Exception:
# Fallback: just open relative URL
webbrowser.open(url)
break
# ── Helpers ──
def _set_status(self, text: str, color: str = "#9ca3af"):
self._status_bar.configure(text=text, text_color=color)

View File

@@ -8,17 +8,25 @@ from core.i18n import t
class InfoTab(ctk.CTkFrame):
# Map field keys to i18n keys
_FIELD_KEYS = ["alias", "ip", "port", "user", "type", "notes", "status"]
_FIELD_KEYS = ["alias", "ip", "port", "user", "type", "database", "db_index", "ssl", "notes", "status"]
_FIELD_I18N = {
"alias": "info_alias",
"ip": "info_ip",
"port": "info_port",
"user": "info_user",
"type": "info_type",
"database": "info_database",
"db_index": "info_db_index",
"ssl": "info_ssl",
"notes": "info_notes",
"status": "info_status",
}
# Which fields are relevant per server type
_SQL_TYPES = {"mariadb", "mssql", "postgresql"}
_SSL_TYPES = {"grafana", "prometheus", "winrm"}
_NO_USER_TYPES = {"redis", "grafana", "prometheus"}
def __init__(self, master, store, edit_callback=None):
super().__init__(master, fg_color="transparent")
self.store = store
@@ -65,12 +73,39 @@ class InfoTab(ctk.CTkFrame):
if not server:
return
stype = server.get("type", "ssh").lower()
self.header.configure(text=server["alias"])
self._fields["alias"].configure(text=server.get("alias", "-"))
self._fields["ip"].configure(text=server.get("ip", "-"))
self._fields["port"].configure(text=str(server.get("port", 22)))
self._fields["user"].configure(text=server.get("user", "root"))
self._fields["type"].configure(text=server.get("type", "ssh").upper())
# Hide user for types that don't use it
if stype in self._NO_USER_TYPES:
self._fields["user"].configure(text="-")
else:
self._fields["user"].configure(text=server.get("user", "root"))
self._fields["type"].configure(text=stype.upper())
# Database field — relevant for SQL types
if stype in self._SQL_TYPES:
self._fields["database"].configure(text=server.get("database", "-"))
else:
self._fields["database"].configure(text="-")
# DB index — relevant for redis
if stype == "redis":
self._fields["db_index"].configure(text=str(server.get("db_index", 0)))
else:
self._fields["db_index"].configure(text="-")
# SSL — relevant for grafana, prometheus, winrm
if stype in self._SSL_TYPES:
self._fields["ssl"].configure(text="Yes" if server.get("use_ssl") else "No")
else:
self._fields["ssl"].configure(text="-")
self._fields["notes"].configure(text=server.get("notes", "-") or "-")
status = self.store.get_status(self._current_alias)

110
gui/tabs/launch_tab.py Normal file
View File

@@ -0,0 +1,110 @@
"""
Launch tab — connect button for RDP/VNC remote desktop sessions.
"""
import threading
import customtkinter as ctk
from core.remote_desktop import RemoteDesktopLauncher
from core.i18n import t
class LaunchTab(ctk.CTkFrame):
"""Minimal tab: server info + big Connect button for RDP/VNC."""
def __init__(self, master, store):
super().__init__(master, fg_color="transparent")
self.store = store
self._current_alias: str | None = None
self._server_type: str | None = None # "rdp" or "vnc"
self._build_ui()
def _build_ui(self):
# Server info label
self._info_label = ctk.CTkLabel(
self, text=t("no_server_selected_info"),
font=ctk.CTkFont(size=16), wraplength=400,
)
self._info_label.pack(padx=20, pady=(40, 20))
# Big connect button
self._connect_btn = ctk.CTkButton(
self, text=t("launch_connect"),
font=ctk.CTkFont(size=18, weight="bold"),
width=220, height=50,
command=self._on_connect,
)
self._connect_btn.pack(pady=20)
self._connect_btn.configure(state="disabled")
# Status / result label
self._status_label = ctk.CTkLabel(
self, text="", font=ctk.CTkFont(size=13),
text_color="#888888", wraplength=400,
)
self._status_label.pack(padx=20, pady=(10, 0))
def set_server(self, alias: str | None):
self._current_alias = alias
self._status_label.configure(text="", text_color="#888888")
if alias is None:
self._info_label.configure(text=t("no_server_selected_info"))
self._connect_btn.configure(state="disabled")
self._server_type = None
return
server = self.store.get_server(alias)
if not server:
self._info_label.configure(text=t("server_not_found").format(alias=alias))
self._connect_btn.configure(state="disabled")
self._server_type = None
return
stype = server.get("type", "").lower()
self._server_type = stype
if stype == "rdp":
info_text = t("launch_rdp_info").format(alias=alias)
elif stype == "vnc":
info_text = t("launch_vnc_info").format(alias=alias)
else:
info_text = f"{alias} ({stype.upper()})"
self._info_label.configure(text=info_text)
self._connect_btn.configure(state="normal")
def _on_connect(self):
if not self._current_alias or not self._server_type:
return
server = self.store.get_server(self._current_alias)
if not server:
return
self._connect_btn.configure(state="disabled")
self._status_label.configure(
text=t("launch_starting"), text_color="#ccaa00",
)
stype = self._server_type
def _do():
try:
if stype == "rdp":
RemoteDesktopLauncher.launch_rdp(server)
elif stype == "vnc":
RemoteDesktopLauncher.launch_vnc(server)
self.after(0, lambda: self._status_label.configure(
text=t("launch_started"), text_color="#44cc44",
))
except Exception as exc:
self.after(0, lambda: self._status_label.configure(
text=t("launch_error").format(error=str(exc)),
text_color="#ff4444",
))
finally:
self.after(0, lambda: self._connect_btn.configure(state="normal"))
threading.Thread(target=_do, daemon=True).start()

242
gui/tabs/powershell_tab.py Normal file
View File

@@ -0,0 +1,242 @@
"""
PowerShell/CMD tab — request-response terminal for WinRM servers.
No pyte needed: WinRM is not an interactive PTY, just command → output.
"""
import threading
import customtkinter as ctk
from core.winrm_client import WinRMClient
from core.i18n import t
class PowershellTab(ctk.CTkFrame):
"""Simplified terminal for WinRM command execution (PS or CMD)."""
_MAX_HISTORY = 200
def __init__(self, master, store):
super().__init__(master, fg_color="transparent")
self.store = store
self._current_alias: str | None = None
self._client: WinRMClient | None = None
self._mode: str = "ps" # "ps" or "cmd"
self._history: list[str] = []
self._history_index: int = -1
self._running = False
self._build_ui()
# ── UI construction ──────────────────────────────────────────────
def _build_ui(self):
# Top bar: mode toggle
top = ctk.CTkFrame(self, fg_color="transparent")
top.pack(fill="x", padx=8, pady=(8, 0))
self._mode_var = ctk.StringVar(value="ps")
self._ps_radio = ctk.CTkRadioButton(
top, text=t("ps_mode_ps"), variable=self._mode_var,
value="ps", command=self._on_mode_changed,
)
self._ps_radio.pack(side="left", padx=(0, 12))
self._cmd_radio = ctk.CTkRadioButton(
top, text=t("ps_mode_cmd"), variable=self._mode_var,
value="cmd", command=self._on_mode_changed,
)
self._cmd_radio.pack(side="left")
# Output console
self._output = ctk.CTkTextbox(
self, font=ctk.CTkFont(family="Consolas", size=13),
state="disabled", wrap="word",
)
self._output.pack(fill="both", expand=True, padx=8, pady=8)
# Input row: entry + execute button
input_row = ctk.CTkFrame(self, fg_color="transparent")
input_row.pack(fill="x", padx=8, pady=(0, 4))
self._entry = ctk.CTkEntry(
input_row, placeholder_text="PS> ...",
font=ctk.CTkFont(family="Consolas", size=13),
)
self._entry.pack(side="left", fill="x", expand=True, padx=(0, 6))
self._entry.bind("<Return>", lambda e: self._execute())
self._entry.bind("<Up>", lambda e: self._history_navigate(-1))
self._entry.bind("<Down>", lambda e: self._history_navigate(1))
self._exec_btn = ctk.CTkButton(
input_row, text=t("ps_execute"), width=90,
command=self._execute,
)
self._exec_btn.pack(side="right")
# Status bar
self._status = ctk.CTkLabel(
self, text="", anchor="w",
font=ctk.CTkFont(size=11), text_color="#888888",
)
self._status.pack(fill="x", padx=10, pady=(0, 6))
# ── Public API ───────────────────────────────────────────────────
def set_server(self, alias: str | None):
"""Switch to a different server (or None to disconnect)."""
if alias == self._current_alias:
return
self._disconnect()
self._current_alias = alias
self._history.clear()
self._history_index = -1
if alias is None:
self._set_status(t("ps_disconnected"), "#888888")
return
self._connect(alias)
# ── Connection ───────────────────────────────────────────────────
def _connect(self, alias: str):
server = self.store.get_server(alias)
if not server:
self._set_status(t("server_not_found").format(alias=alias), "#ff4444")
return
self._set_status(t("ps_connecting").format(alias=alias), "#ccaa00")
def _do():
try:
client = WinRMClient(server)
client.connect()
self._client = client
self.after(0, lambda: self._set_status(
t("ps_connected").format(alias=alias), "#44cc44",
))
self.after(0, lambda: self._entry.focus())
except Exception as exc:
self.after(0, lambda: self._set_status(
t("ps_connect_failed").format(error=str(exc)), "#ff4444",
))
threading.Thread(target=_do, daemon=True).start()
def _disconnect(self):
if self._client:
try:
self._client.close()
except Exception:
pass
self._client = None
# ── Command execution ────────────────────────────────────────────
def _execute(self):
cmd = self._entry.get().strip()
if not cmd:
return
if not self._client:
self._set_status(t("ps_not_connected"), "#ff4444")
return
if self._running:
return
# Save to history
if not self._history or self._history[-1] != cmd:
self._history.append(cmd)
if len(self._history) > self._MAX_HISTORY:
self._history.pop(0)
self._history_index = -1
# Show command in output
prompt = "PS>" if self._mode == "ps" else "CMD>"
self._append_output(f"\n{prompt} {cmd}\n")
self._entry.delete(0, "end")
self._running = True
self._exec_btn.configure(state="disabled")
self._set_status(t("ps_running"), "#ccaa00")
mode = self._mode
client = self._client
def _run():
try:
if mode == "ps":
result = client.exec_ps(cmd)
else:
result = client.exec_cmd(cmd)
stdout = result.get("stdout", "")
stderr = result.get("stderr", "")
rc = result.get("return_code", None)
def _show():
if stdout:
self._append_output(stdout)
if stderr:
self._append_output(f"[STDERR] {stderr}")
if rc is not None and rc != 0:
self._append_output(f"[Exit code: {rc}]")
self._set_status(t("ps_done"), "#44cc44")
self.after(0, _show)
except Exception as exc:
self.after(0, lambda: self._append_output(
f"\n[ERROR] {exc}\n"
))
self.after(0, lambda: self._set_status(
t("ps_exec_error"), "#ff4444",
))
finally:
self._running = False
self.after(0, lambda: self._exec_btn.configure(state="normal"))
threading.Thread(target=_run, daemon=True).start()
# ── History navigation ───────────────────────────────────────────
def _history_navigate(self, direction: int):
"""Navigate command history. direction: -1 = older, +1 = newer."""
if not self._history:
self._set_status(t("ps_history_empty"), "#888888")
return
if self._history_index == -1:
if direction == -1:
self._history_index = len(self._history) - 1
else:
return
else:
self._history_index += direction
if self._history_index < 0:
self._history_index = 0
elif self._history_index >= len(self._history):
self._history_index = -1
self._entry.delete(0, "end")
return
self._entry.delete(0, "end")
self._entry.insert(0, self._history[self._history_index])
# ── Mode toggle ──────────────────────────────────────────────────
def _on_mode_changed(self):
self._mode = self._mode_var.get()
placeholder = "PS> ..." if self._mode == "ps" else "CMD> ..."
self._entry.configure(placeholder_text=placeholder)
# ── Helpers ──────────────────────────────────────────────────────
def _append_output(self, text: str):
self._output.configure(state="normal")
self._output.insert("end", text)
self._output.see("end")
self._output.configure(state="disabled")
def _set_status(self, text: str, color: str = "#888888"):
self._status.configure(text=text, text_color=color)

266
gui/tabs/prometheus_tab.py Normal file
View File

@@ -0,0 +1,266 @@
"""
Prometheus tab — PromQL query executor, targets overview, and alerts.
"""
import threading
from tkinter import ttk
import customtkinter as ctk
from core.prometheus_client import PrometheusClient
from core.i18n import t
class PrometheusTab(ctk.CTkFrame):
def __init__(self, master, store):
super().__init__(master, fg_color="transparent")
self.store = store
self._current_alias: str | None = None
self._client: PrometheusClient | None = None
self._build_ui()
def _build_ui(self):
# ── PromQL query section ──
query_frame = ctk.CTkFrame(self, fg_color="transparent")
query_frame.pack(fill="x", padx=15, pady=(15, 5))
query_label = ctk.CTkLabel(query_frame, text=t("prom_query"),
font=ctk.CTkFont(size=14, weight="bold"), anchor="w")
query_label.pack(side="left", padx=(0, 10))
self._query_entry = ctk.CTkEntry(query_frame,
placeholder_text=t("prom_query_placeholder"),
font=ctk.CTkFont(family="Consolas", size=13))
self._query_entry.pack(side="left", fill="x", expand=True, padx=(0, 10))
self._query_entry.bind("<Return>", lambda e: self._execute_query())
self._exec_btn = ctk.CTkButton(query_frame, text=t("prom_execute"), width=90,
command=self._execute_query)
self._exec_btn.pack(side="left")
# ── Query results ──
results_label = ctk.CTkLabel(self, text=t("prom_results"),
font=ctk.CTkFont(size=12, weight="bold"), anchor="w")
results_label.pack(fill="x", padx=15, pady=(10, 3))
self._results_box = ctk.CTkTextbox(self, height=150,
font=ctk.CTkFont(family="Consolas", size=12),
state="disabled")
self._results_box.pack(fill="x", padx=15, pady=(0, 5))
# ── Targets section ──
targets_header = ctk.CTkFrame(self, fg_color="transparent")
targets_header.pack(fill="x", padx=15, pady=(10, 3))
targets_label = ctk.CTkLabel(targets_header, text=t("prom_targets"),
font=ctk.CTkFont(size=14, weight="bold"), anchor="w")
targets_label.pack(side="left")
self._refresh_btn = ctk.CTkButton(targets_header, text=t("prom_refresh"), width=90,
command=self._refresh_all)
self._refresh_btn.pack(side="right")
targets_frame = ctk.CTkFrame(self, fg_color="transparent")
targets_frame.pack(fill="both", expand=True, padx=15, pady=(0, 5))
target_columns = ("job", "instance", "health", "last_scrape")
self._targets_tree = ttk.Treeview(targets_frame, columns=target_columns, show="headings",
selectmode="browse", height=6)
self._targets_tree.heading("job", text=t("prom_target_job"))
self._targets_tree.heading("instance", text=t("prom_target_instance"))
self._targets_tree.heading("health", text=t("prom_target_health"))
self._targets_tree.heading("last_scrape", text=t("prom_target_scrape"))
self._targets_tree.column("job", width=120, minwidth=80)
self._targets_tree.column("instance", width=200, minwidth=120)
self._targets_tree.column("health", width=80, minwidth=60)
self._targets_tree.column("last_scrape", width=150, minwidth=80)
self._targets_tree.pack(side="left", fill="both", expand=True)
targets_scroll = ttk.Scrollbar(targets_frame, orient="vertical",
command=self._targets_tree.yview)
targets_scroll.pack(side="right", fill="y")
self._targets_tree.configure(yscrollcommand=targets_scroll.set)
# ── Alerts section ──
alerts_label = ctk.CTkLabel(self, text=t("prom_alerts"),
font=ctk.CTkFont(size=14, weight="bold"), anchor="w")
alerts_label.pack(fill="x", padx=15, pady=(10, 3))
self._alerts_box = ctk.CTkTextbox(self, height=100,
font=ctk.CTkFont(family="Consolas", size=12),
state="disabled")
self._alerts_box.pack(fill="x", padx=15, pady=(0, 5))
# ── Status bar ──
self._status_bar = ctk.CTkLabel(self, text=t("prom_no_server"), anchor="w",
font=ctk.CTkFont(size=11), text_color="#9ca3af")
self._status_bar.pack(fill="x", padx=15, pady=(5, 10))
# ── Public API ──
def set_server(self, alias: str | None):
"""Called when user selects a server in sidebar."""
self._current_alias = alias
self._client = None
self._clear_all()
if alias:
self._set_status(t("prom_connected").format(alias=alias), "#22c55e")
self._refresh_all()
else:
self._set_status(t("prom_no_server"), "#9ca3af")
# ── PromQL execution ──
def _execute_query(self):
query = self._query_entry.get().strip()
if not query:
return
if not self._current_alias:
self._set_results(t("no_server_selected"))
return
self._exec_btn.configure(state="disabled")
self._set_results(t("prom_executing"))
def _do():
try:
client = self._get_client()
result = client.query(query)
formatted = self._format_query_result(result)
self.after(0, lambda: self._set_results(formatted))
except Exception as e:
self.after(0, lambda: self._set_results(f"(error) {e}"))
finally:
self.after(0, lambda: self._exec_btn.configure(state="normal"))
threading.Thread(target=_do, daemon=True).start()
def _format_query_result(self, result: dict) -> str:
"""Format Prometheus query API response for display."""
status = result.get("status", "unknown")
if status != "success":
error = result.get("error", "Unknown error")
return f"Error: {error}"
data = result.get("data", {})
result_type = data.get("resultType", "")
results = data.get("result", [])
if not results:
return "(empty result)"
lines = [f"# Type: {result_type}", f"# Results: {len(results)}", ""]
for item in results:
metric = item.get("metric", {})
metric_str = ", ".join(f'{k}="{v}"' for k, v in metric.items())
if result_type == "vector":
value = item.get("value", [None, ""])[1]
lines.append(f"{{{metric_str}}} => {value}")
elif result_type == "matrix":
values = item.get("values", [])
lines.append(f"{{{metric_str}}}")
for ts, val in values[-10:]: # Show last 10 points
lines.append(f" @{ts} => {val}")
if len(values) > 10:
lines.append(f" ... ({len(values)} total points)")
else:
lines.append(f"{{{metric_str}}} => {item}")
return "\n".join(lines)
# ── Refresh targets & alerts ──
def _refresh_all(self):
if not self._current_alias:
self._set_status(t("no_server_selected"), "#ef4444")
return
self._refresh_btn.configure(state="disabled", text=t("prom_loading"))
self._set_status(t("prom_loading"), "#ccaa00")
def _do():
try:
client = self._get_client()
targets = client.get_targets()
alerts = client.get_alerts()
self.after(0, lambda: self._populate_targets(targets))
self.after(0, lambda: self._populate_alerts(alerts))
self.after(0, lambda: self._set_status(
t("prom_loaded").format(
targets=len(targets), alerts=len(alerts)
), "#22c55e"))
except Exception as e:
self.after(0, lambda: self._set_status(f"(error) {e}", "#ef4444"))
finally:
self.after(0, lambda: self._refresh_btn.configure(
state="normal", text=t("prom_refresh")))
threading.Thread(target=_do, daemon=True).start()
def _get_client(self) -> PrometheusClient:
if self._client is None:
self._client = PrometheusClient(self._current_alias, self.store)
return self._client
# ── Table population ──
def _populate_targets(self, targets: list[dict]):
self._targets_tree.delete(*self._targets_tree.get_children())
for target in targets:
job = target.get("labels", {}).get("job", "")
instance = target.get("labels", {}).get("instance", "")
health = target.get("health", "unknown")
last_scrape = target.get("lastScrape", "")
tag = ""
if health == "up":
tag = "up"
elif health == "down":
tag = "down"
self._targets_tree.insert("", "end",
values=(job, instance, health, last_scrape),
tags=(tag,))
self._targets_tree.tag_configure("up", foreground="#22c55e")
self._targets_tree.tag_configure("down", foreground="#ef4444")
def _populate_alerts(self, alerts: list[dict]):
self._alerts_box.configure(state="normal")
self._alerts_box.delete("1.0", "end")
if not alerts:
self._alerts_box.insert("1.0", t("prom_no_alerts"))
else:
lines = []
for a in alerts:
name = a.get("labels", {}).get("alertname", a.get("name", "unknown"))
state = a.get("state", "unknown")
severity = a.get("labels", {}).get("severity", "")
lines.append(f"[{state.upper()}] {name} (severity: {severity})")
self._alerts_box.insert("1.0", "\n".join(lines))
self._alerts_box.configure(state="disabled")
# ── Helpers ──
def _set_results(self, text: str):
self._results_box.configure(state="normal")
self._results_box.delete("1.0", "end")
self._results_box.insert("1.0", text)
self._results_box.configure(state="disabled")
def _clear_all(self):
self._targets_tree.delete(*self._targets_tree.get_children())
self._set_results("")
self._alerts_box.configure(state="normal")
self._alerts_box.delete("1.0", "end")
self._alerts_box.configure(state="disabled")
def _set_status(self, text: str, color: str = "#9ca3af"):
self._status_bar.configure(text=text, text_color=color)

336
gui/tabs/query_tab.py Normal file
View File

@@ -0,0 +1,336 @@
"""
Query tab — SQL database interaction with editor, results grid, and export.
"""
import csv
import io
import time
import threading
from tkinter import ttk, filedialog
import customtkinter as ctk
from core.i18n import t
from core.sql_client import SQLClient
class QueryTab(ctk.CTkFrame):
def __init__(self, master, store):
super().__init__(master, fg_color="transparent")
self._current_alias: str | None = None
self.store = store
self._client: SQLClient | None = None
self._results: list[list] = []
self._columns: list[str] = []
self._executing = False
self._build_ui()
# ── UI construction ────────────────────────────────────────────
def _build_ui(self):
# === Database selector row ===
db_row = ctk.CTkFrame(self, fg_color="transparent")
db_row.pack(fill="x", padx=10, pady=(10, 5))
ctk.CTkLabel(db_row, text=t("query_database"), anchor="w").pack(
side="left", padx=(0, 8)
)
self._db_var = ctk.StringVar(value="")
self._db_combo = ctk.CTkComboBox(
db_row,
variable=self._db_var,
values=[],
width=220,
command=self._on_db_selected,
)
self._db_combo.pack(side="left")
# === SQL Editor ===
editor_frame = ctk.CTkFrame(self, fg_color="transparent")
editor_frame.pack(fill="both", expand=True, padx=10, pady=5, side="top")
# Give editor roughly 1/3 of space
editor_frame.pack_configure(expand=False)
self._editor = ctk.CTkTextbox(
editor_frame,
font=ctk.CTkFont(family="Consolas", size=13),
height=160,
wrap="none",
)
self._editor.pack(fill="both", expand=True)
self._editor.insert("0.0", t("query_editor_placeholder"))
self._editor.bind("<FocusIn>", self._on_editor_focus)
# Bind keyboard shortcuts
self._editor.bind("<F5>", lambda e: self._execute_query())
self._editor.bind("<Control-Return>", lambda e: self._execute_query())
# === Button row ===
btn_row = ctk.CTkFrame(self, fg_color="transparent")
btn_row.pack(fill="x", padx=10, pady=5)
self._exec_btn = ctk.CTkButton(
btn_row,
text=f"{t('query_execute')} (F5)",
command=self._execute_query,
width=130,
fg_color="#2563eb",
hover_color="#1d4ed8",
)
self._exec_btn.pack(side="left", padx=(0, 6))
self._clear_btn = ctk.CTkButton(
btn_row,
text=t("query_clear"),
command=self._clear_all,
width=80,
fg_color="#6b7280",
hover_color="#4b5563",
)
self._clear_btn.pack(side="left", padx=(0, 6))
self._export_btn = ctk.CTkButton(
btn_row,
text=t("query_export_csv"),
command=self._export_csv,
width=110,
fg_color="#059669",
hover_color="#047857",
)
self._export_btn.pack(side="left")
# === Results area (Treeview) ===
results_frame = ctk.CTkFrame(self, fg_color="transparent")
results_frame.pack(fill="both", expand=True, padx=10, pady=(5, 5))
# Horizontal scrollbar
self._tree_xscroll = ttk.Scrollbar(results_frame, orient="horizontal")
self._tree_xscroll.pack(side="bottom", fill="x")
# Vertical scrollbar
self._tree_yscroll = ttk.Scrollbar(results_frame, orient="vertical")
self._tree_yscroll.pack(side="right", fill="y")
self._tree = ttk.Treeview(
results_frame,
show="headings",
xscrollcommand=self._tree_xscroll.set,
yscrollcommand=self._tree_yscroll.set,
)
self._tree.pack(fill="both", expand=True)
self._tree_xscroll.config(command=self._tree.xview)
self._tree_yscroll.config(command=self._tree.yview)
# === Status bar ===
self._status_label = ctk.CTkLabel(
self,
text="",
anchor="w",
font=ctk.CTkFont(size=12),
text_color="#9ca3af",
)
self._status_label.pack(fill="x", padx=12, pady=(0, 8))
# ── Editor placeholder logic ───────────────────────────────────
def _on_editor_focus(self, event=None):
content = self._editor.get("0.0", "end").strip()
placeholder = t("query_editor_placeholder")
if content == placeholder:
self._editor.delete("0.0", "end")
# ── Server / database connection ───────────────────────────────
def set_server(self, alias: str | None):
"""Called when user selects a server in the sidebar."""
self._current_alias = alias
self._disconnect()
self._clear_results()
self._set_status("")
if not alias:
self._db_combo.configure(values=[])
self._db_var.set("")
return
self._set_status(f"Connecting to {alias}...")
threading.Thread(
target=self._connect_and_list_dbs,
args=(alias,),
daemon=True,
).start()
def _connect_and_list_dbs(self, alias: str):
"""Background: create SQLClient, fetch database list."""
try:
server = self.store.get_server(alias)
if not server:
self._schedule(self._set_status, t("query_error"), error=True)
return
client = SQLClient(server)
databases = client.list_databases()
def _update():
if self._current_alias != alias:
return # switched away
self._client = client
self._db_combo.configure(values=databases)
if databases:
self._db_var.set(databases[0])
self._switch_database(databases[0])
self._set_status("OK")
self._schedule(_update)
except Exception as exc:
self._schedule(self._set_status, str(exc), error=True)
def _on_db_selected(self, value: str):
if value:
self._switch_database(value)
def _switch_database(self, db_name: str):
"""Switch active database on the current client."""
if not self._client:
return
try:
self._client.use_database(db_name)
self._set_status(f"Database: {db_name}")
except Exception as exc:
self._set_status(str(exc), error=True)
def _disconnect(self):
if self._client:
try:
self._client.close()
except Exception:
pass
self._client = None
# ── Query execution ────────────────────────────────────────────
def _execute_query(self):
"""Run the SQL query in a background thread."""
if self._executing or not self._client:
return
sql = self._editor.get("0.0", "end").strip()
if not sql or sql == t("query_editor_placeholder"):
return
self._executing = True
self._exec_btn.configure(state="disabled")
self._set_status("Executing...")
threading.Thread(
target=self._run_query,
args=(sql,),
daemon=True,
).start()
def _run_query(self, sql: str):
"""Background thread: execute SQL, measure time, post results."""
start = time.perf_counter()
try:
columns, rows = self._client.execute(sql)
elapsed = time.perf_counter() - start
def _update():
self._columns = columns
self._results = rows
self._populate_tree(columns, rows)
row_count = len(rows)
self._set_status(
t("query_status_rows").format(
rows=row_count, time=f"{elapsed:.3f}"
)
)
self._executing = False
self._exec_btn.configure(state="normal")
self._schedule(_update)
except Exception as exc:
elapsed = time.perf_counter() - start
def _update_err():
self._set_status(
f"{t('query_error')}: {exc}", error=True
)
self._executing = False
self._exec_btn.configure(state="normal")
self._schedule(_update_err)
# ── Treeview population ────────────────────────────────────────
def _populate_tree(self, columns: list[str], rows: list[list]):
"""Clear and populate the Treeview with query results."""
self._tree.delete(*self._tree.get_children())
if not columns:
self._tree["columns"] = ()
return
self._tree["columns"] = columns
for col in columns:
self._tree.heading(col, text=col, anchor="w")
self._tree.column(col, width=120, minwidth=60, anchor="w")
for row in rows:
display = [str(v) if v is not None else "NULL" for v in row]
self._tree.insert("", "end", values=display)
def _clear_results(self):
"""Remove all rows and columns from the Treeview."""
self._tree.delete(*self._tree.get_children())
self._tree["columns"] = ()
self._columns = []
self._results = []
# ── Button actions ─────────────────────────────────────────────
def _clear_all(self):
"""Clear editor content and results."""
self._editor.delete("0.0", "end")
self._clear_results()
self._set_status("")
def _export_csv(self):
"""Export current results to a CSV file via save dialog."""
if not self._columns or not self._results:
return
path = filedialog.asksaveasfilename(
defaultextension=".csv",
filetypes=[("CSV files", "*.csv"), ("All files", "*.*")],
title=t("query_export_csv"),
)
if not path:
return
try:
with open(path, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(self._columns)
for row in self._results:
writer.writerow(
[str(v) if v is not None else "" for v in row]
)
self._set_status(f"Exported {len(self._results)} rows to {path}")
except Exception as exc:
self._set_status(f"{t('query_error')}: {exc}", error=True)
# ── Status bar ─────────────────────────────────────────────────
def _set_status(self, text: str, error: bool = False):
color = "#ef4444" if error else "#9ca3af"
self._status_label.configure(text=text, text_color=color)
# ── Thread-safe scheduling ─────────────────────────────────────
def _schedule(self, func, *args, **kwargs):
"""Schedule a function to run on the main (tkinter) thread."""
self.after(0, lambda: func(*args, **kwargs))

266
gui/tabs/redis_tab.py Normal file
View File

@@ -0,0 +1,266 @@
"""
Redis tab — interactive Redis CLI with DB selector, command history, and output console.
"""
import threading
import customtkinter as ctk
from core.redis_client import RedisClient
from core.i18n import t
class RedisTab(ctk.CTkFrame):
def __init__(self, master, store):
super().__init__(master, fg_color="transparent")
self.store = store
self._current_alias: str | None = None
self._client: RedisClient | None = None
self._command_history: list[str] = []
self._history_index: int = -1
self._build_ui()
def _build_ui(self):
# ── Top bar: DB selector + stats ──
top_frame = ctk.CTkFrame(self, fg_color="transparent")
top_frame.pack(fill="x", padx=15, pady=(15, 5))
# DB selector
db_label = ctk.CTkLabel(top_frame, text=t("redis_db"), anchor="w",
font=ctk.CTkFont(size=12, weight="bold"))
db_label.pack(side="left", padx=(0, 5))
self._db_var = ctk.StringVar(value="0")
self._db_selector = ctk.CTkOptionMenu(
top_frame, values=[str(i) for i in range(16)],
variable=self._db_var, width=70,
command=self._on_db_changed,
)
self._db_selector.pack(side="left", padx=(0, 15))
# Keys count
self._keys_label = ctk.CTkLabel(top_frame, text=t("redis_keys") + ": —",
font=ctk.CTkFont(size=12), text_color="#9ca3af")
self._keys_label.pack(side="left", padx=(0, 15))
# Memory usage
self._memory_label = ctk.CTkLabel(top_frame, text=t("redis_memory") + ": —",
font=ctk.CTkFont(size=12), text_color="#9ca3af")
self._memory_label.pack(side="left")
# ── Command input row ──
cmd_frame = ctk.CTkFrame(self, fg_color="transparent")
cmd_frame.pack(fill="x", padx=15, pady=5)
prompt_label = ctk.CTkLabel(cmd_frame, text="redis>", font=ctk.CTkFont(family="Consolas", size=13),
text_color="#ef4444")
prompt_label.pack(side="left", padx=(0, 5))
self._cmd_entry = ctk.CTkEntry(cmd_frame, placeholder_text=t("redis_command_placeholder"),
font=ctk.CTkFont(family="Consolas", size=13))
self._cmd_entry.pack(side="left", fill="x", expand=True, padx=(0, 10))
self._cmd_entry.bind("<Return>", lambda e: self._execute_command())
self._cmd_entry.bind("<Up>", self._history_up)
self._cmd_entry.bind("<Down>", self._history_down)
# ── Buttons row ──
btn_frame = ctk.CTkFrame(self, fg_color="transparent")
btn_frame.pack(fill="x", padx=15, pady=5)
self._exec_btn = ctk.CTkButton(btn_frame, text=t("redis_execute"), width=90,
command=self._execute_command)
self._exec_btn.pack(side="left", padx=(0, 5))
self._info_btn = ctk.CTkButton(btn_frame, text="INFO", width=70,
fg_color="#6b7280", hover_color="#4b5563",
command=lambda: self._run_quick("INFO"))
self._info_btn.pack(side="left", padx=(0, 5))
self._dbsize_btn = ctk.CTkButton(btn_frame, text="DBSIZE", width=80,
fg_color="#6b7280", hover_color="#4b5563",
command=lambda: self._run_quick("DBSIZE"))
self._dbsize_btn.pack(side="left", padx=(0, 5))
self._scan_btn = ctk.CTkButton(btn_frame, text="SCAN", width=70,
fg_color="#6b7280", hover_color="#4b5563",
command=lambda: self._run_quick("SCAN 0 COUNT 100"))
self._scan_btn.pack(side="left", padx=(0, 5))
self._clear_btn = ctk.CTkButton(btn_frame, text=t("redis_clear"), width=70,
fg_color="#374151", hover_color="#1f2937",
command=self._clear_output)
self._clear_btn.pack(side="right")
# ── Output console ──
self._output = ctk.CTkTextbox(self, font=ctk.CTkFont(family="Consolas", size=12),
state="disabled")
self._output.pack(fill="both", expand=True, padx=15, pady=(5, 5))
# ── Status bar ──
self._status_bar = ctk.CTkLabel(self, text=t("redis_disconnected"), anchor="w",
font=ctk.CTkFont(size=11), text_color="#9ca3af")
self._status_bar.pack(fill="x", padx=15, pady=(0, 10))
# ── Public API ──
def set_server(self, alias: str | None):
"""Called when user selects a server in sidebar."""
self._current_alias = alias
self._client = None
self._command_history.clear()
self._history_index = -1
self._clear_output()
if alias:
self._set_status(t("redis_ready").format(alias=alias), "#22c55e")
self._refresh_stats()
else:
self._set_status(t("redis_disconnected"), "#9ca3af")
self._keys_label.configure(text=t("redis_keys") + ": —")
self._memory_label.configure(text=t("redis_memory") + ": —")
# ── Command execution ──
def _execute_command(self):
cmd = self._cmd_entry.get().strip()
if not cmd:
return
if not self._current_alias:
self._append_output(t("no_server_selected"))
return
# Add to history
if not self._command_history or self._command_history[-1] != cmd:
self._command_history.append(cmd)
self._history_index = -1
self._cmd_entry.delete(0, "end")
self._append_output(f"redis> {cmd}")
self._set_buttons_state("disabled")
db = int(self._db_var.get())
def _do():
try:
client = self._get_client()
result = client.execute(cmd, db=db)
formatted = self._format_result(result)
self.after(0, lambda: self._append_output(formatted))
except Exception as e:
self.after(0, lambda: self._append_output(f"(error) {e}"))
finally:
self.after(0, lambda: self._set_buttons_state("normal"))
self.after(0, self._refresh_stats)
threading.Thread(target=_do, daemon=True).start()
def _run_quick(self, cmd: str):
"""Execute a preset command."""
self._cmd_entry.delete(0, "end")
self._cmd_entry.insert(0, cmd)
self._execute_command()
def _get_client(self) -> RedisClient:
if self._client is None:
self._client = RedisClient(self._current_alias, self.store)
return self._client
# ── Stats refresh ──
def _refresh_stats(self):
if not self._current_alias:
return
def _do():
try:
client = self._get_client()
db = int(self._db_var.get())
keys_count = client.execute("DBSIZE", db=db)
info = client.execute("INFO memory", db=db)
# Parse memory from INFO output
memory = ""
if isinstance(info, str):
for line in info.split("\r\n"):
if line.startswith("used_memory_human:"):
memory = line.split(":")[1].strip()
break
keys_text = str(keys_count) if keys_count is not None else ""
self.after(0, lambda: self._keys_label.configure(
text=t("redis_keys") + f": {keys_text}"))
self.after(0, lambda: self._memory_label.configure(
text=t("redis_memory") + f": {memory}"))
except Exception:
pass
threading.Thread(target=_do, daemon=True).start()
def _on_db_changed(self, _value: str):
self._refresh_stats()
# ── History navigation ──
def _history_up(self, _event):
if not self._command_history:
return "break"
if self._history_index == -1:
self._history_index = len(self._command_history) - 1
elif self._history_index > 0:
self._history_index -= 1
self._cmd_entry.delete(0, "end")
self._cmd_entry.insert(0, self._command_history[self._history_index])
return "break"
def _history_down(self, _event):
if not self._command_history:
return "break"
if self._history_index == -1:
return "break"
if self._history_index < len(self._command_history) - 1:
self._history_index += 1
self._cmd_entry.delete(0, "end")
self._cmd_entry.insert(0, self._command_history[self._history_index])
else:
self._history_index = -1
self._cmd_entry.delete(0, "end")
return "break"
# ── Output helpers ──
def _format_result(self, result) -> str:
"""Format Redis response for display."""
if result is None:
return "(nil)"
if isinstance(result, bytes):
return result.decode("utf-8", errors="replace")
if isinstance(result, int):
return f"(integer) {result}"
if isinstance(result, list):
if not result:
return "(empty list or set)"
lines = []
for i, item in enumerate(result, 1):
val = item.decode("utf-8", errors="replace") if isinstance(item, bytes) else str(item)
lines.append(f"{i}) \"{val}\"")
return "\n".join(lines)
if isinstance(result, str):
return result
return str(result)
def _append_output(self, text: str):
self._output.configure(state="normal")
self._output.insert("end", text + "\n")
self._output.configure(state="disabled")
self._output.see("end")
def _clear_output(self):
self._output.configure(state="normal")
self._output.delete("1.0", "end")
self._output.configure(state="disabled")
def _set_status(self, text: str, color: str = "#9ca3af"):
self._status_bar.configure(text=text, text_color=color)
def _set_buttons_state(self, state: str):
for btn in (self._exec_btn, self._info_btn, self._dbsize_btn, self._scan_btn):
btn.configure(state=state)

View File

@@ -1,5 +1,5 @@
"""
Terminal tab — persistent interactive SSH shell via ShellSession + TerminalWidget.
Terminal tab — persistent interactive SSH/Telnet shell via ShellSession/TelnetSession + TerminalWidget.
"""
import queue
@@ -8,6 +8,7 @@ import threading
import time
import customtkinter as ctk
from core.ssh_client import ShellSession
from core.telnet_client import TelnetSession
from core.i18n import t
# Regex to strip ANSI escape sequences
@@ -20,7 +21,7 @@ class TerminalTab(ctk.CTkFrame):
self.store = store
self.session_pool = session_pool
self._current_alias: str | None = None
self._session: ShellSession | None = None
self._session: ShellSession | TelnetSession | None = None
self._reconnect_count = 0
self._max_reconnect = 5
self._intentional_disconnect = False
@@ -76,16 +77,25 @@ class TerminalTab(ctk.CTkFrame):
return
alias = self._current_alias
server_type = server.get("type", "ssh")
self._terminal.set_status(t("term_connecting").format(alias=alias), "#ccaa00")
self._intentional_disconnect = False
def _do_connect():
try:
key_path = self.store.get_ssh_key_path()
cols, rows = self._terminal.get_size()
# Use session pool if available
if self.session_pool:
cols, rows = self._terminal.get_size()
if server_type == "telnet":
# Telnet — direct session, no pool (pool is SSH-specific)
self.after(0, self._terminal.reset)
session = TelnetSession(server, cols=cols, rows=rows)
session.on_data = self._on_data_received
session.on_disconnect = self._on_disconnected
session.connect()
self._session = session
elif self.session_pool:
# SSH with session pool
session, is_new = self.session_pool.get_or_create_shell_session(alias, server, key_path)
if is_new:
# New session — reset terminal for clean start
@@ -108,9 +118,8 @@ class TerminalTab(ctk.CTkFrame):
session.on_disconnect = self._on_disconnected
self._session = session
else:
# Legacy behavior without session pool
# SSH without pool (legacy)
self.after(0, self._terminal.reset)
cols, rows = self._terminal.get_size()
session = ShellSession(server, key_path, cols=cols, rows=rows)
session.on_data = self._on_data_received
session.on_disconnect = self._on_disconnected
@@ -136,12 +145,18 @@ class TerminalTab(ctk.CTkFrame):
def _disconnect(self):
self._intentional_disconnect = True
# Only disconnect if we don't have a session pool (otherwise session stays alive)
if not self.session_pool and self._session:
if not self._session:
return
# Telnet sessions are never pooled — always disconnect directly
if isinstance(self._session, TelnetSession):
self._session.disconnect()
self._session = None
# If using session pool, session remains active in the pool
elif self.session_pool and self._session:
# SSH without session pool — disconnect directly
elif not self.session_pool:
self._session.disconnect()
self._session = None
# SSH with session pool — session remains active in the pool
else:
# Remove callbacks to prevent processing data after switch
self._session.on_data = None
self._session.on_disconnect = None