#!/usr/bin/env python3
"""
SSH utility for Claude Code — connects to servers by alias.
Credentials stored locally in servers.json (encrypted), NEVER exposed to AI API.

Usage (SSH):
  python ssh.py ALIAS "command"                     # run as configured user (auto-sudo if needed)
  python ssh.py ALIAS --no-sudo "command"           # run without sudo elevation
  python ssh.py ALIAS --upload LOCAL REMOTE
  python ssh.py ALIAS --download REMOTE LOCAL
  python ssh.py ALIAS --install-key
  python ssh.py ALIAS --ping
  python ssh.py --list
  python ssh.py --status
  python ssh.py --info ALIAS                        # full info (no passwords)
  python ssh.py --set-note ALIAS "desc"             # update server notes
  python ssh.py --add ALIAS IP PORT USER PASSWORD [--note "desc"]
  python ssh.py --remove ALIAS

SQL (type: mariadb / mssql / postgresql):
  python ssh.py --sql ALIAS "SELECT * FROM users"   # execute SQL query
  python ssh.py --sql-databases ALIAS               # list databases
  python ssh.py --sql-tables ALIAS [database]       # list tables

Redis (type: redis):
  python ssh.py --redis ALIAS "GET mykey"           # execute Redis command
  python ssh.py --redis-info ALIAS                  # Redis INFO
  python ssh.py --redis-keys ALIAS "user:*"         # SCAN keys by pattern

Grafana (type: grafana):
  python ssh.py --grafana-dashboards ALIAS          # list dashboards
  python ssh.py --grafana-alerts ALIAS              # list alerts

Prometheus (type: prometheus):
  python ssh.py --prom-query ALIAS "up"             # execute PromQL query
  python ssh.py --prom-targets ALIAS                # list targets
  python ssh.py --prom-alerts ALIAS                 # list alerts

WinRM (type: winrm):
  python ssh.py --ps ALIAS "Get-Process"            # PowerShell via WinRM
  python ssh.py --cmd ALIAS "dir"                   # CMD via WinRM
"""

import sys
import os
import json
import re
import time

# paramiko is only needed for the SSH/SFTP commands. Like the other drivers
# (pymysql, redis, requests, winrm — all imported lazily), its absence must
# not prevent the non-SSH commands from running.
try:
    import paramiko
except ImportError:
    paramiko = None

# Shared config — same file used by ServerManager GUI
SHARED_DIR = os.path.expanduser("~/.server-connections")
SETTINGS_FILE = os.path.join(SHARED_DIR, "settings.json")
DEFAULT_SERVERS_FILE = os.path.join(SHARED_DIR, "servers.json")
SSH_KEY_PATH = os.path.expanduser("~/.ssh/id_ed25519")
SSH_CONFIG_PATH = os.path.expanduser("~/.ssh/config")

# Encryption support — encryption.py is copied to SHARED_DIR by GUI setup
if SHARED_DIR not in sys.path:
    sys.path.insert(0, SHARED_DIR)
try:
    from encryption import decrypt, encrypt, is_encrypted
    HAS_ENCRYPTION = True
except ImportError:
    HAS_ENCRYPTION = False


def _get_servers_file() -> str:
    """Return the servers file path from settings.json, or the default.

    The configured ``servers_path`` is used only when it is a non-empty
    string naming an existing path. A missing, unreadable or malformed
    settings file silently falls back to DEFAULT_SERVERS_FILE.
    """
    try:
        with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
            settings = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or invalid JSON/encoding: use the default.
        return DEFAULT_SERVERS_FILE
    path = settings.get("servers_path", "") if isinstance(settings, dict) else ""
    if isinstance(path, str) and path and os.path.exists(path):
        return path
    return DEFAULT_SERVERS_FILE


# ── Data ──────────────────────────────────────────────

def load_servers():
    """Load the servers file.

    Returns:
        A ``(data, by_alias)`` tuple: the parsed JSON document and a dict
        mapping each server's ``alias`` to its entry.
    """
    servers_file = _get_servers_file()
    with open(servers_file, "rb") as f:
        raw = f.read()
    if HAS_ENCRYPTION and is_encrypted(raw):
        data = json.loads(decrypt(raw))
    else:
        data = json.loads(raw.decode("utf-8"))
    return data, {s["alias"]: s for s in data.get("servers", [])}


def save_servers(data):
    """Write ``data`` to the servers file, encrypted when encryption is available."""
    servers_file = _get_servers_file()
    text = json.dumps(data, indent=2, ensure_ascii=False)
    if HAS_ENCRYPTION:
        with open(servers_file, "wb") as f:
            f.write(encrypt(text))
    else:
        with open(servers_file, "w", encoding="utf-8") as f:
            f.write(text)


# ── Connection ────────────────────────────────────────

def _new_ssh_client():
    """Create an SSHClient that auto-accepts unknown host keys."""
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    return client


