#!/usr/bin/env python3
"""
SSH utility for Claude Code — connects to servers by alias.
Credentials stored locally in servers.json (encrypted), NEVER exposed to AI API.

Usage:
    python ssh.py ALIAS "command"              # run as configured user (auto-sudo if needed)
    python ssh.py ALIAS --no-sudo "command"    # run without sudo elevation
    python ssh.py ALIAS --upload LOCAL REMOTE
    python ssh.py ALIAS --download REMOTE LOCAL
    python ssh.py ALIAS --install-key
    python ssh.py ALIAS --ping
    python ssh.py --list
    python ssh.py --status
    python ssh.py --info ALIAS                 # full info (no passwords)
    python ssh.py --set-note ALIAS "desc"      # update server notes
    python ssh.py --add ALIAS IP PORT USER PASSWORD [--note "desc"]
    python ssh.py --remove ALIAS
"""

import sys
import os
import json
import re
import time

# paramiko is only needed for operations that open an SSH connection.
# Importing it lazily-tolerantly (same pattern as the optional `encryption`
# module below) keeps --list / --info / --set-note usable without it;
# get_client() reports the missing dependency explicitly.
try:
    import paramiko
except ImportError:
    paramiko = None

# Shared config — same file used by ServerManager GUI
SHARED_DIR = os.path.expanduser("~/.server-connections")
SETTINGS_FILE = os.path.join(SHARED_DIR, "settings.json")
DEFAULT_SERVERS_FILE = os.path.join(SHARED_DIR, "servers.json")
SSH_KEY_PATH = os.path.expanduser("~/.ssh/id_ed25519")
SSH_CONFIG_PATH = os.path.expanduser("~/.ssh/config")

# Encryption support — encryption.py is copied to SHARED_DIR by GUI setup
if SHARED_DIR not in sys.path:
    sys.path.insert(0, SHARED_DIR)
try:
    from encryption import decrypt, encrypt, is_encrypted
    HAS_ENCRYPTION = True
except ImportError:
    HAS_ENCRYPTION = False

# Directory prefixes that mark a drive-letter path as a mangled Unix path.
_UNIX_ROOT_PREFIXES = (
    '/tmp/', '/home/', '/etc/', '/var/', '/usr/', '/opt/',
    '/root/', '/bin/', '/sbin/', '/lib/', '/lib64/',
)


def _get_servers_file() -> str:
    """Get servers file path from settings.json or use default.

    The ``servers_path`` setting is honoured only when it is a non-empty
    string naming an existing path. A missing, unreadable or malformed
    settings file falls back to DEFAULT_SERVERS_FILE (best effort).
    """
    try:
        with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
            settings = json.load(f)
    except (OSError, ValueError):
        # OSError: file missing/unreadable; ValueError: bad JSON or encoding.
        return DEFAULT_SERVERS_FILE
    if isinstance(settings, dict):
        path = settings.get("servers_path", "")
        if isinstance(path, str) and path and os.path.exists(path):
            return path
    return DEFAULT_SERVERS_FILE


# ── Data ──────────────────────────────────────────────

def load_servers():
    """Read the servers file and index its entries by alias.

    Returns:
        (data, servers): the parsed top-level JSON object and a dict
        mapping each entry's "alias" to the entry itself.

    Raises:
        OSError: the servers file cannot be read.
        ValueError: the content is not valid JSON (which is also what an
            encrypted file looks like when the encryption module is absent).
    """
    servers_file = _get_servers_file()
    with open(servers_file, "rb") as f:
        raw = f.read()
    if HAS_ENCRYPTION and is_encrypted(raw):
        data = json.loads(decrypt(raw))
    else:
        try:
            data = json.loads(raw.decode("utf-8"))
        except ValueError as err:
            if HAS_ENCRYPTION:
                raise
            # Without the encryption module an encrypted file cannot be
            # recognised, so point the user at the likely cause.
            raise ValueError(
                f"cannot parse {servers_file} as JSON; if it is encrypted, "
                f"encryption.py must be available in {SHARED_DIR}"
            ) from err
    return data, {s["alias"]: s for s in data.get("servers", [])}


