Files
server-manager/tools/ssh.py
chrome-storm-c442 bf39fd7b67 v1.2.0 + v1.3.0: Localization, About dialog, TOTP/2FA, stability improvements
v1.2.0:
- GUI localization (EN/RU/ZH) with language switcher and persistent selection
- About dialog (ⓘ) with app info, features, quick start guide
- core/i18n.py — internationalization module with t() function
- All GUI components translated via t() keys

v1.3.0:
- TOTP/2FA tab — Google Authenticator compatible codes with live 30s countdown,
  one-click copy, per-server secret management
- core/totp.py — TOTP module (pyotp, RFC 6238)
- core/logger.py — rotating file logger (5MB, 3 backups)
- Stronger Fernet encryption key with automatic migration from old key
- Thread-safe server store with locks, atomic writes, auto-restore on corruption
- Parallel status checks via ThreadPoolExecutor (up to 10 concurrent)
- SSH client: explicit channel cleanup, Unix key permissions
- Server dialog: port validation (1-65535), TOTP secret field
- Language change preserves active tab and server selection
- pyotp dependency added

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 11:07:51 -05:00

383 lines
12 KiB
Python

#!/usr/bin/env python3
"""
SSH utility for Claude Code — connects to servers by alias.
Credentials stored locally in servers.json (encrypted), NEVER exposed to AI API.
Usage:
python ssh.py ALIAS "command" # run as configured user (auto-sudo if needed)
python ssh.py ALIAS --no-sudo "command" # run without sudo elevation
python ssh.py ALIAS --upload LOCAL REMOTE
python ssh.py ALIAS --download REMOTE LOCAL
python ssh.py ALIAS --install-key
python ssh.py ALIAS --ping
python ssh.py --list
python ssh.py --status
python ssh.py --add ALIAS IP PORT USER PASSWORD [--note "desc"]
python ssh.py --remove ALIAS
"""
import sys
import os
import json
import time
import paramiko
# Shared config — same file used by ServerManager GUI
SHARED_DIR = os.path.expanduser("~/.server-connections")
SETTINGS_FILE = os.path.join(SHARED_DIR, "settings.json")
DEFAULT_SERVERS_FILE = os.path.join(SHARED_DIR, "servers.json")
SSH_KEY_PATH = os.path.expanduser("~/.ssh/id_ed25519")
SSH_CONFIG_PATH = os.path.expanduser("~/.ssh/config")
# Encryption support — encryption.py is copied to SHARED_DIR by GUI setup
if SHARED_DIR not in sys.path:
sys.path.insert(0, SHARED_DIR)
try:
from encryption import decrypt, encrypt, is_encrypted
HAS_ENCRYPTION = True
except ImportError:
HAS_ENCRYPTION = False
def _get_servers_file() -> str:
"""Get servers file path from settings.json or use default."""
if os.path.exists(SETTINGS_FILE):
try:
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
settings = json.load(f)
path = settings.get("servers_path", "")
if path and os.path.exists(path):
return path
except Exception:
pass
return DEFAULT_SERVERS_FILE
# ── Data ──────────────────────────────────────────────
def load_servers():
servers_file = _get_servers_file()
with open(servers_file, "rb") as f:
raw = f.read()
if HAS_ENCRYPTION and is_encrypted(raw):
text = decrypt(raw)
data = json.loads(text)
else:
data = json.loads(raw.decode("utf-8"))
return data, {s["alias"]: s for s in data.get("servers", [])}
def save_servers(data):
servers_file = _get_servers_file()
text = json.dumps(data, indent=2, ensure_ascii=False)
if HAS_ENCRYPTION:
encrypted = encrypt(text)
with open(servers_file, "wb") as f:
f.write(encrypted)
else:
with open(servers_file, "w", encoding="utf-8") as f:
f.write(text)
# ── Connection ────────────────────────────────────────
def get_client(server: dict) -> paramiko.SSHClient:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
kwargs = {
"hostname": server["ip"],
"port": server.get("port", 22),
"username": server.get("user", "root"),
"timeout": 15,
"banner_timeout": 15,
}
# Try key first
if os.path.exists(SSH_KEY_PATH):
try:
kwargs["key_filename"] = SSH_KEY_PATH
client.connect(**kwargs)
return client
except Exception:
del kwargs["key_filename"]
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Fallback to password
password = server.get("password", "")
if password:
kwargs["password"] = password
kwargs["look_for_keys"] = False
kwargs["allow_agent"] = False
client.connect(**kwargs)
return client
raise Exception(f"No auth method for {server['alias']}")
# ── Command execution ─────────────────────────────────
def run_command(server: dict, command: str, use_sudo: bool = True) -> tuple:
"""Execute command. If user != root and use_sudo=True, auto-elevates via sudo.
Password is fed through stdin (not visible in process list)."""
client = get_client(server)
try:
user = server.get("user", "root")
need_sudo = use_sudo and user != "root"
if need_sudo:
# Use sudo -S to read password from stdin
# -p '' suppresses the password prompt text
full_cmd = f"sudo -S -p '' bash -c {_shell_quote(command)}"
else:
full_cmd = command
stdin, stdout, stderr = client.exec_command(full_cmd, timeout=120)
if need_sudo:
password = server.get("password", "")
stdin.write(password + "\n")
stdin.flush()
exit_code = stdout.channel.recv_exit_status()
out = stdout.read().decode("utf-8", errors="replace")
err = stderr.read().decode("utf-8", errors="replace")
# Strip sudo noise from stderr
err_lines = [l for l in err.splitlines()
if not l.startswith("[sudo]") and "password for" not in l.lower()]
err = "\n".join(err_lines).strip()
return out, err, exit_code
finally:
client.close()
def _shell_quote(s: str) -> str:
"""Safely quote a string for bash -c."""
return "'" + s.replace("'", "'\\''") + "'"
# ── File transfer ─────────────────────────────────────
def upload_file(server: dict, local_path: str, remote_path: str):
client = get_client(server)
try:
sftp = client.open_sftp()
sftp.put(local_path, remote_path)
sftp.chmod(remote_path, 0o664)
sftp.close()
print(f"OK: {local_path} -> {server['alias']}:{remote_path}")
finally:
client.close()
def download_file(server: dict, remote_path: str, local_path: str):
client = get_client(server)
try:
sftp = client.open_sftp()
sftp.get(remote_path, local_path)
sftp.close()
print(f"OK: {server['alias']}:{remote_path} -> {local_path}")
finally:
client.close()
# ── Key management ────────────────────────────────────
def install_key(server: dict):
pub_key_path = SSH_KEY_PATH + ".pub"
if not os.path.exists(pub_key_path):
print(f"ERROR: No public key at {pub_key_path}")
sys.exit(1)
with open(pub_key_path, "r") as f:
pub_key = f.read().strip()
check_cmd = f'grep -c "{pub_key}" ~/.ssh/authorized_keys 2>/dev/null || echo 0'
out, _, _ = run_command(server, check_cmd, use_sudo=False)
if out.strip() != "0":
print(f"Key already installed on {server['alias']}")
return
command = (
f'mkdir -p ~/.ssh && chmod 700 ~/.ssh && '
f'echo "{pub_key}" >> ~/.ssh/authorized_keys && '
f'chmod 600 ~/.ssh/authorized_keys && '
f'echo "KEY_OK"'
)
out, err, code = run_command(server, command, use_sudo=False)
if "KEY_OK" in out:
print(f"SSH key installed on {server['alias']} ({server['ip']})")
else:
print(f"ERROR: {err or out}")
sys.exit(1)
# ── Server management ─────────────────────────────────
def ping_server(server: dict):
try:
client = get_client(server)
client.close()
print(f"{server['alias']}: ONLINE")
except Exception as e:
print(f"{server['alias']}: OFFLINE ({type(e).__name__})")
def list_servers():
_, servers = load_servers()
print(f"{'Alias':<20} {'IP':<20} {'Port':<8} {'User':<10} {'Key':<6}")
print("-" * 64)
for alias, s in servers.items():
has_key = "yes" if os.path.exists(SSH_KEY_PATH) else "no"
print(f"{alias:<20} {s['ip']:<20} {s.get('port', 22):<8} {s.get('user', 'root'):<10} {has_key:<6}")
def check_status():
_, servers = load_servers()
print(f"{'Alias':<20} {'IP':<20} {'Status':<10}")
print("-" * 50)
for alias, s in servers.items():
try:
client = get_client(s)
client.close()
status = "ONLINE"
except Exception:
status = "OFFLINE"
print(f"{alias:<20} {s['ip']:<20} {status:<10}")
def add_server(args):
if len(args) < 5:
print("Usage: --add ALIAS IP PORT USER PASSWORD [--note \"desc\"]")
sys.exit(1)
alias, ip, port, user, password = args[0], args[1], int(args[2]), args[3], args[4]
note = ""
if "--note" in args:
idx = args.index("--note")
if idx + 1 < len(args):
note = args[idx + 1]
data, servers = load_servers()
if alias in servers:
print(f"ERROR: '{alias}' already exists")
sys.exit(1)
new_server = {
"alias": alias, "ip": ip, "port": port,
"user": user, "auth": "ssh-key", "password": password,
"notes": note
}
data["servers"].append(new_server)
save_servers(data)
update_ssh_config(alias, ip, port, user)
print(f"Added: {alias} ({user}@{ip}:{port})")
try:
install_key(new_server)
except Exception as e:
print(f"Warning: key not installed ({e}). Run: ssh.py {alias} --install-key")
def remove_server(alias: str):
data, servers = load_servers()
if alias not in servers:
print(f"ERROR: Unknown '{alias}'")
sys.exit(1)
data["servers"] = [s for s in data["servers"] if s["alias"] != alias]
save_servers(data)
remove_from_ssh_config(alias)
print(f"Removed: {alias}")
# ── SSH config ────────────────────────────────────────
def update_ssh_config(alias, ip, port, user):
if not os.path.exists(SSH_CONFIG_PATH):
return
with open(SSH_CONFIG_PATH, "r") as f:
content = f.read()
if f"Host {alias}\n" in content:
return
with open(SSH_CONFIG_PATH, "a") as f:
f.write(f"\nHost {alias}\n HostName {ip}\n User {user}\n Port {port}\n")
def remove_from_ssh_config(alias):
if not os.path.exists(SSH_CONFIG_PATH):
return
with open(SSH_CONFIG_PATH, "r") as f:
lines = f.readlines()
new_lines, skip = [], False
for line in lines:
if line.strip() == f"Host {alias}":
skip = True
continue
if skip and line.startswith(" "):
continue
skip = False
new_lines.append(line)
with open(SSH_CONFIG_PATH, "w") as f:
f.writelines(new_lines)
# ── Main ──────────────────────────────────────────────
def main():
if len(sys.argv) < 2:
print(__doc__)
sys.exit(1)
cmd = sys.argv[1]
if cmd == "--list":
list_servers(); sys.exit(0)
if cmd == "--status":
check_status(); sys.exit(0)
if cmd == "--add":
add_server(sys.argv[2:]); sys.exit(0)
if cmd == "--remove" and len(sys.argv) >= 3:
remove_server(sys.argv[2]); sys.exit(0)
# Server commands
alias = cmd
_, servers = load_servers()
if alias not in servers:
print(f"Unknown: {alias}. Available: {', '.join(servers.keys())}")
sys.exit(1)
server = servers[alias]
if len(sys.argv) < 3:
print(f"Usage: ssh.py {alias} <command>")
sys.exit(1)
action = sys.argv[2]
if action == "--install-key":
install_key(server)
elif action == "--ping":
ping_server(server)
elif action == "--upload" and len(sys.argv) >= 5:
upload_file(server, sys.argv[3], sys.argv[4])
elif action == "--download" and len(sys.argv) >= 5:
download_file(server, sys.argv[3], sys.argv[4])
elif action == "--no-sudo":
command = " ".join(sys.argv[3:])
out, err, code = run_command(server, command, use_sudo=False)
if out: print(out, end="")
if err: print(err, end="", file=sys.stderr)
sys.exit(code)
else:
command = " ".join(sys.argv[2:])
out, err, code = run_command(server, command)
if out: print(out, end="")
if err: print(err, end="", file=sys.stderr)
sys.exit(code)
if __name__ == "__main__":
main()