def get_client(server: dict) -> "paramiko.SSHClient":
    """Open an SSH connection to ``server``.

    Tries the local key at SSH_KEY_PATH first (if the file exists), then
    falls back to the stored password.

    Raises:
        RuntimeError: paramiko is not installed.
        Exception: neither the key nor a password could be used; errors
            from the password attempt propagate unchanged.
    """
    if paramiko is None:
        raise RuntimeError("paramiko is required for SSH commands (pip install paramiko)")

    base_kwargs = {
        "hostname": server["ip"],
        "port": server.get("port", 22),
        "username": server.get("user", "root"),
        "timeout": 15,
        "banner_timeout": 15,
    }

    # Try key first
    if os.path.exists(SSH_KEY_PATH):
        client = _new_ssh_client()
        try:
            client.connect(key_filename=SSH_KEY_PATH, **base_kwargs)
            return client
        except Exception:
            # Deliberate best-effort: any key failure falls through to the
            # password. Close the failed client so its transport is released.
            client.close()

    # Fallback to password
    password = server.get("password", "")
    if password:
        client = _new_ssh_client()
        try:
            client.connect(password=password, look_for_keys=False,
                           allow_agent=False, **base_kwargs)
        except Exception:
            client.close()
            raise
        return client

    raise Exception(f"No auth method for {server['alias']}")


# ── Command execution ─────────────────────────────────

def run_command(server: dict, command: str, use_sudo: bool = True) -> tuple:
    """Execute ``command`` on ``server`` and return ``(stdout, stderr, exit_code)``.

    If user != root and use_sudo=True, auto-elevates via sudo.
    Password is fed through stdin (not visible in process list).
    """
    client = get_client(server)
    try:
        need_sudo = use_sudo and server.get("user", "root") != "root"
        if need_sudo:
            # Use sudo -S to read password from stdin
            # -p '' suppresses the password prompt text
            full_cmd = f"sudo -S -p '' bash -c {_shell_quote(command)}"
        else:
            full_cmd = command

        stdin, stdout, stderr = client.exec_command(full_cmd, timeout=120)
        if need_sudo:
            stdin.write(server.get("password", "") + "\n")
            stdin.flush()

        # Drain the output BEFORE asking for the exit status: with large
        # output, recv_exit_status() can block forever while the remote side
        # waits for the channel window to be emptied.
        out = stdout.read().decode("utf-8", errors="replace")
        err = stderr.read().decode("utf-8", errors="replace")
        exit_code = stdout.channel.recv_exit_status()

        # Strip sudo noise from stderr
        kept = [line for line in err.splitlines()
                if not line.startswith("[sudo]") and "password for" not in line.lower()]
        return out, "\n".join(kept).strip(), exit_code
    finally:
        client.close()


def _shell_quote(s: str) -> str:
    """Safely quote a string for bash -c."""
    return "'" + s.replace("'", "'\\''") + "'"


# ── File transfer ─────────────────────────────────────

_GIT_MSYS_MARKER = 'Program Files/Git'
_COMMON_UNIX_PREFIXES = ('/tmp/', '/home/', '/etc/', '/var/', '/usr/', '/opt/',
                         '/root/', '/bin/', '/sbin/', '/lib/', '/lib64/')


def _normalize_remote_path(remote_path: str) -> str:
    """Undo MSYS (Git Bash) path conversion of a remote Unix path.

    Two mangled forms are repaired:
      * ``C:/Program Files/Git/tmp/file.txt`` -> ``/tmp/file.txt``
      * ``C:/tmp/file.txt`` -> ``/tmp/file.txt``, only when the remainder
        starts with a common Unix top-level directory.
    Any other path is returned unchanged.
    """
    if ':' not in remote_path:
        return remote_path

    git_pos = remote_path.find(_GIT_MSYS_MARKER)
    if git_pos != -1:
        # Keep everything after the Git installation directory.
        return remote_path[git_pos + len(_GIT_MSYS_MARKER):]

    if len(remote_path) > 3 and remote_path[1] == ':' and remote_path[2] == '/':
        # Drop only the "C:" drive prefix; the slash at index 2 is the root
        # of the intended Unix path and must be kept.
        candidate = remote_path[2:]
        if candidate.startswith(_COMMON_UNIX_PREFIXES):
            return candidate

    return remote_path


def _fmt_size(nbytes: int) -> str:
    """Format byte count for human display."""
    if nbytes < 1024:
        return f"{nbytes} B"
    if nbytes < 1024 * 1024:
        return f"{nbytes / 1024:.1f} KB"
    if nbytes < 1024 * 1024 * 1024:
        return f"{nbytes / (1024 * 1024):.1f} MB"
    return f"{nbytes / (1024 * 1024 * 1024):.2f} GB"