def save_servers(data):
    """Serialize ``data`` as JSON and write it to the servers file.

    The output is encrypted whenever the encryption module was importable,
    otherwise it is written as plain UTF-8 text.
    """
    servers_file = _get_servers_file()
    text = json.dumps(data, indent=2, ensure_ascii=False)
    if HAS_ENCRYPTION:
        encrypted = encrypt(text)
        with open(servers_file, "wb") as f:
            f.write(encrypted)
    else:
        with open(servers_file, "w", encoding="utf-8") as f:
            f.write(text)


# ── Connection ────────────────────────────────────────

def _new_client():
    """Create an SSHClient configured with this tool's host-key policy."""
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without
    # verification, which leaves first connections open to MITM. Kept as-is
    # because changing it would break existing workflows — confirm intent.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    return client


def get_client(server: dict) -> "paramiko.SSHClient":
    """Open an SSH connection to ``server``.

    Tries the local private key (SSH_KEY_PATH) first when it exists, then
    falls back to the stored password. The caller owns the returned client
    and must close it.

    Raises:
        Exception: no key is present/usable and no password is stored, or
            the password connection attempt itself failed.
        SystemExit: paramiko is not installed.
    """
    if paramiko is None:
        print("ERROR: paramiko is not installed. Run: pip install paramiko",
              file=sys.stderr)
        sys.exit(1)

    kwargs = {
        "hostname": server["ip"],
        "port": server.get("port", 22),
        "username": server.get("user", "root"),
        "timeout": 15,
        "banner_timeout": 15,
    }

    # Try key first
    if os.path.exists(SSH_KEY_PATH):
        client = _new_client()
        try:
            client.connect(key_filename=SSH_KEY_PATH, **kwargs)
            return client
        except Exception:
            # Deliberately broad: any key failure falls through to the
            # password attempt. Close the half-open client so its socket
            # and transport thread are not leaked.
            client.close()

    # Fallback to password
    password = server.get("password", "")
    if password:
        client = _new_client()
        try:
            client.connect(password=password, look_for_keys=False,
                           allow_agent=False, **kwargs)
        except Exception:
            client.close()
            raise
        return client

    raise Exception(f"No auth method for {server['alias']}")


# ── Command execution ─────────────────────────────────

def run_command(server: dict, command: str, use_sudo: bool = True) -> tuple:
    """Execute command. If user != root and use_sudo=True, auto-elevates via sudo.
    Password is fed through stdin (not visible in process list).

    Returns:
        (stdout, stderr, exit_code) with both streams decoded as UTF-8
        (undecodable bytes replaced) and sudo prompt lines removed from
        stderr.
    """
    client = get_client(server)
    try:
        user = server.get("user", "root")
        need_sudo = use_sudo and user != "root"

        if need_sudo:
            # Use sudo -S to read password from stdin
            # -p '' suppresses the password prompt text
            full_cmd = f"sudo -S -p '' bash -c {_shell_quote(command)}"
        else:
            full_cmd = command

        stdin, stdout, stderr = client.exec_command(full_cmd, timeout=120)

        if need_sudo:
            password = server.get("password") or ""
            stdin.write(password + "\n")
            stdin.flush()
        # Send EOF so a remote command that reads stdin does not wait
        # until the channel timeout.
        stdin.channel.shutdown_write()

        # Drain the output BEFORE asking for the exit status: waiting for
        # the status first can hang once the remote side has produced more
        # output than the channel window holds.
        out = stdout.read().decode("utf-8", errors="replace")
        err = stderr.read().decode("utf-8", errors="replace")
        exit_code = stdout.channel.recv_exit_status()

        # Strip sudo noise from stderr
        err_lines = [
            line for line in err.splitlines()
            if not line.startswith("[sudo]")
            and "password for" not in line.lower()
        ]
        err = "\n".join(err_lines).strip()

        return out, err, exit_code
    finally:
        client.close()


