CustomTkinter desktop app for managing remote servers. Features: SSH terminal, SFTP file transfer, key management, background status monitoring, server CRUD with dark theme. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
202 lines
6.7 KiB
Python
202 lines
6.7 KiB
Python
"""
|
|
SSH client wrapper — refactored from ssh.py.
|
|
Handles connect, exec, sftp, key management via paramiko.
|
|
"""
|
|
|
|
import os
|
|
import paramiko
|
|
|
|
|
|
class SSHClientWrapper:
    """Paramiko-based SSH/SFTP helper bound to a single server record.

    ``server`` is a dict. The keys read by this class are ``ip`` (required)
    and the optional ``port`` (default 22), ``user`` (default ``"root"``),
    ``password`` and ``alias``. Authentication tries the private key at
    ``key_path`` first (only when that file exists) and then falls back to
    the stored password.

    Only ``connect()`` stores a client on the instance; ``exec_command``,
    ``upload``, ``download`` and ``check_connection`` each open and close
    their own short-lived connection so they never disturb a session that
    a caller opened with ``connect()``.
    """

    # Timeouts, in seconds.
    _CONNECT_TIMEOUT = 15
    _PROBE_TIMEOUT = 5
    _EXEC_TIMEOUT = 120

    def __init__(self, server: dict, key_path: str = ""):
        """Remember the server record and resolve the private-key path.

        An empty ``key_path`` selects ``~/.ssh/id_ed25519``.
        """
        self.server = server
        self.key_path = key_path or os.path.expanduser("~/.ssh/id_ed25519")
        self._client: "paramiko.SSHClient | None" = None

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------

    @staticmethod
    def _new_client() -> "paramiko.SSHClient":
        """Return an unconnected client that auto-accepts unknown host keys."""
        client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts any host key it has not seen
        # before, so a first connection is not protected against
        # man-in-the-middle attacks. Kept for backward compatibility.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        return client

    def _open_client(self, timeout: int) -> "paramiko.SSHClient":
        """Open and return a freshly authenticated client.

        Tries key authentication when ``self.key_path`` exists, then
        password authentication when a password is stored. Clients whose
        authentication failed are closed before moving on.

        Raises:
            KeyError: ``server`` has no ``"ip"`` entry.
            Exception: neither method is available or succeeded, or the
                password attempt itself failed (that error propagates).
        """
        base_kwargs = {
            "hostname": self.server["ip"],
            "port": self.server.get("port", 22),
            "username": self.server.get("user", "root"),
            "timeout": timeout,
            "banner_timeout": timeout,
        }
        key_error = None

        # Try key first.
        if os.path.exists(self.key_path):
            client = self._new_client()
            try:
                client.connect(key_filename=self.key_path, **base_kwargs)
                return client
            except Exception as exc:
                # Deliberate best-effort: any key failure falls through to
                # the password attempt. Close the failed client so its
                # socket is not leaked.
                key_error = exc
                client.close()

        # Fallback to password.
        password = self.server.get("password", "")
        if password:
            client = self._new_client()
            try:
                client.connect(
                    password=password,
                    look_for_keys=False,
                    allow_agent=False,
                    **base_kwargs,
                )
            except Exception:
                client.close()
                raise
            return client

        # Chain the key failure (if any) so the root cause stays visible.
        raise Exception(
            f"No auth method for {self.server.get('alias', 'unknown')}"
        ) from key_error

    def connect(self) -> "paramiko.SSHClient":
        """Open a connection, remember it on the instance and return it.

        The stored client is the one ``disconnect()`` closes.
        """
        client = self._open_client(self._CONNECT_TIMEOUT)
        self._client = client
        return client

    def disconnect(self):
        """Close the client stored by ``connect()``, ignoring close errors."""
        if self._client:
            try:
                self._client.close()
            except Exception:
                # Best-effort teardown: a failing close must not propagate.
                pass
            finally:
                self._client = None

    def check_connection(self) -> bool:
        """Quick connection test: True if authentication succeeds, else False.

        Uses the shorter probe timeout and never raises.
        """
        try:
            client = self._open_client(self._PROBE_TIMEOUT)
            client.close()
        except Exception:
            return False
        return True

    # ------------------------------------------------------------------
    # Remote execution and file transfer
    # ------------------------------------------------------------------

    def exec_command(self, command: str, use_sudo: bool = True) -> tuple[str, str, int]:
        """Execute command. Auto-sudo if user != root and use_sudo=True.

        Returns:
            ``(stdout, stderr, exit_code)`` with both streams decoded as
            UTF-8 (undecodable bytes replaced). Lines that look like sudo
            password prompts are removed from stderr.
        """
        client = self._open_client(self._CONNECT_TIMEOUT)
        try:
            user = self.server.get("user", "root")
            need_sudo = use_sudo and user != "root"

            if need_sudo:
                # -S reads the password from stdin; -p '' silences the prompt.
                full_cmd = f"sudo -S -p '' bash -c {_shell_quote(command)}"
            else:
                full_cmd = command

            stdin, stdout, stderr = client.exec_command(
                full_cmd, timeout=self._EXEC_TIMEOUT
            )

            if need_sudo:
                stdin.write(self.server.get("password", "") + "\n")
                stdin.flush()

            # Drain the streams BEFORE asking for the exit status: paramiko
            # documents that recv_exit_status() can hang when the remote
            # output exceeds the channel window and nobody is reading it.
            out = stdout.read().decode("utf-8", errors="replace")
            err = stderr.read().decode("utf-8", errors="replace")
            exit_code = stdout.channel.recv_exit_status()

            # Strip sudo noise.
            kept = [
                line
                for line in err.splitlines()
                if not line.startswith("[sudo]")
                and "password for" not in line.lower()
            ]
            err = "\n".join(kept).strip()

            return out, err, exit_code
        finally:
            client.close()

    def upload(self, local_path: str, remote_path: str, progress_cb=None):
        """Copy ``local_path`` to ``remote_path`` over SFTP, then chmod 0o664.

        ``progress_cb``, when given, is passed to paramiko as the transfer
        callback.
        """
        client = self._open_client(self._CONNECT_TIMEOUT)
        try:
            sftp = client.open_sftp()
            try:
                # callback=None is paramiko's default, so one call covers
                # both the with- and without-callback cases.
                sftp.put(local_path, remote_path, callback=progress_cb)
                sftp.chmod(remote_path, 0o664)
            finally:
                sftp.close()
        finally:
            client.close()

    def download(self, remote_path: str, local_path: str, progress_cb=None):
        """Copy ``remote_path`` to ``local_path`` over SFTP.

        ``progress_cb``, when given, is passed to paramiko as the transfer
        callback.
        """
        client = self._open_client(self._CONNECT_TIMEOUT)
        try:
            sftp = client.open_sftp()
            try:
                sftp.get(remote_path, local_path, callback=progress_cb)
            finally:
                sftp.close()
        finally:
            client.close()

    # ------------------------------------------------------------------
    # Key management
    # ------------------------------------------------------------------

    def install_key(self) -> str:
        """Install SSH public key on server. Returns status message.

        Reads ``<key_path>.pub`` and appends it to the remote user's
        ``~/.ssh/authorized_keys`` unless it is already listed there.

        Raises:
            FileNotFoundError: the public key file does not exist.
            ValueError: the public key file is empty.
            Exception: the remote install command did not report success.
        """
        pub_key_path = self.key_path + ".pub"
        if not os.path.exists(pub_key_path):
            raise FileNotFoundError(f"No public key at {pub_key_path}")

        with open(pub_key_path, "r", encoding="utf-8") as f:
            pub_key = f.read().strip()
        if not pub_key:
            # An empty pattern would match every line of authorized_keys.
            raise ValueError(f"Public key file is empty: {pub_key_path}")

        # Single-quote the key so the remote shell treats it as literal text.
        quoted_key = _shell_quote(pub_key)

        # Check if already installed. The previous `grep -c ... || echo 0`
        # printed "0" twice on a non-match (grep -c prints 0 AND exits 1),
        # which was misread as "installed". Use explicit markers instead,
        # and -F so the key is matched as a fixed string, not a regex.
        check_cmd = (
            f"grep -qF -- {quoted_key} ~/.ssh/authorized_keys 2>/dev/null "
            "&& echo KEY_PRESENT || echo KEY_ABSENT"
        )
        out, _, _ = self.exec_command(check_cmd, use_sudo=False)
        if "KEY_PRESENT" in out:
            return "Key already installed"

        # Install.
        install_cmd = (
            "mkdir -p ~/.ssh && chmod 700 ~/.ssh && "
            f"printf '%s\\n' {quoted_key} >> ~/.ssh/authorized_keys && "
            "chmod 600 ~/.ssh/authorized_keys && "
            'echo "KEY_OK"'
        )
        out, err, _ = self.exec_command(install_cmd, use_sudo=False)
        if "KEY_OK" in out:
            return "Key installed successfully"
        raise Exception(f"Key install failed: {err or out}")

    def generate_key(self) -> str:
        """Generate ed25519 SSH key pair if not exists.

        Writes the private key to ``key_path`` and the public key to
        ``key_path + ".pub"`` (comment ``server-manager``). Returns a status
        message; an existing private key is left untouched.
        """
        if os.path.exists(self.key_path):
            return f"Key already exists: {self.key_path}"

        # Make sure the target directory (e.g. ~/.ssh) exists.
        key_dir = os.path.dirname(self.key_path)
        if key_dir:
            os.makedirs(key_dir, mode=0o700, exist_ok=True)

        # NOTE(review): paramiko releases I am aware of do not implement
        # Ed25519Key.generate() — confirm against the installed version.
        # Use it when present, otherwise fall back to the ssh-keygen tool.
        generate = getattr(paramiko.Ed25519Key, "generate", None)
        if generate is not None:
            key = generate()
            key.write_private_key_file(self.key_path)

            # Write public key.
            pub_key = f"ssh-ed25519 {key.get_base64()} server-manager"
            with open(self.key_path + ".pub", "w", encoding="utf-8") as f:
                f.write(pub_key + "\n")
        else:
            self._generate_with_ssh_keygen()

        return f"Key generated: {self.key_path}"

    def _generate_with_ssh_keygen(self):
        """Create the key pair at ``key_path`` with the ``ssh-keygen`` CLI."""
        import subprocess

        argv = [
            "ssh-keygen", "-q",
            "-t", "ed25519",
            "-N", "",                # no passphrase
            "-C", "server-manager",  # same comment as the paramiko path
            "-f", self.key_path,
        ]
        try:
            subprocess.run(argv, check=True, capture_output=True, text=True)
        except FileNotFoundError as err:
            raise Exception(
                "Key generation failed: ssh-keygen not found"
            ) from err
        except subprocess.CalledProcessError as err:
            detail = (err.stderr or "").strip() or str(err)
            raise Exception(f"Key generation failed: {detail}") from err
|
|
|
|
|
|
def _shell_quote(s: str) -> str:
|
|
return "'" + s.replace("'", "'\\''") + "'"
|