def _progress_cb(total_bytes: int):
    """Return a Paramiko-compatible progress callback.

    For files >= 1 MB, prints at 25%, 50%, 75% milestones.
    For files < 1 MB, stays silent.
    """
    threshold = 1024 * 1024  # 1 MB
    reported = set()

    def callback(transferred: int, total: int):
        if total < threshold:
            return
        pct = int(transferred * 100 / total)
        for milestone in (25, 50, 75):
            if pct >= milestone and milestone not in reported:
                reported.add(milestone)
                print(f"{milestone}% ({_fmt_size(transferred)}/{_fmt_size(total)})")

    return callback


def _transfer_info(file_size: int, elapsed: float) -> str:
    """Build the "size, time[, speed]" summary printed after a transfer."""
    info = f"{_fmt_size(file_size)}, {elapsed:.1f}s"
    if file_size >= 1024 * 1024 and elapsed > 0:
        info += f", {_fmt_size(int(file_size / elapsed))}/s"
    return info


def upload_file(server: dict, local_path: str, remote_path: str):
    """Upload ``local_path`` via SFTP, then chmod the remote file to 0o664."""
    normalized_remote_path = _normalize_remote_path(remote_path)
    file_size = os.path.getsize(local_path)
    client = get_client(server)
    try:
        sftp = client.open_sftp()
        try:
            t0 = time.time()
            sftp.put(local_path, normalized_remote_path, callback=_progress_cb(file_size))
            elapsed = time.time() - t0
            sftp.chmod(normalized_remote_path, 0o664)
        finally:
            sftp.close()
        info = _transfer_info(file_size, elapsed)
        print(f"OK: {local_path} -> {server['alias']}:{normalized_remote_path} ({info})")
    finally:
        client.close()


def download_file(server: dict, remote_path: str, local_path: str):
    """Download ``remote_path`` via SFTP to ``local_path``."""
    normalized_remote_path = _normalize_remote_path(remote_path)
    client = get_client(server)
    try:
        sftp = client.open_sftp()
        try:
            file_size = sftp.stat(normalized_remote_path).st_size
            t0 = time.time()
            sftp.get(normalized_remote_path, local_path, callback=_progress_cb(file_size))
            elapsed = time.time() - t0
        finally:
            sftp.close()
        info = _transfer_info(file_size, elapsed)
        print(f"OK: {server['alias']}:{normalized_remote_path} -> {local_path} ({info})")
    finally:
        client.close()


def install_key(server: dict):
    """Append the local public key to the remote authorized_keys if absent.

    Exits with status 1 when the local public key is missing or the remote
    installation does not confirm success.
    """
    pub_key_path = SSH_KEY_PATH + ".pub"
    if not os.path.exists(pub_key_path):
        print(f"ERROR: No public key at {pub_key_path}")
        sys.exit(1)

    with open(pub_key_path, "r") as f:
        pub_key = f.read().strip()
    quoted_key = _shell_quote(pub_key)

    # -F matches the key literally (base64 contains regex metacharacters);
    # explicit markers avoid the ambiguous "0\n0" that `grep -c ... || echo 0`
    # emits when the file exists but has no match.
    check_cmd = (
        f"grep -qF -- {quoted_key} ~/.ssh/authorized_keys 2>/dev/null "
        "&& echo KEY_PRESENT || echo KEY_ABSENT"
    )
    out, _, _ = run_command(server, check_cmd, use_sudo=False)
    if "KEY_PRESENT" in out:
        print(f"Key already installed on {server['alias']}")
        return

    command = (
        "mkdir -p ~/.ssh && chmod 700 ~/.ssh && "
        f"echo {quoted_key} >> ~/.ssh/authorized_keys && "
        "chmod 600 ~/.ssh/authorized_keys && "
        'echo "KEY_OK"'
    )
    out, err, _ = run_command(server, command, use_sudo=False)
    if "KEY_OK" in out:
        print(f"SSH key installed on {server['alias']}")
    else:
        print(f"ERROR: {err or out}")
        sys.exit(1)


# ── Server management ─────────────────────────────────

def ping_server(server: dict):
    """Print ONLINE/OFFLINE depending on whether an SSH connection succeeds."""
    try:
        get_client(server).close()
        print(f"{server['alias']}: ONLINE")
    except Exception as e:
        print(f"{server['alias']}: OFFLINE ({type(e).__name__})")


def list_servers(full=False):
    """Print the server table.

    The default (safe) mode shows alias, type, key presence and notes only;
    ``full=True`` additionally shows IP, port and user.
    """
    _, servers = load_servers()
    # The key column reflects the single local key file, so it is the same
    # for every row.
    has_key = "yes" if os.path.exists(SSH_KEY_PATH) else "no"

    if full:
        # WARNING: full mode shows sensitive data (IP, port, user)
        # Only for local/manual use, NEVER through AI API
        print("WARNING: Full mode — contains sensitive data. Do NOT pipe to AI.")
        print(f"{'Alias':<20} {'IP':<20} {'Port':<8} {'User':<10} {'Key':<6}")
        print("-" * 64)
        for alias, s in servers.items():
            print(f"{alias:<20} {s['ip']:<20} {s.get('port', 22):<8} {s.get('user', 'root'):<10} {has_key:<6}")
    else:
        # Safe mode: only aliases (no IPs, ports, users)
        print(f"{'Alias':<20} {'Type':<10} {'Key':<6} {'Notes'}")
        print("-" * 70)
        for alias, s in servers.items():
            stype = s.get("type", "ssh")
            notes = s.get("notes", "")
            print(f"{alias:<20} {stype:<10} {has_key:<6} {notes}")