def _shell_quote(s: str) -> str:
    """Safely quote a string for bash -c."""
    return "'" + s.replace("'", "'\\''") + "'"


# ── File transfer ─────────────────────────────────────

def _normalize_remote_path(remote_path: str) -> str:
    """Normalize remote path by detecting and fixing MSYS path conversions.

    Two rewritten shapes are undone:
      * ``C:/Program Files/Git/tmp/x`` -> ``/tmp/x``
      * ``C:/tmp/x`` -> ``/tmp/x`` (only when the remainder starts with a
        common Unix directory)
    Any other path is returned unchanged.
    """
    if ':' not in remote_path:
        return remote_path

    # Convert C:/Program Files/Git/tmp/file.txt back to /tmp/file.txt
    git_root = 'Program Files/Git'
    git_pos = remote_path.find(git_root)
    if git_pos != -1:
        # A bare Git root is what "/" gets rewritten to.
        return remote_path[git_pos + len(git_root):] or "/"

    # Windows-style path like C:/something
    if len(remote_path) > 3 and remote_path[1] == ':' and remote_path[2] == '/':
        # Drop only the "C:" drive prefix; the leading slash must survive
        # for the prefix comparison below to be able to match.
        potential_unix_path = remote_path[2:]
        if potential_unix_path.startswith(_UNIX_ROOT_PREFIXES):
            return potential_unix_path

    return remote_path


def upload_file(server: dict, local_path: str, remote_path: str):
    """Upload ``local_path`` to ``remote_path`` over SFTP and chmod it to 0o664."""
    # Normalize the remote path to handle MSYS conversion issues
    normalized_remote_path = _normalize_remote_path(remote_path)

    client = get_client(server)
    try:
        # Context manager closes the SFTP session even if the transfer fails.
        with client.open_sftp() as sftp:
            sftp.put(local_path, normalized_remote_path)
            sftp.chmod(normalized_remote_path, 0o664)
        print(f"OK: {local_path} -> {server['alias']}:{normalized_remote_path}")
    finally:
        client.close()


def download_file(server: dict, remote_path: str, local_path: str):
    """Download ``remote_path`` from the server to ``local_path`` over SFTP."""
    # Normalize the remote path to handle MSYS conversion issues
    normalized_remote_path = _normalize_remote_path(remote_path)

    client = get_client(server)
    try:
        with client.open_sftp() as sftp:
            sftp.get(normalized_remote_path, local_path)
        print(f"OK: {server['alias']}:{normalized_remote_path} -> {local_path}")
    finally:
        client.close()


def install_key(server: dict):
    """Append the local public key to the server's ~/.ssh/authorized_keys.

    Does nothing if the key is already present. Exits with status 1 when
    the local public key is missing/empty or the remote install fails.
    """
    pub_key_path = SSH_KEY_PATH + ".pub"
    if not os.path.exists(pub_key_path):
        print(f"ERROR: No public key at {pub_key_path}")
        sys.exit(1)

    with open(pub_key_path, "r", encoding="utf-8") as f:
        pub_key = f.read().strip()
    if not pub_key:
        print(f"ERROR: Public key at {pub_key_path} is empty")
        sys.exit(1)

    quoted_key = _shell_quote(pub_key)

    # `grep -c ... || echo 0` printed "0" twice when the file existed
    # without the key, which was misread as "already installed". Use the
    # exit status of a quiet fixed-string match and explicit markers.
    check_cmd = (
        f"grep -qF -- {quoted_key} ~/.ssh/authorized_keys 2>/dev/null "
        "&& echo KEY_PRESENT || echo KEY_ABSENT"
    )
    out, _, _ = run_command(server, check_cmd, use_sudo=False)
    if "KEY_PRESENT" in out:
        print(f"Key already installed on {server['alias']}")
        return

    command = (
        'mkdir -p ~/.ssh && chmod 700 ~/.ssh && '
        f'echo {quoted_key} >> ~/.ssh/authorized_keys && '
        'chmod 600 ~/.ssh/authorized_keys && '
        'echo "KEY_OK"'
    )
    out, err, _ = run_command(server, command, use_sudo=False)
    if "KEY_OK" in out:
        print(f"SSH key installed on {server['alias']}")
    else:
        print(f"ERROR: {err or out}")
        sys.exit(1)


