Files
server-manager/tools/ssh.py
chrome-storm-c442 eede67e6a9 feat: multi-type server support — SQL, Redis, Grafana, Prometheus, Telnet, WinRM, RDP/VNC
Full implementation of multi-type server management across GUI and CLI:

New clients: SQLClient (MariaDB/MSSQL/PostgreSQL), RedisClient, GrafanaClient,
PrometheusClient, TelnetSession, WinRMClient, RemoteDesktopLauncher.

New GUI tabs: QueryTab (SQL editor + Treeview), RedisTab (console + history),
GrafanaTab (dashboards + alerts), PrometheusTab (PromQL + targets),
PowershellTab (PS/CMD), LaunchTab (RDP/VNC external client).

Infrastructure: TAB_REGISTRY for conditional tabs per server type,
adaptive server_dialog fields, colored type badges in sidebar,
status checker for all types (SSH/TCP/SQL/Redis/HTTP), 100+ i18n keys.

CLI: ssh.py extended with --sql, --redis, --grafana-*, --prom-*, --ps, --cmd.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 09:35:24 -05:00

1038 lines
37 KiB
Python

#!/usr/bin/env python3
"""
SSH utility for Claude Code — connects to servers by alias.
Credentials stored locally in servers.json (encrypted), NEVER exposed to AI API.
Usage (SSH):
python ssh.py ALIAS "command" # run as configured user (auto-sudo if needed)
python ssh.py ALIAS --no-sudo "command" # run without sudo elevation
python ssh.py ALIAS --upload LOCAL REMOTE
python ssh.py ALIAS --download REMOTE LOCAL
python ssh.py ALIAS --install-key
python ssh.py ALIAS --ping
python ssh.py --list
python ssh.py --status
python ssh.py --info ALIAS # full info (no passwords)
python ssh.py --set-note ALIAS "desc" # update server notes
python ssh.py --add ALIAS IP PORT USER PASSWORD [--note "desc"]
python ssh.py --remove ALIAS
SQL (type: mariadb / mssql / postgresql):
python ssh.py --sql ALIAS "SELECT * FROM users" # execute SQL query
python ssh.py --sql-databases ALIAS # list databases
python ssh.py --sql-tables ALIAS [database] # list tables
Redis (type: redis):
python ssh.py --redis ALIAS "GET mykey" # execute Redis command
python ssh.py --redis-info ALIAS # Redis INFO
python ssh.py --redis-keys ALIAS "user:*" # SCAN keys by pattern
Grafana (type: grafana):
python ssh.py --grafana-dashboards ALIAS # list dashboards
python ssh.py --grafana-alerts ALIAS # list alerts
Prometheus (type: prometheus):
python ssh.py --prom-query ALIAS "up" # execute PromQL query
python ssh.py --prom-targets ALIAS # list targets
python ssh.py --prom-alerts ALIAS # list alerts
WinRM (type: winrm):
python ssh.py --ps ALIAS "Get-Process" # PowerShell via WinRM
python ssh.py --cmd ALIAS "dir" # CMD via WinRM
"""
import sys
import os
import json
import time
import paramiko
# Shared config — same file used by ServerManager GUI
SHARED_DIR = os.path.expanduser("~/.server-connections")
SETTINGS_FILE = os.path.join(SHARED_DIR, "settings.json")
DEFAULT_SERVERS_FILE = os.path.join(SHARED_DIR, "servers.json")
SSH_KEY_PATH = os.path.expanduser("~/.ssh/id_ed25519")
SSH_CONFIG_PATH = os.path.expanduser("~/.ssh/config")
# Encryption support — encryption.py is copied to SHARED_DIR by GUI setup
if SHARED_DIR not in sys.path:
sys.path.insert(0, SHARED_DIR)
try:
from encryption import decrypt, encrypt, is_encrypted
HAS_ENCRYPTION = True
except ImportError:
HAS_ENCRYPTION = False
def _get_servers_file() -> str:
"""Get servers file path from settings.json or use default."""
if os.path.exists(SETTINGS_FILE):
try:
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
settings = json.load(f)
path = settings.get("servers_path", "")
if path and os.path.exists(path):
return path
except Exception:
pass
return DEFAULT_SERVERS_FILE
# ── Data ──────────────────────────────────────────────
def load_servers():
servers_file = _get_servers_file()
with open(servers_file, "rb") as f:
raw = f.read()
if HAS_ENCRYPTION and is_encrypted(raw):
text = decrypt(raw)
data = json.loads(text)
else:
data = json.loads(raw.decode("utf-8"))
return data, {s["alias"]: s for s in data.get("servers", [])}
def save_servers(data):
servers_file = _get_servers_file()
text = json.dumps(data, indent=2, ensure_ascii=False)
if HAS_ENCRYPTION:
encrypted = encrypt(text)
with open(servers_file, "wb") as f:
f.write(encrypted)
else:
with open(servers_file, "w", encoding="utf-8") as f:
f.write(text)
# ── Connection ────────────────────────────────────────
def get_client(server: dict) -> paramiko.SSHClient:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
kwargs = {
"hostname": server["ip"],
"port": server.get("port", 22),
"username": server.get("user", "root"),
"timeout": 15,
"banner_timeout": 15,
}
# Try key first
if os.path.exists(SSH_KEY_PATH):
try:
kwargs["key_filename"] = SSH_KEY_PATH
client.connect(**kwargs)
return client
except Exception:
del kwargs["key_filename"]
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Fallback to password
password = server.get("password", "")
if password:
kwargs["password"] = password
kwargs["look_for_keys"] = False
kwargs["allow_agent"] = False
client.connect(**kwargs)
return client
raise Exception(f"No auth method for {server['alias']}")
# ── Command execution ─────────────────────────────────
def run_command(server: dict, command: str, use_sudo: bool = True) -> tuple:
"""Execute command. If user != root and use_sudo=True, auto-elevates via sudo.
Password is fed through stdin (not visible in process list)."""
client = get_client(server)
try:
user = server.get("user", "root")
need_sudo = use_sudo and user != "root"
if need_sudo:
# Use sudo -S to read password from stdin
# -p '' suppresses the password prompt text
full_cmd = f"sudo -S -p '' bash -c {_shell_quote(command)}"
else:
full_cmd = command
stdin, stdout, stderr = client.exec_command(full_cmd, timeout=120)
if need_sudo:
password = server.get("password", "")
stdin.write(password + "\n")
stdin.flush()
exit_code = stdout.channel.recv_exit_status()
out = stdout.read().decode("utf-8", errors="replace")
err = stderr.read().decode("utf-8", errors="replace")
# Strip sudo noise from stderr
err_lines = [l for l in err.splitlines()
if not l.startswith("[sudo]") and "password for" not in l.lower()]
err = "\n".join(err_lines).strip()
return out, err, exit_code
finally:
client.close()
def _shell_quote(s: str) -> str:
"""Safely quote a string for bash -c."""
return "'" + s.replace("'", "'\\''") + "'"
# ── File transfer ─────────────────────────────────────
def _normalize_remote_path(remote_path: str) -> str:
"""Normalize remote path by detecting and fixing MSYS path conversions."""
# If the path looks like a Windows path that was converted by MSYS, fix it back
if ':' in remote_path and ('Program Files/Git' in remote_path or (len(remote_path) > 3 and remote_path[1] == ':' and remote_path[2] == '/')):
# Convert C:/Program Files/Git/tmp/file.txt back to /tmp/file.txt
# Find the position where Git path starts
if 'Program Files/Git' in remote_path:
git_pos = remote_path.find('Program Files/Git')
if git_pos != -1:
# Extract the part after Program Files/Git
actual_path = remote_path[git_pos + len('Program Files/Git'):]
return actual_path
# If it's just a drive letter followed by :, convert it too
if len(remote_path) > 3 and remote_path[1] == ':' and remote_path[2] == '/':
# This is a Windows-style path like C:/something
# Try to determine if it's supposed to be a Unix path
potential_unix_path = remote_path[3:] # Remove drive prefix like "C:"
# If the resulting path starts with a common Unix directory, assume it should be Unix path
common_unix_prefixes = ['/tmp/', '/home/', '/etc/', '/var/', '/usr/', '/opt/', '/root/', '/bin/', '/sbin/', '/lib/', '/lib64/']
for prefix in common_unix_prefixes:
if potential_unix_path.startswith(prefix):
return potential_unix_path
return remote_path
def _fmt_size(nbytes: int) -> str:
"""Format byte count for human display."""
if nbytes < 1024:
return f"{nbytes} B"
elif nbytes < 1024 * 1024:
return f"{nbytes / 1024:.1f} KB"
elif nbytes < 1024 * 1024 * 1024:
return f"{nbytes / (1024 * 1024):.1f} MB"
else:
return f"{nbytes / (1024 * 1024 * 1024):.2f} GB"
def _progress_cb(total_bytes: int):
"""Return a Paramiko-compatible progress callback.
For files >= 1 MB, prints at 25%, 50%, 75% milestones.
For files < 1 MB, stays silent."""
threshold = 1024 * 1024 # 1 MB
reported = set()
def callback(transferred: int, total: int):
if total < threshold:
return
pct = int(transferred * 100 / total)
for milestone in (25, 50, 75):
if pct >= milestone and milestone not in reported:
reported.add(milestone)
print(f"{milestone}% ({_fmt_size(transferred)}/{_fmt_size(total)})")
return callback
def upload_file(server: dict, local_path: str, remote_path: str):
normalized_remote_path = _normalize_remote_path(remote_path)
file_size = os.path.getsize(local_path)
client = get_client(server)
try:
sftp = client.open_sftp()
t0 = time.time()
sftp.put(local_path, normalized_remote_path, callback=_progress_cb(file_size))
elapsed = time.time() - t0
sftp.chmod(normalized_remote_path, 0o664)
sftp.close()
info = f"{_fmt_size(file_size)}, {elapsed:.1f}s"
if file_size >= 1024 * 1024 and elapsed > 0:
speed = file_size / elapsed
info += f", {_fmt_size(int(speed))}/s"
print(f"OK: {local_path} -> {server['alias']}:{normalized_remote_path} ({info})")
finally:
client.close()
def download_file(server: dict, remote_path: str, local_path: str):
normalized_remote_path = _normalize_remote_path(remote_path)
client = get_client(server)
try:
sftp = client.open_sftp()
file_size = sftp.stat(normalized_remote_path).st_size
t0 = time.time()
sftp.get(normalized_remote_path, local_path, callback=_progress_cb(file_size))
elapsed = time.time() - t0
sftp.close()
info = f"{_fmt_size(file_size)}, {elapsed:.1f}s"
if file_size >= 1024 * 1024 and elapsed > 0:
speed = file_size / elapsed
info += f", {_fmt_size(int(speed))}/s"
print(f"OK: {server['alias']}:{normalized_remote_path} -> {local_path} ({info})")
finally:
client.close()
def install_key(server: dict):
pub_key_path = SSH_KEY_PATH + ".pub"
if not os.path.exists(pub_key_path):
print(f"ERROR: No public key at {pub_key_path}")
sys.exit(1)
with open(pub_key_path, "r") as f:
pub_key = f.read().strip()
check_cmd = f'grep -c "{pub_key}" ~/.ssh/authorized_keys 2>/dev/null || echo 0'
out, _, _ = run_command(server, check_cmd, use_sudo=False)
if out.strip() != "0":
print(f"Key already installed on {server['alias']}")
return
command = (
f'mkdir -p ~/.ssh && chmod 700 ~/.ssh && '
f'echo "{pub_key}" >> ~/.ssh/authorized_keys && '
f'chmod 600 ~/.ssh/authorized_keys && '
f'echo "KEY_OK"'
)
out, err, code = run_command(server, command, use_sudo=False)
if "KEY_OK" in out:
print(f"SSH key installed on {server['alias']}")
else:
print(f"ERROR: {err or out}")
sys.exit(1)
# ── Server management ─────────────────────────────────
def ping_server(server: dict):
try:
client = get_client(server)
client.close()
print(f"{server['alias']}: ONLINE")
except Exception as e:
print(f"{server['alias']}: OFFLINE ({type(e).__name__})")
def list_servers(full=False):
_, servers = load_servers()
if full:
# WARNING: full mode shows sensitive data (IP, port, user)
# Only for local/manual use, NEVER through AI API
print("WARNING: Full mode — contains sensitive data. Do NOT pipe to AI.")
print(f"{'Alias':<20} {'IP':<20} {'Port':<8} {'User':<10} {'Key':<6}")
print("-" * 64)
for alias, s in servers.items():
has_key = "yes" if os.path.exists(SSH_KEY_PATH) else "no"
print(f"{alias:<20} {s['ip']:<20} {s.get('port', 22):<8} {s.get('user', 'root'):<10} {has_key:<6}")
else:
# Safe mode: only aliases (no IPs, ports, users)
print(f"{'Alias':<20} {'Type':<10} {'Key':<6} {'Notes'}")
print("-" * 70)
for alias, s in servers.items():
has_key = "yes" if os.path.exists(SSH_KEY_PATH) else "no"
stype = s.get("type", "ssh")
notes = s.get("notes", "")
print(f"{alias:<20} {stype:<10} {has_key:<6} {notes}")
def _resolve_alias(alias: str, servers: dict) -> str:
"""Resolve alias — exact match, then whole-word search, then substring fallback."""
if alias in servers:
return alias
query = alias.lower()
# 1) Whole-word match (e.g. "tor" matches "API TOR contabo" but NOT "investor")
import re
word_re = re.compile(r'\b' + re.escape(query) + r'\b', re.IGNORECASE)
word_matches = [a for a in servers if word_re.search(a)]
if len(word_matches) == 1:
return word_matches[0]
if len(word_matches) > 1:
print(f"Ambiguous: '{alias}' matches multiple servers:")
for m in word_matches:
print(f" - {m}")
sys.exit(1)
# 2) Substring fallback (e.g. "cont" matches "contabo")
sub_matches = [a for a in servers if query in a.lower()]
if len(sub_matches) == 1:
return sub_matches[0]
if len(sub_matches) > 1:
print(f"Ambiguous: '{alias}' matches multiple servers:")
for m in sub_matches:
print(f" - {m}")
sys.exit(1)
print(f"Unknown: '{alias}'. Available: {', '.join(servers.keys())}")
sys.exit(1)
def server_info(alias: str):
"""Show server info safe for AI context — NO ip, user, password, port, totp_secret."""
_, servers = load_servers()
alias = _resolve_alias(alias, servers)
s = servers[alias]
has_key = "yes" if os.path.exists(SSH_KEY_PATH) else "no"
print(f"Alias: {s['alias']}")
print(f"Type: {s.get('type', 'ssh')}")
print(f"Key: {has_key}")
print(f"Auth: {s.get('auth', 'password')}")
print(f"2FA: {'yes' if s.get('totp_secret') else 'no'}")
notes = s.get("notes", "")
if notes:
print(f"Notes: {notes}")
def check_status():
_, servers = load_servers()
print(f"{'Alias':<20} {'Status':<10}")
print("-" * 30)
for alias, s in servers.items():
try:
client = get_client(s)
client.close()
status = "ONLINE"
except Exception:
status = "OFFLINE"
print(f"{alias:<20} {status:<10}")
def add_server(args):
if len(args) < 5:
print("Usage: --add ALIAS IP PORT USER PASSWORD [--note \"desc\"]")
sys.exit(1)
alias, ip, port, user, password = args[0], args[1], int(args[2]), args[3], args[4]
note = ""
if "--note" in args:
idx = args.index("--note")
if idx + 1 < len(args):
note = args[idx + 1]
data, servers = load_servers()
if alias in servers:
print(f"ERROR: '{alias}' already exists")
sys.exit(1)
new_server = {
"alias": alias, "ip": ip, "port": port,
"user": user, "auth": "ssh-key", "password": password,
"notes": note
}
data["servers"].append(new_server)
save_servers(data)
update_ssh_config(alias, ip, port, user)
print(f"Added: {alias}")
try:
install_key(new_server)
except Exception as e:
print(f"Warning: key not installed ({e}). Run: ssh.py {alias} --install-key")
def set_note(alias: str, note: str):
"""Update server notes — safe for AI (no credentials exposed)."""
data, servers = load_servers()
alias = _resolve_alias(alias, servers)
for s in data["servers"]:
if s["alias"] == alias:
s["notes"] = note
break
save_servers(data)
print(f"OK: notes updated for {alias}")
def remove_server(alias: str):
data, servers = load_servers()
alias = _resolve_alias(alias, servers)
data["servers"] = [s for s in data["servers"] if s["alias"] != alias]
save_servers(data)
remove_from_ssh_config(alias)
print(f"Removed: {alias}")
# ── SSH config ────────────────────────────────────────
def update_ssh_config(alias, ip, port, user):
if not os.path.exists(SSH_CONFIG_PATH):
return
with open(SSH_CONFIG_PATH, "r") as f:
content = f.read()
if f"Host {alias}\n" in content:
return
with open(SSH_CONFIG_PATH, "a") as f:
f.write(f"\nHost {alias}\n HostName {ip}\n User {user}\n Port {port}\n")
def remove_from_ssh_config(alias):
if not os.path.exists(SSH_CONFIG_PATH):
return
with open(SSH_CONFIG_PATH, "r") as f:
lines = f.readlines()
new_lines, skip = [], False
for line in lines:
if line.strip() == f"Host {alias}":
skip = True
continue
if skip and line.startswith(" "):
continue
skip = False
new_lines.append(line)
with open(SSH_CONFIG_PATH, "w") as f:
f.writelines(new_lines)
# ── SQL commands ──────────────────────────────────────
def _print_table(headers: list, rows: list):
"""Print a formatted ASCII table."""
if not rows:
print("(no rows)")
return
widths = [len(str(h)) for h in headers]
for row in rows:
for i, val in enumerate(row):
widths[i] = max(widths[i], len(str(val)))
fmt = " ".join(f"{{:<{w}}}" for w in widths)
print(fmt.format(*headers))
print(" ".join("-" * w for w in widths))
for row in rows:
print(fmt.format(*[str(v) for v in row]))
def run_sql(server: dict, query: str):
"""Execute SQL query against mariadb/mssql/postgresql server."""
stype = server.get("type", "mariadb")
host = server["ip"]
port = server.get("port", 3306)
user = server.get("user", "root")
password = server.get("password", "")
database = server.get("database", "")
if stype in ("mariadb", "mysql"):
import pymysql
conn = pymysql.connect(host=host, port=port, user=user, password=password,
database=database or None, connect_timeout=15,
charset="utf8mb4", cursorclass=pymysql.cursors.Cursor)
elif stype == "mssql":
import pymssql
conn = pymssql.connect(server=host, port=port, user=user, password=password,
database=database or None, login_timeout=15)
elif stype == "postgresql":
import psycopg2
port = server.get("port", 5432)
conn = psycopg2.connect(host=host, port=port, user=user, password=password,
dbname=database or None, connect_timeout=15)
else:
print(f"ERROR: Unsupported SQL type '{stype}'. Use mariadb, mssql, or postgresql.")
sys.exit(1)
try:
cur = conn.cursor()
cur.execute(query)
if cur.description:
headers = [desc[0] for desc in cur.description]
rows = cur.fetchall()
_print_table(headers, rows)
print(f"\n({len(rows)} row{'s' if len(rows) != 1 else ''})")
else:
conn.commit()
affected = cur.rowcount
print(f"OK: {affected} row{'s' if affected != 1 else ''} affected")
cur.close()
finally:
conn.close()
def sql_databases(server: dict):
"""List databases on SQL server."""
stype = server.get("type", "mariadb")
if stype in ("mariadb", "mysql"):
run_sql(server, "SHOW DATABASES")
elif stype == "mssql":
run_sql(server, "SELECT name FROM sys.databases ORDER BY name")
elif stype == "postgresql":
run_sql(server, "SELECT datname AS database FROM pg_database WHERE datistemplate = false ORDER BY datname")
else:
print(f"ERROR: Unsupported SQL type '{stype}'.")
sys.exit(1)
def sql_tables(server: dict, database: str = None):
"""List tables on SQL server, optionally for a specific database."""
stype = server.get("type", "mariadb")
if database:
server = dict(server)
server["database"] = database
if stype in ("mariadb", "mysql"):
if database:
run_sql(server, f"SHOW TABLES FROM `{database}`")
else:
run_sql(server, "SHOW TABLES")
elif stype == "mssql":
run_sql(server, "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_SCHEMA, TABLE_NAME")
elif stype == "postgresql":
run_sql(server, "SELECT schemaname, tablename FROM pg_tables WHERE schemaname NOT IN ('pg_catalog', 'information_schema') ORDER BY schemaname, tablename")
else:
print(f"ERROR: Unsupported SQL type '{stype}'.")
sys.exit(1)
# ── Redis commands ────────────────────────────────────
def run_redis_cmd(server: dict, command: str):
"""Execute a Redis command."""
import redis as redis_lib
host = server["ip"]
port = server.get("port", 6379)
password = server.get("password", "") or None
db_index = server.get("db_index", 0)
ssl_enabled = server.get("ssl", False)
r = redis_lib.Redis(host=host, port=port, password=password, db=db_index,
decode_responses=True, socket_timeout=10, ssl=ssl_enabled)
try:
parts = command.split()
if not parts:
print("ERROR: Empty Redis command")
sys.exit(1)
result = r.execute_command(*parts)
if isinstance(result, list):
for i, item in enumerate(result):
print(f"{i + 1}) {item}")
print(f"\n({len(result)} items)")
elif isinstance(result, dict):
for k, v in result.items():
print(f"{k}: {v}")
elif isinstance(result, bytes):
print(result.decode("utf-8", errors="replace"))
else:
print(result)
finally:
r.close()
def redis_info(server: dict):
"""Show Redis INFO."""
import redis as redis_lib
host = server["ip"]
port = server.get("port", 6379)
password = server.get("password", "") or None
db_index = server.get("db_index", 0)
ssl_enabled = server.get("ssl", False)
r = redis_lib.Redis(host=host, port=port, password=password, db=db_index,
decode_responses=True, socket_timeout=10, ssl=ssl_enabled)
try:
info = r.info()
# Print key sections
sections = ["redis_version", "redis_mode", "os", "uptime_in_seconds",
"connected_clients", "used_memory_human", "used_memory_peak_human",
"total_connections_received", "total_commands_processed",
"keyspace_hits", "keyspace_misses", "role"]
print(f"{'Key':<35} {'Value'}")
print("-" * 60)
for key in sections:
if key in info:
print(f"{key:<35} {info[key]}")
# Print keyspace info (db0, db1, etc.)
for key in sorted(info.keys()):
if key.startswith("db"):
print(f"{key:<35} {info[key]}")
finally:
r.close()
def redis_keys(server: dict, pattern: str):
"""SCAN keys matching a pattern."""
import redis as redis_lib
host = server["ip"]
port = server.get("port", 6379)
password = server.get("password", "") or None
db_index = server.get("db_index", 0)
ssl_enabled = server.get("ssl", False)
r = redis_lib.Redis(host=host, port=port, password=password, db=db_index,
decode_responses=True, socket_timeout=10, ssl=ssl_enabled)
try:
keys = []
cursor = 0
while True:
cursor, batch = r.scan(cursor=cursor, match=pattern, count=200)
keys.extend(batch)
if cursor == 0:
break
if len(keys) >= 1000:
print("(truncated at 1000 keys)")
break
keys.sort()
for k in keys:
print(k)
print(f"\n({len(keys)} key{'s' if len(keys) != 1 else ''})")
finally:
r.close()
# ── Grafana commands ──────────────────────────────────
def _grafana_request(server: dict, endpoint: str) -> dict:
"""Make an authenticated GET request to Grafana API."""
import requests
host = server["ip"]
port = server.get("port", 3000)
protocol = "https" if server.get("ssl", False) else "http"
base_url = server.get("base_url", f"{protocol}://{host}:{port}")
api_key = server.get("api_key", server.get("password", ""))
headers = {}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
url = f"{base_url.rstrip('/')}/api/{endpoint.lstrip('/')}"
resp = requests.get(url, headers=headers, timeout=15, verify=server.get("ssl_verify", True))
resp.raise_for_status()
return resp.json()
def grafana_dashboards(server: dict):
"""List Grafana dashboards."""
data = _grafana_request(server, "search?type=dash-db")
if not data:
print("(no dashboards found)")
return
headers = ["UID", "Title", "Folder", "URL"]
rows = []
for d in data:
rows.append([
d.get("uid", ""),
d.get("title", ""),
d.get("folderTitle", "(root)"),
d.get("url", ""),
])
_print_table(headers, rows)
print(f"\n({len(rows)} dashboard{'s' if len(rows) != 1 else ''})")
def grafana_alerts(server: dict):
"""List Grafana alert rules."""
data = _grafana_request(server, "alertmanager/grafana/api/v2/alerts")
if not data:
print("(no alerts)")
return
headers = ["Status", "Name", "Severity", "Summary"]
rows = []
for alert in data:
status = alert.get("status", {}).get("state", "unknown")
labels = alert.get("labels", {})
annotations = alert.get("annotations", {})
rows.append([
status,
labels.get("alertname", ""),
labels.get("severity", ""),
annotations.get("summary", "")[:80],
])
_print_table(headers, rows)
print(f"\n({len(rows)} alert{'s' if len(rows) != 1 else ''})")
# ── Prometheus commands ───────────────────────────────
def _prom_request(server: dict, endpoint: str, params: dict = None) -> dict:
"""Make a GET request to Prometheus API."""
import requests
host = server["ip"]
port = server.get("port", 9090)
protocol = "https" if server.get("ssl", False) else "http"
base_url = server.get("base_url", f"{protocol}://{host}:{port}")
auth = None
user = server.get("user", "")
password = server.get("password", "")
if user and password:
auth = (user, password)
url = f"{base_url.rstrip('/')}/api/v1/{endpoint.lstrip('/')}"
resp = requests.get(url, params=params, auth=auth, timeout=15,
verify=server.get("ssl_verify", True))
resp.raise_for_status()
return resp.json()
def prom_query(server: dict, query: str):
"""Execute a PromQL instant query."""
data = _prom_request(server, "query", {"query": query})
status = data.get("status", "")
if status != "success":
print(f"ERROR: Prometheus returned status '{status}'")
if "error" in data:
print(f" {data['error']}")
sys.exit(1)
result = data.get("data", {})
result_type = result.get("resultType", "")
results = result.get("result", [])
if not results:
print("(no results)")
return
if result_type == "vector":
headers = ["Metric", "Value", "Timestamp"]
rows = []
for r in results:
metric = r.get("metric", {})
label_str = ", ".join(f'{k}="{v}"' for k, v in metric.items())
ts, val = r.get("value", [0, ""])
rows.append([label_str or "{}", val, ts])
_print_table(headers, rows)
elif result_type == "scalar":
ts, val = results
print(f"Scalar: {val} (at {ts})")
elif result_type == "string":
ts, val = results
print(f"String: {val} (at {ts})")
elif result_type == "matrix":
for series in results:
metric = series.get("metric", {})
label_str = ", ".join(f'{k}="{v}"' for k, v in metric.items())
print(f"\n--- {label_str or '{}'} ---")
values = series.get("values", [])
for ts, val in values[-20:]: # last 20 samples
print(f" [{ts}] {val}")
if len(values) > 20:
print(f" ... ({len(values)} total samples, showing last 20)")
print(f"\n({len(results)} result{'s' if len(results) != 1 else ''}, type: {result_type})")
def prom_targets(server: dict):
"""List Prometheus scrape targets."""
data = _prom_request(server, "targets")
active = data.get("data", {}).get("activeTargets", [])
if not active:
print("(no active targets)")
return
headers = ["Job", "Instance", "State", "Health", "Last Scrape"]
rows = []
for t in active:
labels = t.get("labels", {})
rows.append([
labels.get("job", ""),
labels.get("instance", ""),
t.get("scrapePool", ""),
t.get("health", ""),
t.get("lastScrape", "")[:19],
])
_print_table(headers, rows)
print(f"\n({len(rows)} target{'s' if len(rows) != 1 else ''})")
def prom_alerts(server: dict):
"""List Prometheus alerts."""
data = _prom_request(server, "alerts")
alerts = data.get("data", {}).get("alerts", [])
if not alerts:
print("(no alerts)")
return
headers = ["State", "Name", "Severity", "Active Since"]
rows = []
for a in alerts:
labels = a.get("labels", {})
rows.append([
a.get("state", ""),
labels.get("alertname", ""),
labels.get("severity", ""),
a.get("activeAt", "")[:19],
])
_print_table(headers, rows)
print(f"\n({len(rows)} alert{'s' if len(rows) != 1 else ''})")
# ── WinRM commands ────────────────────────────────────
def _get_winrm_session(server: dict):
"""Create a WinRM session."""
import winrm
host = server["ip"]
port = server.get("port", 5985)
user = server.get("user", "Administrator")
password = server.get("password", "")
protocol = "https" if server.get("ssl", False) or port == 5986 else "http"
transport = server.get("transport", "ntlm")
endpoint = f"{protocol}://{host}:{port}/wsman"
session = winrm.Session(endpoint, auth=(user, password), transport=transport,
server_cert_validation="ignore" if protocol == "https" else "validate")
return session
def run_winrm_ps(server: dict, command: str):
"""Execute PowerShell command via WinRM."""
session = _get_winrm_session(server)
result = session.run_ps(command)
out = result.std_out.decode("utf-8", errors="replace").strip()
err = result.std_err.decode("utf-8", errors="replace").strip()
if out:
print(out)
if err:
print(err, file=sys.stderr)
sys.exit(result.status_code)
def run_winrm_cmd(server: dict, command: str):
"""Execute CMD command via WinRM."""
session = _get_winrm_session(server)
result = session.run_cmd(command)
out = result.std_out.decode("utf-8", errors="replace").strip()
err = result.std_err.decode("utf-8", errors="replace").strip()
if out:
print(out)
if err:
print(err, file=sys.stderr)
sys.exit(result.status_code)
# ── Main ──────────────────────────────────────────────
def main():
if len(sys.argv) < 2:
print(__doc__)
sys.exit(1)
cmd = sys.argv[1]
if cmd == "--list":
list_servers(); sys.exit(0)
if cmd == "--list-full":
list_servers(full=True); sys.exit(0)
if cmd == "--status":
check_status(); sys.exit(0)
if cmd == "--info" and len(sys.argv) >= 3:
server_info(sys.argv[2]); sys.exit(0)
if cmd == "--set-note" and len(sys.argv) >= 4:
set_note(sys.argv[2], sys.argv[3]); sys.exit(0)
if cmd == "--add":
add_server(sys.argv[2:]); sys.exit(0)
if cmd == "--remove" and len(sys.argv) >= 3:
remove_server(sys.argv[2]); sys.exit(0)
# ── SQL commands (global-style: --sql ALIAS ...) ──
if cmd == "--sql" and len(sys.argv) >= 4:
_, servers = load_servers()
alias = _resolve_alias(sys.argv[2], servers)
run_sql(servers[alias], sys.argv[3])
sys.exit(0)
if cmd == "--sql-databases" and len(sys.argv) >= 3:
_, servers = load_servers()
alias = _resolve_alias(sys.argv[2], servers)
sql_databases(servers[alias])
sys.exit(0)
if cmd == "--sql-tables" and len(sys.argv) >= 3:
_, servers = load_servers()
alias = _resolve_alias(sys.argv[2], servers)
db = sys.argv[3] if len(sys.argv) >= 4 else None
sql_tables(servers[alias], db)
sys.exit(0)
# ── Redis commands ──
if cmd == "--redis" and len(sys.argv) >= 4:
_, servers = load_servers()
alias = _resolve_alias(sys.argv[2], servers)
run_redis_cmd(servers[alias], sys.argv[3])
sys.exit(0)
if cmd == "--redis-info" and len(sys.argv) >= 3:
_, servers = load_servers()
alias = _resolve_alias(sys.argv[2], servers)
redis_info(servers[alias])
sys.exit(0)
if cmd == "--redis-keys" and len(sys.argv) >= 4:
_, servers = load_servers()
alias = _resolve_alias(sys.argv[2], servers)
redis_keys(servers[alias], sys.argv[3])
sys.exit(0)
# ── Grafana commands ──
if cmd == "--grafana-dashboards" and len(sys.argv) >= 3:
_, servers = load_servers()
alias = _resolve_alias(sys.argv[2], servers)
grafana_dashboards(servers[alias])
sys.exit(0)
if cmd == "--grafana-alerts" and len(sys.argv) >= 3:
_, servers = load_servers()
alias = _resolve_alias(sys.argv[2], servers)
grafana_alerts(servers[alias])
sys.exit(0)
# ── Prometheus commands ──
if cmd == "--prom-query" and len(sys.argv) >= 4:
_, servers = load_servers()
alias = _resolve_alias(sys.argv[2], servers)
prom_query(servers[alias], sys.argv[3])
sys.exit(0)
if cmd == "--prom-targets" and len(sys.argv) >= 3:
_, servers = load_servers()
alias = _resolve_alias(sys.argv[2], servers)
prom_targets(servers[alias])
sys.exit(0)
if cmd == "--prom-alerts" and len(sys.argv) >= 3:
_, servers = load_servers()
alias = _resolve_alias(sys.argv[2], servers)
prom_alerts(servers[alias])
sys.exit(0)
# ── WinRM commands ──
if cmd == "--ps" and len(sys.argv) >= 4:
_, servers = load_servers()
alias = _resolve_alias(sys.argv[2], servers)
run_winrm_ps(servers[alias], sys.argv[3])
if cmd == "--cmd" and len(sys.argv) >= 4:
_, servers = load_servers()
alias = _resolve_alias(sys.argv[2], servers)
run_winrm_cmd(servers[alias], sys.argv[3])
# Server commands — exact match first, then fuzzy search by keyword
alias = cmd
_, servers = load_servers()
alias = _resolve_alias(alias, servers)
server = servers[alias]
if len(sys.argv) < 3:
print(f"Usage: ssh.py {alias} <command>")
sys.exit(1)
action = sys.argv[2]
if action == "--install-key":
install_key(server)
elif action == "--ping":
ping_server(server)
elif action == "--upload" and len(sys.argv) >= 5:
upload_file(server, sys.argv[3], sys.argv[4])
elif action == "--download" and len(sys.argv) >= 5:
download_file(server, sys.argv[3], sys.argv[4])
elif action == "--no-sudo":
command = " ".join(sys.argv[3:])
out, err, code = run_command(server, command, use_sudo=False)
if out: print(out, end="")
if err: print(err, end="", file=sys.stderr)
sys.exit(code)
else:
command = " ".join(sys.argv[2:])
out, err, code = run_command(server, command)
if out: print(out, end="")
if err: print(err, end="", file=sys.stderr)
sys.exit(code)
if __name__ == "__main__":
try:
main()
except SystemExit:
raise
except Exception as e:
print(f"ERROR: {type(e).__name__}: {e}", file=sys.stderr)
sys.exit(1)