def _resolve_alias(alias: str, servers: dict) -> str:
    """Resolve alias — exact match, then whole-word search, then substring fallback.

    Prints a message and exits with status 1 when the alias is ambiguous
    or unknown.
    """
    if alias in servers:
        return alias

    query = alias.lower()
    word_re = re.compile(r'\b' + re.escape(query) + r'\b', re.IGNORECASE)
    match_passes = (
        # 1) Whole-word match (e.g. "tor" matches "API TOR contabo" but NOT "investor")
        [a for a in servers if word_re.search(a)],
        # 2) Substring fallback (e.g. "cont" matches "contabo")
        [a for a in servers if query in a.lower()],
    )
    for matches in match_passes:
        if len(matches) == 1:
            return matches[0]
        if len(matches) > 1:
            print(f"Ambiguous: '{alias}' matches multiple servers:")
            for m in matches:
                print(f" - {m}")
            sys.exit(1)

    print(f"Unknown: '{alias}'. Available: {', '.join(servers.keys())}")
    sys.exit(1)


def server_info(alias: str):
    """Show server info safe for AI context — NO ip, user, password, port, totp_secret."""
    _, servers = load_servers()
    alias = _resolve_alias(alias, servers)
    s = servers[alias]
    has_key = "yes" if os.path.exists(SSH_KEY_PATH) else "no"
    print(f"Alias: {s['alias']}")
    print(f"Type: {s.get('type', 'ssh')}")
    print(f"Key: {has_key}")
    print(f"Auth: {s.get('auth', 'password')}")
    print(f"2FA: {'yes' if s.get('totp_secret') else 'no'}")
    notes = s.get("notes", "")
    if notes:
        print(f"Notes: {notes}")


def check_status():
    """Print an ONLINE/OFFLINE table by attempting an SSH connection to every server."""
    _, servers = load_servers()
    print(f"{'Alias':<20} {'Status':<10}")
    print("-" * 30)
    for alias, s in servers.items():
        try:
            get_client(s).close()
            status = "ONLINE"
        except Exception:
            status = "OFFLINE"
        print(f"{alias:<20} {status:<10}")


def add_server(args):
    """Add a server from ``ALIAS IP PORT USER PASSWORD [--note "desc"]``.

    Saves the entry, appends it to the SSH config and tries to install the
    local public key. Exits with status 1 on bad usage or a duplicate alias.
    """
    if len(args) < 5:
        print("Usage: --add ALIAS IP PORT USER PASSWORD [--note \"desc\"]")
        sys.exit(1)

    alias, ip, port, user, password = args[0], args[1], int(args[2]), args[3], args[4]

    # Look for --note only after the positional arguments so a user or
    # password that happens to equal "--note" is not misparsed.
    note = ""
    options = args[5:]
    if "--note" in options:
        idx = options.index("--note")
        if idx + 1 < len(options):
            note = options[idx + 1]

    data, servers = load_servers()
    if alias in servers:
        print(f"ERROR: '{alias}' already exists")
        sys.exit(1)

    new_server = {
        "alias": alias,
        "ip": ip,
        "port": port,
        "user": user,
        "auth": "ssh-key",
        "password": password,
        "notes": note,
    }
    data.setdefault("servers", []).append(new_server)
    save_servers(data)
    update_ssh_config(alias, ip, port, user)
    print(f"Added: {alias}")

    try:
        install_key(new_server)
    except Exception as e:
        print(f"Warning: key not installed ({e}). Run: ssh.py {alias} --install-key")


def set_note(alias: str, note: str):
    """Update server notes — safe for AI (no credentials exposed)."""
    data, servers = load_servers()
    alias = _resolve_alias(alias, servers)
    for s in data["servers"]:
        if s["alias"] == alias:
            s["notes"] = note
            break
    save_servers(data)
    print(f"OK: notes updated for {alias}")


def remove_server(alias: str):
    """Remove a server from the servers file and from the SSH config."""
    data, servers = load_servers()
    alias = _resolve_alias(alias, servers)
    data["servers"] = [s for s in data["servers"] if s["alias"] != alias]
    save_servers(data)
    remove_from_ssh_config(alias)
    print(f"Removed: {alias}")


# ── SSH config ────────────────────────────────────────