# ── Server management ─────────────────────────────────

def ping_server(server: dict):
    """Print ONLINE/OFFLINE depending on whether an SSH login succeeds."""
    try:
        client = get_client(server)
        client.close()
        print(f"{server['alias']}: ONLINE")
    except Exception as e:
        print(f"{server['alias']}: OFFLINE ({type(e).__name__})")


def list_servers(full=False):
    """Print a table of configured servers.

    Safe mode (default) shows alias, type, key presence and notes only;
    ``full=True`` additionally shows IP, port and user.
    """
    _, servers = load_servers()
    # Key presence is a property of the local machine, not of each server.
    has_key = "yes" if os.path.exists(SSH_KEY_PATH) else "no"

    if full:
        # WARNING: full mode shows sensitive data (IP, port, user)
        # Only for local/manual use, NEVER through AI API
        print("WARNING: Full mode — contains sensitive data. Do NOT pipe to AI.")
        print(f"{'Alias':<20} {'IP':<20} {'Port':<8} {'User':<10} {'Key':<6}")
        print("-" * 64)
        for alias, s in servers.items():
            print(f"{alias:<20} {s['ip']:<20} {s.get('port', 22):<8} {s.get('user', 'root'):<10} {has_key:<6}")
    else:
        # Safe mode: only aliases (no IPs, ports, users)
        print(f"{'Alias':<20} {'Type':<10} {'Key':<6} {'Notes'}")
        print("-" * 70)
        for alias, s in servers.items():
            stype = s.get("type", "ssh")
            notes = s.get("notes", "")
            print(f"{alias:<20} {stype:<10} {has_key:<6} {notes}")


def _pick_unique_match(alias: str, matches: list):
    """Return the single match, None when there is none, exit 1 if ambiguous."""
    if len(matches) == 1:
        return matches[0]
    if len(matches) > 1:
        print(f"Ambiguous: '{alias}' matches multiple servers:")
        for m in matches:
            print(f" - {m}")
        sys.exit(1)
    return None


def _resolve_alias(alias: str, servers: dict) -> str:
    """Resolve alias — exact match, then whole-word search, then substring fallback.

    Prints a message and exits with status 1 when the query is ambiguous
    at either fuzzy stage or matches nothing.
    """
    if alias in servers:
        return alias

    query = alias.lower()

    # 1) Whole-word match (e.g. "tor" matches "API TOR contabo" but NOT "investor")
    word_re = re.compile(r'\b' + re.escape(query) + r'\b', re.IGNORECASE)
    found = _pick_unique_match(alias, [a for a in servers if word_re.search(a)])
    if found is not None:
        return found

    # 2) Substring fallback (e.g. "cont" matches "contabo")
    found = _pick_unique_match(alias, [a for a in servers if query in a.lower()])
    if found is not None:
        return found

    print(f"Unknown: '{alias}'. Available: {', '.join(servers.keys())}")
    sys.exit(1)


def server_info(alias: str):
    """Show server info safe for AI context — NO ip, user, password, port, totp_secret."""
    _, servers = load_servers()
    alias = _resolve_alias(alias, servers)
    s = servers[alias]
    has_key = "yes" if os.path.exists(SSH_KEY_PATH) else "no"
    print(f"Alias: {s['alias']}")
    print(f"Type: {s.get('type', 'ssh')}")
    print(f"Key: {has_key}")
    print(f"Auth: {s.get('auth', 'password')}")
    print(f"2FA: {'yes' if s.get('totp_secret') else 'no'}")
    notes = s.get("notes", "")
    if notes:
        print(f"Notes: {notes}")