def update_ssh_config(alias, ip, port, user):
    """Append a Host entry for ``alias`` to the SSH config, if the file exists
    and has no such entry yet."""
    if not os.path.exists(SSH_CONFIG_PATH):
        return
    with open(SSH_CONFIG_PATH, "r") as f:
        content = f.read()
    if f"Host {alias}\n" in content:
        return
    with open(SSH_CONFIG_PATH, "a") as f:
        f.write(f"\nHost {alias}\n HostName {ip}\n User {user}\n Port {port}\n")


def remove_from_ssh_config(alias):
    """Remove the ``Host alias`` line and its indented option lines from the SSH config."""
    if not os.path.exists(SSH_CONFIG_PATH):
        return
    with open(SSH_CONFIG_PATH, "r") as f:
        lines = f.readlines()

    new_lines, skip = [], False
    for line in lines:
        if line.strip() == f"Host {alias}":
            skip = True
            continue
        # Option lines of the removed block are indented (space or tab).
        if skip and line.startswith((" ", "\t")):
            continue
        skip = False
        new_lines.append(line)

    with open(SSH_CONFIG_PATH, "w") as f:
        f.writelines(new_lines)


# ── SQL commands ──────────────────────────────────────

def _print_table(headers: list, rows: list):
    """Print a formatted ASCII table."""
    if not rows:
        print("(no rows)")
        return
    widths = [len(str(h)) for h in headers]
    for row in rows:
        for i, val in enumerate(row):
            widths[i] = max(widths[i], len(str(val)))
    fmt = " ".join(f"{{:<{w}}}" for w in widths)
    print(fmt.format(*headers))
    print(" ".join("-" * w for w in widths))
    for row in rows:
        print(fmt.format(*[str(v) for v in row]))


# Default TCP port per supported SQL server type.
_SQL_DEFAULT_PORTS = {"mariadb": 3306, "mysql": 3306, "mssql": 1433, "postgresql": 5432}


def run_sql(server: dict, query: str):
    """Execute SQL query against mariadb/mssql/postgresql server.

    Prints the result set as a table, or the affected row count (after
    committing) for statements that return no rows. Exits with status 1
    for an unsupported server type.
    """
    stype = server.get("type", "mariadb")
    if stype not in _SQL_DEFAULT_PORTS:
        print(f"ERROR: Unsupported SQL type '{stype}'. Use mariadb, mssql, or postgresql.")
        sys.exit(1)

    host = server["ip"]
    port = server.get("port", _SQL_DEFAULT_PORTS[stype])
    user = server.get("user", "root")
    password = server.get("password", "")
    database = server.get("database", "")

    if stype in ("mariadb", "mysql"):
        import pymysql
        conn = pymysql.connect(host=host, port=port, user=user, password=password,
                               database=database or None, connect_timeout=15,
                               charset="utf8mb4", cursorclass=pymysql.cursors.Cursor)
    elif stype == "mssql":
        import pymssql
        conn = pymssql.connect(server=host, port=port, user=user, password=password,
                               database=database or None, login_timeout=15)
    else:  # postgresql
        import psycopg2
        conn = psycopg2.connect(host=host, port=port, user=user, password=password,
                                dbname=database or None, connect_timeout=15)

    try:
        cur = conn.cursor()
        cur.execute(query)
        if cur.description:
            headers = [desc[0] for desc in cur.description]
            rows = cur.fetchall()
            _print_table(headers, rows)
            print(f"\n({len(rows)} row{'s' if len(rows) != 1 else ''})")
        else:
            conn.commit()
            affected = cur.rowcount
            print(f"OK: {affected} row{'s' if affected != 1 else ''} affected")
        cur.close()
    finally:
        conn.close()


def sql_databases(server: dict):
    """List databases on SQL server."""
    stype = server.get("type", "mariadb")
    if stype in ("mariadb", "mysql"):
        run_sql(server, "SHOW DATABASES")
    elif stype == "mssql":
        run_sql(server, "SELECT name FROM sys.databases ORDER BY name")
    elif stype == "postgresql":
        run_sql(server, "SELECT datname AS database FROM pg_database WHERE datistemplate = false ORDER BY datname")
    else:
        print(f"ERROR: Unsupported SQL type '{stype}'.")
        sys.exit(1)


def sql_tables(server: dict, database: str = None):
    """List tables on SQL server, optionally for a specific database."""
    stype = server.get("type", "mariadb")
    if database:
        # Work on a copy so the caller's server entry is not modified.
        server = dict(server)
        server["database"] = database

    if stype in ("mariadb", "mysql"):
        if database:
            # Double any backtick so the name cannot break out of the
            # quoted identifier.
            quoted_db = database.replace("`", "``")
            run_sql(server, f"SHOW TABLES FROM `{quoted_db}`")
        else:
            run_sql(server, "SHOW TABLES")
    elif stype == "mssql":
        run_sql(server, "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_SCHEMA, TABLE_NAME")
    elif stype == "postgresql":
        run_sql(server, "SELECT schemaname, tablename FROM pg_tables WHERE schemaname NOT IN ('pg_catalog', 'information_schema') ORDER BY schemaname, tablename")
    else:
        print(f"ERROR: Unsupported SQL type '{stype}'.")
        sys.exit(1)


# ── Redis commands ────────────────────────────────────

def _redis_client(server: dict):
    """Create a Redis client from the server entry (ip, port, password, db_index, ssl)."""
    import redis as redis_lib
    return redis_lib.Redis(host=server["ip"],
                           port=server.get("port", 6379),
                           password=server.get("password", "") or None,
                           db=server.get("db_index", 0),
                           decode_responses=True,
                           socket_timeout=10,
                           ssl=server.get("ssl", False))


def run_redis_cmd(server: dict, command: str):
    """Execute a Redis command (split on whitespace) and print the reply."""
    r = _redis_client(server)
    try:
        parts = command.split()
        if not parts:
            print("ERROR: Empty Redis command")
            sys.exit(1)
        result = r.execute_command(*parts)
        if isinstance(result, list):
            for i, item in enumerate(result):
                print(f"{i + 1}) {item}")
            print(f"\n({len(result)} items)")
        elif isinstance(result, dict):
            for k, v in result.items():
                print(f"{k}: {v}")
        elif isinstance(result, bytes):
            print(result.decode("utf-8", errors="replace"))
        else:
            print(result)
    finally:
        r.close()


def redis_info(server: dict):
    """Show Redis INFO."""
    r = _redis_client(server)
    try:
        info = r.info()
        # Print key sections
        sections = ["redis_version", "redis_mode", "os", "uptime_in_seconds",
                    "connected_clients", "used_memory_human", "used_memory_peak_human",
                    "total_connections_received", "total_commands_processed",
                    "keyspace_hits", "keyspace_misses", "role"]
        print(f"{'Key':<35} {'Value'}")
        print("-" * 60)
        for key in sections:
            if key in info:
                print(f"{key:<35} {info[key]}")
        # Print keyspace info (db0, db1, etc.)
        for key in sorted(info.keys()):
            if key.startswith("db"):
                print(f"{key:<35} {info[key]}")
    finally:
        r.close()


def redis_keys(server: dict, pattern: str):
    """SCAN keys matching a pattern; stops scanning once 1000 keys were collected."""
    r = _redis_client(server)
    try:
        keys = []
        cursor = 0
        while True:
            cursor, batch = r.scan(cursor=cursor, match=pattern, count=200)
            keys.extend(batch)
            if cursor == 0:
                break
            if len(keys) >= 1000:
                print("(truncated at 1000 keys)")
                break
        keys.sort()
        for k in keys:
            print(k)
        print(f"\n({len(keys)} key{'s' if len(keys) != 1 else ''})")
    finally:
        r.close()


# ── Grafana commands ──────────────────────────────────

def _grafana_request(server: dict, endpoint: str) -> dict:
    """Make an authenticated GET request to Grafana API.

    Uses ``base_url`` when configured, otherwise builds the URL from
    ip/port/ssl. ``api_key`` (or, failing that, ``password``) is sent as a
    Bearer token. Raises on HTTP error statuses.
    """
    import requests
    host = server["ip"]
    port = server.get("port", 3000)
    protocol = "https" if server.get("ssl", False) else "http"
    base_url = server.get("base_url", f"{protocol}://{host}:{port}")
    api_key = server.get("api_key", server.get("password", ""))

    headers = {}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    url = f"{base_url.rstrip('/')}/api/{endpoint.lstrip('/')}"
    resp = requests.get(url, headers=headers, timeout=15,
                        verify=server.get("ssl_verify", True))
    resp.raise_for_status()
    return resp.json()


def grafana_dashboards(server: dict):
    """List Grafana dashboards."""
    data = _grafana_request(server, "search?type=dash-db")
    if not data:
        print("(no dashboards found)")
        return
    headers = ["UID", "Title", "Folder", "URL"]
    rows = [
        [d.get("uid", ""), d.get("title", ""), d.get("folderTitle", "(root)"), d.get("url", "")]
        for d in data
    ]
    _print_table(headers, rows)
    print(f"\n({len(rows)} dashboard{'s' if len(rows) != 1 else ''})")


def grafana_alerts(server: dict):
    """List alerts reported by Grafana's built-in Alertmanager endpoint."""
    data = _grafana_request(server, "alertmanager/grafana/api/v2/alerts")
    if not data:
        print("(no alerts)")
        return
    headers = ["Status", "Name", "Severity", "Summary"]
    rows = []
    for alert in data:
        labels = alert.get("labels", {})
        annotations = alert.get("annotations", {})
        rows.append([
            alert.get("status", {}).get("state", "unknown"),
            labels.get("alertname", ""),
            labels.get("severity", ""),
            annotations.get("summary", "")[:80],
        ])
    _print_table(headers, rows)
    print(f"\n({len(rows)} alert{'s' if len(rows) != 1 else ''})")