def check_status():
    """Try to log in to every configured server and print ONLINE/OFFLINE."""
    _, servers = load_servers()
    print(f"{'Alias':<20} {'Status':<10}")
    print("-" * 30)
    for alias, s in servers.items():
        try:
            client = get_client(s)
            client.close()
            status = "ONLINE"
        except Exception:
            status = "OFFLINE"
        print(f"{alias:<20} {status:<10}")


def add_server(args):
    """Register a new server from ``ALIAS IP PORT USER PASSWORD [--note "desc"]``.

    Saves the entry, appends it to the SSH config and then tries to install
    the local public key (a failure there is only a warning). Exits with
    status 1 on bad arguments or when the alias already exists.
    """
    if len(args) < 5:
        print("Usage: --add ALIAS IP PORT USER PASSWORD [--note \"desc\"]")
        sys.exit(1)

    alias, ip, port_text, user, password = args[:5]
    try:
        port = int(port_text)
    except ValueError:
        print(f"ERROR: invalid port '{port_text}' (must be a number)")
        sys.exit(1)
    if not 1 <= port <= 65535:
        print(f"ERROR: invalid port '{port_text}' (must be 1-65535)")
        sys.exit(1)

    # Only look past the positionals so a positional value that happens to
    # equal "--note" is not mistaken for the option.
    note = ""
    extra = args[5:]
    if "--note" in extra:
        idx = extra.index("--note")
        if idx + 1 < len(extra):
            note = extra[idx + 1]

    data, servers = load_servers()
    if alias in servers:
        print(f"ERROR: '{alias}' already exists")
        sys.exit(1)

    new_server = {
        "alias": alias,
        "ip": ip,
        "port": port,
        "user": user,
        "auth": "ssh-key",
        "password": password,
        "notes": note
    }
    data.setdefault("servers", []).append(new_server)
    save_servers(data)
    update_ssh_config(alias, ip, port, user)
    print(f"Added: {alias}")

    try:
        install_key(new_server)
    except Exception as e:
        print(f"Warning: key not installed ({e}). Run: ssh.py {alias} --install-key")


def set_note(alias: str, note: str):
    """Update server notes — safe for AI (no credentials exposed)."""
    data, servers = load_servers()
    alias = _resolve_alias(alias, servers)
    for s in data["servers"]:
        if s["alias"] == alias:
            s["notes"] = note
            break
    save_servers(data)
    print(f"OK: notes updated for {alias}")


def remove_server(alias: str):
    """Delete the (fuzzily resolved) server from the servers file and SSH config."""
    data, servers = load_servers()
    alias = _resolve_alias(alias, servers)
    data["servers"] = [s for s in data["servers"] if s["alias"] != alias]
    save_servers(data)
    remove_from_ssh_config(alias)
    print(f"Removed: {alias}")


# ── SSH config ────────────────────────────────────────

def update_ssh_config(alias, ip, port, user):
    """Append a Host entry for ``alias`` to the SSH config.

    Does nothing when the config file does not exist or already has a
    ``Host <alias>`` line.
    """
    if not os.path.exists(SSH_CONFIG_PATH):
        return
    with open(SSH_CONFIG_PATH, "r", encoding="utf-8") as f:
        lines = f.readlines()
    # Whole-line comparison, matching what remove_from_ssh_config() uses.
    header = f"Host {alias}"
    if any(line.strip() == header for line in lines):
        return
    with open(SSH_CONFIG_PATH, "a", encoding="utf-8") as f:
        f.write(f"\nHost {alias}\n HostName {ip}\n User {user}\n Port {port}\n")