# ── Prometheus commands ───────────────────────────────

def _prom_request(server: dict, endpoint: str, params: dict = None) -> dict:
    """Make a GET request to Prometheus API.

    HTTP basic auth is used only when both ``user`` and ``password`` are
    set. Raises on HTTP error statuses.
    """
    import requests
    host = server["ip"]
    port = server.get("port", 9090)
    protocol = "https" if server.get("ssl", False) else "http"
    base_url = server.get("base_url", f"{protocol}://{host}:{port}")

    user = server.get("user", "")
    password = server.get("password", "")
    auth = (user, password) if user and password else None

    url = f"{base_url.rstrip('/')}/api/v1/{endpoint.lstrip('/')}"
    resp = requests.get(url, params=params, auth=auth, timeout=15,
                        verify=server.get("ssl_verify", True))
    resp.raise_for_status()
    return resp.json()


def prom_query(server: dict, query: str):
    """Execute a PromQL instant query and print the result by result type."""
    data = _prom_request(server, "query", {"query": query})
    status = data.get("status", "")
    if status != "success":
        print(f"ERROR: Prometheus returned status '{status}'")
        if "error" in data:
            print(f" {data['error']}")
        sys.exit(1)

    result = data.get("data", {})
    result_type = result.get("resultType", "")
    results = result.get("result", [])
    if not results:
        print("(no results)")
        return

    if result_type == "vector":
        headers = ["Metric", "Value", "Timestamp"]
        rows = []
        for r in results:
            metric = r.get("metric", {})
            label_str = ", ".join(f'{k}="{v}"' for k, v in metric.items())
            ts, val = r.get("value", [0, ""])
            rows.append([label_str or "{}", val, ts])
        _print_table(headers, rows)
    elif result_type == "scalar":
        ts, val = results
        print(f"Scalar: {val} (at {ts})")
    elif result_type == "string":
        ts, val = results
        print(f"String: {val} (at {ts})")
    elif result_type == "matrix":
        for series in results:
            metric = series.get("metric", {})
            label_str = ", ".join(f'{k}="{v}"' for k, v in metric.items())
            title = label_str or "{}"
            print(f"\n--- {title} ---")
            values = series.get("values", [])
            for ts, val in values[-20:]:  # last 20 samples
                print(f" [{ts}] {val}")
            if len(values) > 20:
                print(f" ... ({len(values)} total samples, showing last 20)")

    print(f"\n({len(results)} result{'s' if len(results) != 1 else ''}, type: {result_type})")


def prom_targets(server: dict):
    """List Prometheus scrape targets."""
    data = _prom_request(server, "targets")
    active = data.get("data", {}).get("activeTargets", [])
    if not active:
        print("(no active targets)")
        return
    headers = ["Job", "Instance", "State", "Health", "Last Scrape"]
    rows = []
    for t in active:
        labels = t.get("labels", {})
        rows.append([
            labels.get("job", ""),
            labels.get("instance", ""),
            t.get("scrapePool", ""),  # NOTE(review): shown under "State" — confirm intent
            t.get("health", ""),
            t.get("lastScrape", "")[:19],
        ])
    _print_table(headers, rows)
    print(f"\n({len(rows)} target{'s' if len(rows) != 1 else ''})")


def prom_alerts(server: dict):
    """List Prometheus alerts."""
    data = _prom_request(server, "alerts")
    alerts = data.get("data", {}).get("alerts", [])
    if not alerts:
        print("(no alerts)")
        return
    headers = ["State", "Name", "Severity", "Active Since"]
    rows = []
    for a in alerts:
        labels = a.get("labels", {})
        rows.append([
            a.get("state", ""),
            labels.get("alertname", ""),
            labels.get("severity", ""),
            a.get("activeAt", "")[:19],
        ])
    _print_table(headers, rows)
    print(f"\n({len(rows)} alert{'s' if len(rows) != 1 else ''})")


# ── WinRM commands ────────────────────────────────────

def _get_winrm_session(server: dict):
    """Create a WinRM session.

    HTTPS is used when ``ssl`` is set or the port is 5986; certificate
    validation is disabled for HTTPS endpoints.
    """
    import winrm
    host = server["ip"]
    port = server.get("port", 5985)
    user = server.get("user", "Administrator")
    password = server.get("password", "")
    protocol = "https" if server.get("ssl", False) or port == 5986 else "http"
    transport = server.get("transport", "ntlm")
    endpoint = f"{protocol}://{host}:{port}/wsman"
    return winrm.Session(endpoint, auth=(user, password), transport=transport,
                         server_cert_validation="ignore" if protocol == "https" else "validate")