def remove_from_ssh_config(alias):
    """Remove the ``Host <alias>`` entry and its indented option lines."""
    if not os.path.exists(SSH_CONFIG_PATH):
        return
    with open(SSH_CONFIG_PATH, "r", encoding="utf-8") as f:
        lines = f.readlines()

    header = f"Host {alias}"
    new_lines, skip = [], False
    for line in lines:
        if line.strip() == header:
            skip = True
            continue
        # Option lines may be indented with spaces or tabs.
        if skip and line.startswith((" ", "\t")):
            continue
        skip = False
        new_lines.append(line)

    # Leave the file untouched when there was nothing to remove.
    if new_lines != lines:
        with open(SSH_CONFIG_PATH, "w", encoding="utf-8") as f:
            f.writelines(new_lines)


# ── Main ──────────────────────────────────────────────

def _usage_exit(message: str):
    """Print a usage message to stderr and exit with status 1."""
    print(message, file=sys.stderr)
    sys.exit(1)


def main():
    """Parse sys.argv and dispatch to the matching management or server action."""
    if len(sys.argv) < 2:
        print(__doc__)
        sys.exit(1)

    cmd = sys.argv[1]

    if cmd == "--list":
        list_servers()
        sys.exit(0)
    if cmd == "--list-full":
        list_servers(full=True)
        sys.exit(0)
    if cmd == "--status":
        check_status()
        sys.exit(0)
    if cmd == "--info":
        if len(sys.argv) < 3:
            _usage_exit("Usage: ssh.py --info ALIAS")
        server_info(sys.argv[2])
        sys.exit(0)
    if cmd == "--set-note":
        if len(sys.argv) < 4:
            _usage_exit("Usage: ssh.py --set-note ALIAS \"desc\"")
        set_note(sys.argv[2], sys.argv[3])
        sys.exit(0)
    if cmd == "--add":
        add_server(sys.argv[2:])
        sys.exit(0)
    if cmd == "--remove":
        if len(sys.argv) < 3:
            _usage_exit("Usage: ssh.py --remove ALIAS")
        remove_server(sys.argv[2])
        sys.exit(0)

    # Server commands — exact match first, then fuzzy search by keyword
    alias = cmd
    _, servers = load_servers()
    alias = _resolve_alias(alias, servers)
    server = servers[alias]

    if len(sys.argv) < 3:
        print(f"Usage: ssh.py {alias} \"command\" | --no-sudo \"command\" | "
              "--upload LOCAL REMOTE | --download REMOTE LOCAL | "
              "--install-key | --ping")
        sys.exit(1)

    action = sys.argv[2]
    rest = sys.argv[3:]

    if action == "--install-key":
        install_key(server)
    elif action == "--ping":
        ping_server(server)
    elif action == "--upload":
        # Must not fall through: an incomplete --upload would otherwise be
        # executed on the server as a shell command.
        if len(rest) < 2:
            _usage_exit(f"Usage: ssh.py {alias} --upload LOCAL REMOTE")
        upload_file(server, rest[0], rest[1])
    elif action == "--download":
        if len(rest) < 2:
            _usage_exit(f"Usage: ssh.py {alias} --download REMOTE LOCAL")
        download_file(server, rest[0], rest[1])
    else:
        use_sudo = action != "--no-sudo"
        command = " ".join(sys.argv[2:] if use_sudo else rest)
        if not command.strip():
            _usage_exit(f"Usage: ssh.py {alias} [--no-sudo] \"command\"")
        out, err, code = run_command(server, command, use_sudo=use_sudo)
        if out:
            print(out, end="")
        if err:
            print(err, end="", file=sys.stderr)
        sys.exit(code)


if __name__ == "__main__":
    try:
        main()
    except SystemExit:
        raise
    except Exception as e:
        print(f"ERROR: {type(e).__name__}: {e}", file=sys.stderr)
        sys.exit(1)