def _finish_winrm(result):
    """Print a WinRM result's stdout/stderr and exit with its status code."""
    out = result.std_out.decode("utf-8", errors="replace").strip()
    err = result.std_err.decode("utf-8", errors="replace").strip()
    if out:
        print(out)
    if err:
        print(err, file=sys.stderr)
    sys.exit(result.status_code)


def run_winrm_ps(server: dict, command: str):
    """Execute PowerShell command via WinRM; exits with the command's status code."""
    _finish_winrm(_get_winrm_session(server).run_ps(command))


def run_winrm_cmd(server: dict, command: str):
    """Execute CMD command via WinRM; exits with the command's status code."""
    _finish_winrm(_get_winrm_session(server).run_cmd(command))


# ── Main ──────────────────────────────────────────────

# Global-style commands of the form "--flag ALIAS [args]":
# flag -> (handler, required args after ALIAS, max args passed to the handler)
_ALIAS_COMMANDS = {
    "--sql": (run_sql, 1, 1),
    "--sql-databases": (sql_databases, 0, 0),
    "--sql-tables": (sql_tables, 0, 1),
    "--redis": (run_redis_cmd, 1, 1),
    "--redis-info": (redis_info, 0, 0),
    "--redis-keys": (redis_keys, 1, 1),
    "--grafana-dashboards": (grafana_dashboards, 0, 0),
    "--grafana-alerts": (grafana_alerts, 0, 0),
    "--prom-query": (prom_query, 1, 1),
    "--prom-targets": (prom_targets, 0, 0),
    "--prom-alerts": (prom_alerts, 0, 0),
    "--ps": (run_winrm_ps, 1, 1),
    "--cmd": (run_winrm_cmd, 1, 1),
}


def _print_and_exit(out: str, err: str, code: int):
    """Relay a remote command's output and exit with its exit code."""
    if out:
        print(out, end="")
    if err:
        print(err, end="", file=sys.stderr)
    sys.exit(code)


def main():
    """Parse sys.argv and dispatch to the matching command."""
    if len(sys.argv) < 2:
        print(__doc__)
        sys.exit(1)

    cmd = sys.argv[1]
    rest = sys.argv[2:]

    if cmd == "--list":
        list_servers()
        sys.exit(0)
    if cmd == "--list-full":
        list_servers(full=True)
        sys.exit(0)
    if cmd == "--status":
        check_status()
        sys.exit(0)
    if cmd == "--add":
        add_server(rest)
        sys.exit(0)
    if cmd == "--info" and len(rest) >= 1:
        server_info(rest[0])
        sys.exit(0)
    if cmd == "--set-note" and len(rest) >= 2:
        set_note(rest[0], rest[1])
        sys.exit(0)
    if cmd == "--remove" and len(rest) >= 1:
        remove_server(rest[0])
        sys.exit(0)

    # ── SQL / Redis / Grafana / Prometheus / WinRM (global-style: --flag ALIAS ...) ──
    if cmd in _ALIAS_COMMANDS and rest:
        handler, min_extra, max_extra = _ALIAS_COMMANDS[cmd]
        extra = rest[1:]
        if len(extra) >= min_extra:
            _, servers = load_servers()
            alias = _resolve_alias(rest[0], servers)
            handler(servers[alias], *extra[:max_extra])
            sys.exit(0)

    # A recognised flag that got here is missing arguments; do not treat the
    # flag itself as a server alias.
    if cmd in _ALIAS_COMMANDS or cmd in ("--info", "--set-note", "--remove"):
        print(f"ERROR: missing argument(s) for {cmd}", file=sys.stderr)
        print(__doc__)
        sys.exit(1)

    # Server commands — exact match first, then fuzzy search by keyword
    _, servers = load_servers()
    alias = _resolve_alias(cmd, servers)
    server = servers[alias]

    if not rest:
        print(f"Usage: ssh.py {alias} <command | --no-sudo command | --ping | "
              "--install-key | --upload LOCAL REMOTE | --download REMOTE LOCAL>")
        sys.exit(1)

    action = rest[0]
    if action == "--install-key":
        install_key(server)
    elif action == "--ping":
        ping_server(server)
    elif action in ("--upload", "--download"):
        if len(rest) < 3:
            # Never fall through: the flag text would otherwise be executed
            # as a remote command.
            print(f"Usage: ssh.py {alias} --upload LOCAL REMOTE | --download REMOTE LOCAL",
                  file=sys.stderr)
            sys.exit(1)
        transfer = upload_file if action == "--upload" else download_file
        transfer(server, rest[1], rest[2])
    elif action == "--no-sudo":
        out, err, code = run_command(server, " ".join(rest[1:]), use_sudo=False)
        _print_and_exit(out, err, code)
    else:
        out, err, code = run_command(server, " ".join(rest))
        _print_and_exit(out, err, code)


if __name__ == "__main__":
    try:
        main()
    except SystemExit:
        raise
    except Exception as e:
        print(f"ERROR: {type(e).__name__}: {e}", file=sys.stderr)
        sys.exit(1)